Posted to user@storm.apache.org by Dominik Safaric <do...@gmail.com> on 2016/09/16 19:40:00 UTC

KafkaSpout fails on open()

Hi,

For the past two days, I’ve been trying to implement a KafkaSpout within our topology. Here 
is some relevant information.

All three services (Kafka, Zookeeper, and Storm) are running on the same instance. The Kafka 
broker uses the default port 9092, with advertised.listeners set to PLAINTEXT://localhost:9092. 
Zookeeper uses the default client port 2181, and the Storm Nimbus host name has been set to 
localhost as well.
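
To double-check that both ports are actually reachable from this machine, a minimal sketch 
along the following lines could be used. The class name PortCheck, the isReachable helper, and 
the one-second timeout are illustrative assumptions and not part of the topology.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the setup described above.
        System.out.println("Kafka broker (9092): " + isReachable("localhost", 9092, 1000));
        System.out.println("Zookeeper (2181): " + isReachable("localhost", 2181, 1000));
    }
}
```

A successful connection only shows that something is listening on the port, not that the 
service behind it is configured correctly.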

A custom Kafka producer publishes log messages successfully, and by inspecting the /brokers 
path with the zkCli Zookeeper script I’ve verified that the partitions and other relevant 
information are stored correctly.

However, I keep getting an error when activating and then monitoring the topology. 
Here is the source code of the Storm topology I’ve implemented:

// Zookeeper ensemble from which the spout discovers the Kafka brokers
BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");

// Arguments: broker hosts, topic ("bytes"), Zookeeper root for offsets, consumer id
SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/", "bytes" + UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// Zookeeper used by the spout to store its consumer offsets
spoutConfig.zkServers = Arrays.asList("127.0.0.1");
spoutConfig.zkPort = 2181;

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

// Wire the spout (id "bytes") to the processing bolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bytes", kafkaSpout);
builder.setBolt("byteSize", new KafkaByteProcessingBolt()).shuffleGrouping("bytes");

StormTopology topology = builder.createTopology();

Config config = new Config();

// Submit to the cluster under the name "topology"
StormSubmitter.submitTopology("topology", config, topology);

The error message I keep getting when executing bin/storm monitor <topology_name> 
-m bytes is the following:

Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.monitor.main(Unknown Source)

By inspecting the worker logs (the worker.log file), I’ve concluded that 
the KafkaSpout fails in the open() method:

java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101]
... 5 more
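
Since the trace ends in a ClassNotFoundException for org.apache.curator.RetryPolicy, one way 
to narrow this down is to check whether that class can be loaded at all from a given classpath. 
The following is only a minimal diagnostic sketch: the class name ClasspathCheck and its 
isLoadable helper are hypothetical and not part of Storm or Curator.

```java
// Hypothetical diagnostic helper, not part of Storm or Curator.
class ClasspathCheck {

    // Returns true if the named class can be found by this class's class loader.
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the worker.log stack trace.
        String name = args.length > 0 ? args[0] : "org.apache.curator.RetryPolicy";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

Running it with the submitted topology jar on the classpath would show whether the Curator 
class is packaged in that jar.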

Could someone explain why the KafkaSpout might fail in the open() method? 

I would really appreciate your help!

Thanks in advance,
Dominik