Posted to dev@storm.apache.org by Santhosh Kumar Sagi <sa...@gmail.com> on 2017/02/01 12:28:38 UTC

Storm 1.0.1 - No integration with Kafka 0.10.1.1, pointing to Kafka 0.9.x

Hi team,

I am developing a sample project for Storm-Kafka integration in which I am
enabling the SSL and ACL features of Kafka, using Kafka 0.10.x.

I have used the Storm source code for storm-kafka-client to try out the new
KafkaSpout, which supports streaming data from Kafka. However, I am running
into multiple issues while testing it.


1. I get the exception below when I try to run a Storm topology with Storm
1.0.1 and Kafka client 0.10.1.1:

25959 [Thread-23-kafka_spout-executor[4 4]] INFO  o.a.k.c.u.AppInfoParser - Kafka version : 0.9.0.1
25959 [Thread-23-kafka_spout-executor[4 4]] INFO  o.a.k.c.u.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
25974 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
    at org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
25994 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.d.executor -
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
    at org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.subscribe(AutomaticKafkaRecordsFetcher.java:51) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.kafka.spout.internal.fetcher.AutomaticKafkaRecordsFetcher.<init>(AutomaticKafkaRecordsFetcher.java:45) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers.create(KafkaRecordsFetchers.java:51) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.subscribeKafkaConsumer(KafkaSpout.java:415) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.activate(KafkaSpout.java:407) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:640) ~[storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
26028 [Thread-23-kafka_spout-executor[4 4]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at clojure.lang.AFn.run(AFn.java:22) [storm-kafka-client-2.0.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]


It seems that storm-kafka-client 1.0.2 (the latest release) depends on Kafka
0.9.0.1. Even when I override the dependency to use 0.10.1.1, running the
topology while sending data through a Kafka producer throws the above
exception.


Please suggest a workaround if one is available. I see that the latest
version of Storm, 1.1.0, depends on Kafka 0.10, but it is NOT available in
Git, so I am unable to test the changes further.

Let me know if you need any further information from my side to resolve
this; I can send it as attachments. Please find attached the pom.xml used
for the project.

I look forward to an early reply.

Thanks,
Santhosh

Re: Storm 1.0.1 - No integration with Kafka 0.10.1.1, pointing to Kafka 0.9.x

Posted by Stig Rohde Døssing <st...@gmail.com>.
Hi Santhosh,

I don't think you need to rewrite the storm-kafka-client pom to switch
Kafka versions. I've attached a minimal pom that should work for what you
want to do. You'll need to use storm-kafka-client 1.1.0, since 1.0.2 isn't
compatible with Kafka 0.10.

Note that the pom bundles kafka-client in the resulting jar. If you want to
generate a smaller jar, you can declare the client as "provided" scope, and
copy the Kafka library into your cluster's /extlib. See
https://github.com/apache/storm/blob/1.x-branch/examples/storm-kafka-client-examples/pom.xml
for a pom that does it that way.
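
To confirm which Kafka client actually ends up on the topology classpath,
a small reflection check along these lines may help. It is only a sketch:
the class name KafkaClientCheck and its helper methods are made up for
illustration and are not part of Storm or Kafka. It relies on the fact that
the 0.9 consumer declares subscribe(List, ConsumerRebalanceListener) while
the 0.10 consumer declares subscribe(Collection, ConsumerRebalanceListener),
which matches the method descriptor in the NoSuchMethodError above.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.Collection;

// Hypothetical diagnostic helper, not part of Storm or Kafka.
public class KafkaClientCheck {

    // True if 'type' has a public method called 'methodName' whose first
    // parameter type is exactly 'firstParam'.
    static boolean hasMethodWithFirstParam(Class<?> type, String methodName, Class<?> firstParam) {
        for (Method m : type.getMethods()) {
            Class<?>[] params = m.getParameterTypes();
            if (m.getName().equals(methodName) && params.length > 0 && params[0] == firstParam) {
                return true;
            }
        }
        return false;
    }

    // Location (usually a jar) the class was loaded from, or "unknown".
    static String loadedFrom(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "unknown";
        }
        return source.getLocation().toString();
    }

    // Builds a one-line report for the given consumer class name.
    static String report(String className) {
        try {
            // Load without running static initializers; we only inspect signatures.
            Class<?> consumer = Class.forName(className, false, KafkaClientCheck.class.getClassLoader());
            boolean collectionSubscribe = hasMethodWithFirstParam(consumer, "subscribe", Collection.class);
            return className + " loaded from " + loadedFrom(consumer)
                    + "; subscribe(Collection, ...) present: " + collectionSubscribe;
        } catch (ClassNotFoundException e) {
            return className + " is not on the classpath";
        } catch (LinkageError e) {
            return className + " could not be inspected: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(report("org.apache.kafka.clients.consumer.KafkaConsumer"));
    }
}
```

Run it with the same classpath as the topology jar. If the reported
location is a jar-with-dependencies that bundles the 0.9.0.1 client, that
would explain why declaring 0.10.1.1 in your own pom has no effect.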
