Posted to user@storm.apache.org by Sa Li <sa...@gmail.com> on 2014/08/05 23:04:28 UTC

can't consumer kafka message

Hi, all

This is a follow-up to my other thread, “kafka-spout running error”, in which I described being unable to run a Kafka consumer. This time I ran jabbaugh’s consumer (https://github.com/jabbaugh/kafka-storm-consumer) with KAFKA_DOMAIN=127.0.0.1, KAFKA_PORT=9092, and KAFKA_TOPIC=topictest. Then I ran:
 storm jar target/storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.example.trident.ExampleTopology ExampleTopology

Then I got an error similar to the one in my last post (not able to connect to ZooKeeper):

3593 [Thread-9] INFO  backtype.storm.daemon.worker - Worker ca4d20b6-9015-4400-bac3-e219975c310f for storm ExampleTrident-1-1407272043 on 0e61159b-6701-400a-9d73-d21c84102e37:4 has finished loading
3614 [Thread-26-$mastercoord-bg0] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
3626 [Thread-26-$mastercoord-bg0] INFO  backtype.storm.daemon.executor - Opened spout $mastercoord-bg0:(1)
3629 [Thread-26-$mastercoord-bg0] INFO  backtype.storm.daemon.executor - Activating spout $mastercoord-bg0:(1)
3660 [Thread-22-spout0] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
	at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33) ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
	at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28) ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
	at storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80) ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
	at storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60) ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
	at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403) ~[storm-core-0.9.0.1.jar:na]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
	at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
3660 [Thread-22-spout0] ERROR backtype.storm.daemon.executor - 
java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
	at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33) ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
	at storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28) ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
	at storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80) ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
	at storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60) ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
	at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65) ~[storm-core-0.9.0.1.jar:na]
	at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730) ~[storm-core-0.9.0.1.jar:na]
	at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403) ~[storm-core-0.9.0.1.jar:na]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
	at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
3719 [Thread-22-spout0] INFO  backtype.storm.util - Halting process: ("Worker died")
root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-consumer# 

Can anyone point out what the problem is? Thanks.

Alec

Re: can't consumer kafka message

Posted by Sa Li <sa...@gmail.com>.
Hello, Marcelo

I am now able to print the messages coming from the producer. What I did
was make the versions of the various components consistent. First I updated
Storm to 0.9.2-incubating; checking the jars in ./storm/lib shows that its
ZooKeeper version is 3.4.5. I also upgraded Kafka to 0.8.1.1, compiled for
Scala 2.9.2. In the pom, I exclude ZooKeeper from the Kafka dependency, as
follows:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <scope>compile</scope>
    <!-- exclude the zookeeper package from Kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
        </exclusion>
    </exclusions>
</dependency>

For storm-kafka, I use two packages:
<dependency>
    <artifactId>storm-kafka</artifactId>
    <groupId>org.apache.storm</groupId>
    <version>0.9.2-incubating</version>
    <!--
    <scope>compile</scope>
    -->
    <!-- exclude the zookeeper package from storm-Kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.0-wip16a-scala292</version>
    <!-- exclude the zookeeper package from storm-Kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>


From my experience, I think anyone starting out with storm-kafka really
needs to check for version conflicts among the different components (Storm,
Kafka, ZooKeeper, Scala).
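
One way to check for such conflicts is to ask the JVM which jar a class was
actually loaded from and which constructors that copy offers. The following
is only a minimal sketch, not part of the consumer project: the class name
ClasspathProbe and its two helper methods are made up for illustration, and
the default class name is simply the one from the stack trace in the
original post.

```java
import java.lang.reflect.Constructor;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper (not from the thread's project).
public class ClasspathProbe {

    // Returns the jar or directory a class was loaded from. Classes loaded
    // by the bootstrap loader usually report no code source at all.
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return src.getLocation().toString();
    }

    // Lists the public constructors the loaded copy of the class exposes.
    static List<String> constructorsOf(Class<?> cls) {
        List<String> out = new ArrayList<String>();
        for (Constructor<?> c : cls.getConstructors()) {
            out.add(c.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        // Default taken from the NoSuchMethodError in the original post.
        String name = args.length > 0
                ? args[0]
                : "kafka.javaapi.consumer.SimpleConsumer";
        try {
            Class<?> cls = Class.forName(name);
            System.out.println("Loaded from: " + locationOf(cls));
            for (String ctor : constructorsOf(cls)) {
                System.out.println("  " + ctor);
            }
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Run it with the same jar-with-dependencies on the classpath that is
submitted to Storm. If the constructor named in a NoSuchMethodError is not
in the printed list, the jar bundles a different library version than the
calling code was compiled against.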


Thanks

Alec


On Tue, Aug 12, 2014 at 2:01 AM, Marcelo Valle <mv...@redoop.org> wrote:

> Hello Alec,
>
> What version of Kafka are you using? 0.7.2?
> And which Storm version? 0.8.1?
>
> The consumer you are using was written for Kafka 0.7.2 and Storm 0.8.1.
>
> If you are using Storm 0.9.1 (or 0.9.2) and Kafka 0.8.0 (or 0.8.1.1) you
> can try this example:
> https://github.com/mvalleavila/Storm-0.9.1-Kafka-0.8-Test
>
> I need to change some versions in its pom.xml, but it works correctly with
> current versions.
>
> If you want to use the native storm-kafka connector included in Storm 0.9.2,
> you can try this patch to avoid ClassNotFound errors:
>
>
> https://github.com/buildoop/buildoop/blob/development/recipes/storm/storm-0.9.2_openbus-0.0.1-r1/rpm/sources/storm-kafka-dependencies.patch
>
> Regards
>
> Marcelo
>

Re: can't consumer kafka message

Posted by Marcelo Valle <mv...@redoop.org>.
Hello Alec,

What version of Kafka are you using? 0.7.2?
And which Storm version? 0.8.1?

The consumer you are using was written for Kafka 0.7.2 and Storm 0.8.1.
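
For readers of the trace in the original post: the NoSuchMethodError names
the missing constructor by its JVM descriptor, (Ljava/lang/String;III)V,
that is, a SimpleConsumer constructor taking a String and three ints. The
spout code calls that constructor, but the Kafka classes found at run time
do not provide it, which would be consistent with a version mismatch like
the one described here. The small decoder below is only an illustrative
sketch for turning such descriptors into readable form; the class
DescriptorDecoder and its methods are hypothetical names, not part of Kafka
or Storm.

```java
// Hypothetical helper: decodes a JVM method descriptor into readable text.
public class DescriptorDecoder {
    private final String d; // the descriptor being read
    private int pos;        // index of the next unread character

    private DescriptorDecoder(String d) {
        this.d = d;
    }

    // Reads one field type starting at pos and advances past it.
    private String nextType() {
        if (pos >= d.length()) {
            throw new IllegalArgumentException("truncated descriptor: " + d);
        }
        char c = d.charAt(pos++);
        switch (c) {
            case 'B': return "byte";
            case 'C': return "char";
            case 'D': return "double";
            case 'F': return "float";
            case 'I': return "int";
            case 'J': return "long";
            case 'S': return "short";
            case 'Z': return "boolean";
            case 'V': return "void";
            case '[': return nextType() + "[]"; // array of the following type
            case 'L': {                         // class name up to ';'
                int end = d.indexOf(';', pos);
                if (end < 0) {
                    throw new IllegalArgumentException("unterminated class name: " + d);
                }
                String name = d.substring(pos, end).replace('/', '.');
                pos = end + 1;
                return name;
            }
            default:
                throw new IllegalArgumentException("unexpected '" + c + "' in " + d);
        }
    }

    // Turns "(Ljava/lang/String;III)V" into "(java.lang.String, int, int, int) -> void".
    static String decode(String descriptor) {
        if (descriptor == null || !descriptor.startsWith("(")) {
            throw new IllegalArgumentException("not a method descriptor: " + descriptor);
        }
        DescriptorDecoder p = new DescriptorDecoder(descriptor);
        p.pos = 1; // skip '('
        StringBuilder out = new StringBuilder("(");
        while (p.pos < descriptor.length() && descriptor.charAt(p.pos) != ')') {
            if (out.length() > 1) {
                out.append(", ");
            }
            out.append(p.nextType());
        }
        if (p.pos >= descriptor.length() - 1) {
            throw new IllegalArgumentException("missing ')' or return type: " + descriptor);
        }
        p.pos++; // skip ')'
        out.append(") -> ").append(p.nextType());
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(decode("(Ljava/lang/String;III)V"));
    }
}
```

In the error message, <init> stands for a constructor and the trailing V
(void) is the return type every constructor descriptor carries.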

If you are using Storm 0.9.1 (or 0.9.2) and Kafka 0.8.0 (or 0.8.1.1) you
can try this example:
https://github.com/mvalleavila/Storm-0.9.1-Kafka-0.8-Test

I need to change some versions in its pom.xml, but it works correctly with
current versions.

If you want to use the native storm-kafka connector included in Storm 0.9.2,
you can try this patch to avoid ClassNotFound errors:

https://github.com/buildoop/buildoop/blob/development/recipes/storm/storm-0.9.2_openbus-0.0.1-r1/rpm/sources/storm-kafka-dependencies.patch

Regards

Marcelo



