Posted to user@storm.apache.org by Dominik Safaric <do...@gmail.com> on 2016/09/16 19:40:00 UTC
KafkaSpout fails on open()
Hi,
For the past two days, I’ve been trying to implement a KafkaSpout within our topology. Here
is some relevant information.
All three services are running on the same instance. The Kafka broker uses the default port
9092, with advertised.listeners set to PLAINTEXT://localhost:9092. Zookeeper uses the default
client port 2181, and the Storm Nimbus host name has been set to localhost as well.
A custom Kafka producer publishes log messages successfully, and using the zkCli
Zookeeper script I’ve verified that the partitions and other relevant information are
stored correctly under the /brokers path.
However, I keep getting an error when activating and afterwards monitoring the topology.
Here is the source code of the Storm topology I’ve implemented:
BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/", "bytes" + UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkServers = Arrays.asList("127.0.0.1");
spoutConfig.zkPort = 2181;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bytes", kafkaSpout);
builder.setBolt("byteSize", new KafkaByteProcessingBolt()).shuffleGrouping("bytes");
StormTopology topology = builder.createTopology();
Config config = new Config();
StormSubmitter.submitTopology("topology", config, topology);
The error message I keep getting when executing bin/storm monitor <topology_name> -m bytes
is the following:
Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.monitor.main(Unknown Source)
Inspecting the worker logs (the worker.log file), I’ve found that the KafkaSpout fails
in the open() method:
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101]
... 5 more
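The ClassNotFoundException suggests that the Curator classes are not on the worker's classpath at runtime. Below is a minimal sketch of how this could be checked, assuming it is run with the same classpath as the topology jar. The ClasspathCheck class and its isLoadable helper are hypothetical names used for illustration; they are not part of Storm or Curator.

```java
// Hypothetical diagnostic helper: reports whether a class can be loaded
// from the classpath this program was started with.
class ClasspathCheck {

    // Returns true if the named class is visible to this class's loader.
    // The class is looked up without being initialized.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace above.
        String name = args.length > 0 ? args[0] : "org.apache.curator.RetryPolicy";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

If this reports false when run against the topology jar, the jar probably does not bundle the Curator dependency that storm-kafka needs.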
Could someone explain why the KafkaSpout might be failing in the open() method?
I would really appreciate your help!
Thanks in advance,
Dominik