You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "Rick Kellogg (JIRA)" <ji...@apache.org> on 2015/09/26 04:49:04 UTC

[jira] [Updated] (STORM-86) KafkaSpout in storm 0.9.0_rc2 does not work

     [ https://issues.apache.org/jira/browse/STORM-86?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Kellogg updated STORM-86:
------------------------------
    Component/s: storm-kafka

> KafkaSpout in storm 0.9.0_rc2 does not work
> -------------------------------------------
>
>                 Key: STORM-86
>                 URL: https://issues.apache.org/jira/browse/STORM-86
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>            Reporter: James Xu
>             Fix For: 0.9.2-incubating
>
>
> https://github.com/nathanmarz/storm/issues/723     
> <dependency>
>                     <groupId>storm</groupId>
>                     <artifactId>storm-kafka</artifactId>
>                     <version>0.9.0-wip16a-scala292</version>
>             </dependency>
> {code}
> BrokerHosts hosts = KafkaConfig.StaticHosts.fromHostString(ImmutableList.of("localhost"), 1);
> SpoutConfig conf = new SpoutConfig(hosts, "words", "/kafkastorm", "myId");
> KafkaSpout kafkaSpout = new KafkaSpout(conf);
> return kafkaSpout;
> }
> public static void main (String [] args){
> Serializable spout = makeSpout();
> TridentTopology topology = new TridentTopology();
> Stream stream = null;
> if (spout instanceof IBatchSpout) {
>   stream = topology.newStream("simplespout", (IBatchSpout) spout);
> } else if (spout instanceof IRichSpout){
>   stream = topology.newStream("simplespout", (IRichSpout) spout);
> }
> Stream words = stream.each(new Fields("bytes"), new Split(), new Fields("word"));
> {code}
> 5947 [Thread-34-spout0] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.RuntimeException: TopologyContext.registerMetric can only be called from within overridden IBolt::prepare() or ISpout::open() method.
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc2.jar:na]
> at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc2.jar:na]
> at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc2.jar:na]
> at backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729) ~[storm-core-0.9.0-rc2.jar:na]
> at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc2.jar:na]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> at java.lang.Thread.run(Thread.java:722) [na:1.7.0_13]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)