Posted to user@storm.apache.org by Dave Katten <dk...@geoforce.com> on 2014/05/09 17:42:55 UTC

Storm/Kafka/Trident Topology Dies in local and cluster modes

Hi All,
I'm having trouble getting my Trident topology to process reliably when
using the OpaqueTridentKafkaSpout.

Local Mode
In local mode, I see behavior similar to [1]: the topology runs through
1-8 batches, then slows to a stop, and finally logs messages starting
with:

633053 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] WARN  org.apache.zookeeper.server.NIOServerCnxn - EndOfStreamException: Unable to read additional data from client sessionid 0x145dcad612b001b, likely client has closed socket

then:

641356 [Thread-38-spout0-EventThread] INFO  com.netflix.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
641358 [ConnectionStateManager-0] WARN  com.netflix.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.

And lastly:

675253 [main-EventThread] WARN  backtype.storm.cluster - Received event :disconnected::none: with disconnected Zookeeper.
677965 [main-EventThread] INFO  com.netflix.curator.framework.state.ConnectionStateManager - State change: LOST
677965 [main-EventThread] WARN  backtype.storm.cluster - Received event :expired::none: with disconnected Zookeeper.
684088 [SyncThread:0] ERROR org.apache.zookeeper.server.NIOServerCnxn - Unexpected Exception:
java.nio.channels.CancelledKeyException: null
     at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) ~[na:1.7.0_07]
     at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77) ~[na:1.7.0_07]
     at org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:418) [zookeeper-3.3.3.jar:3.3.3-1073969]
     at org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1509) [zookeeper-3.3.3.jar:3.3.3-1073969]
     at org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:171) [zookeeper-3.3.3.jar:3.3.3-1073969]
     at org.apache.zookeeper.server.SyncRequestProcessor.run(SyncRequestProcessor.java:135) [zookeeper-3.3.3.jar:3.3.3-1073969]

918289 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] WARN  org.apache.zookeeper.server.NIOServerCnxn - EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket

(I can put more in a gist if necessary)
============================

It appears that the spout loses its connection to ZooKeeper, which it
uses to maintain the transactional state, but I'm confused about how
that can happen in local mode, where Storm uses an in-process ZooKeeper.

I have tried connecting to both Kafka 0.7 and 0.8 instances (using
storm-kafka and the Wurstmeister jars, respectively), and both end the
same way. I've also replaced the Kafka spout with a FixedBatchSpout,
and that runs just fine, which is why I think the problem has something
to do with my Kafka spout configuration.
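One difference between the two spouts that may be worth keeping in mind (an observation, not a diagnosis): storm.trident.testing.FixedBatchSpout takes a maxBatchSize constructor argument, so its batch size is a tuple count. The sketch below shows that count-based slicing in isolation; CountBatcher and toBatches are hypothetical names, not Storm classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Storm class): slices messages into batches of at
// most maxBatchSize items, i.e. a batch size expressed as a tuple count.
class CountBatcher {
    static <T> List<List<T>> toBatches(List<T> messages, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < messages.size(); start += maxBatchSize) {
            int end = Math.min(start + maxBatchSize, messages.size());
            // Copy the slice so each batch does not alias the source list.
            batches.add(new ArrayList<>(messages.subList(start, end)));
        }
        return batches;
    }
}
```

With 250 messages and a maxBatchSize of 100, this yields batches of 100, 100 and 50, so no single batch can grow beyond the configured count.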

As for my Kafka spout config, it's written in JRuby, but it's a very
basic config with a single static broker host. The only non-standard
piece is a custom Scheme (written in Java) that parses the Kafka message
(which is JSON) and exposes its fields as named Values in the tuple, so
that I can do a groupBy immediately after a tuple comes off the spout.
Using the same Scheme, though, the FixedBatchSpout works fine.
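In Storm 0.9.x a Scheme implements List<Object> deserialize(byte[]) and Fields getOutputFields(), and tuple values are matched to field names by position, so the values must come out in exactly the declared field order for groupBy to key on the right value. The sketch below isolates that ordering step; it assumes the JSON has already been parsed into a Map, and JsonFieldExtractor and the field names are hypothetical placeholders, not the actual scheme.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the output step of a JSON Scheme. A real Scheme
// would return new Fields(FIELD_NAMES) from getOutputFields() and call
// toValues(...) from deserialize(byte[]) after parsing the JSON.
class JsonFieldExtractor {
    // Placeholder field names; declared once so the order is fixed.
    static final List<String> FIELD_NAMES = Collections.unmodifiableList(
            Arrays.asList("field1", "field2", "field3", "field4"));

    // Returns values in exactly the declared field order; missing keys become null.
    static List<Object> toValues(Map<String, Object> parsedJson) {
        List<Object> values = new ArrayList<>(FIELD_NAMES.size());
        for (String name : FIELD_NAMES) {
            values.add(parsedJson.get(name));
        }
        return values;
    }
}
```

Iterating over the declared names, rather than over the parsed map's entries, keeps the order stable even though a HashMap-backed JSON object has no defined key order.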

Cluster Mode

In cluster mode, I don't get the same crashes, but the log does show the
worker being relaunched and reconnecting to ZooKeeper. The following
occurs about 10 minutes into the deployment:

2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:host.name=stormsingle1
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:java.version=1.7.0_55
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:java.vendor=Oracle Corporation
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.55.x86_64/jre
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:java.class.path=/opt/storm/lib/junit-3.8.1.jar:/opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/carbonite-1.3.2.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/clojure-1.4.0.jar:/opt/storm/lib/kryo-2.17.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/commons-logging-1.1.1.jar:/opt/storm/lib/clj-stacktrace-0.2.4.jar:/opt/storm/lib/httpcore-4.1.jar:/opt/storm/lib/hiccup-0.3.6.jar:/opt/storm/lib/netty-3.6.3.Final.jar:/opt/storm/lib/jgrapht-core-0.9.0.jar:/opt/storm/lib/commons-exec-1.1.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/ring-servlet-0.3.11.jar:/opt/storm/lib/clj-time-0.4.1.jar:/opt/storm/lib/objenesis-1.2.jar:/opt/storm/lib/jline-2.11.jar:/opt/storm/lib/meat-locker-0.3.1.jar:/opt/storm/lib/logback-classic-1.0.6.jar:/opt/storm/lib/json-simple-1.1.jar:/opt/storm/lib/curator-framework-1.0.1.jar:/opt/storm/lib/commons-io-1.4.jar:/opt/storm/lib/curator-client-1.0.1.jar:/opt/storm/lib/guava-13.0.jar:/opt/storm/lib/storm-core-0.9.1-incubating.jar:/opt/storm/lib/commons-fileupload-1.2.1.jar:/opt/storm/lib/logback-core-1.0.6.jar:/opt/storm/lib/math.numeric-tower-0.0.1.jar:/opt/storm/lib/snakeyaml-1.11.jar:/opt/storm/lib/tools.cli-0.2.2.jar:/opt/storm/lib/core.incubator-0.1.0.jar:/opt/storm/lib/zookeeper-3.3.3.jar:/opt/storm/lib/httpclient-4.1.1.jar:/opt/storm/lib/clout-1.0.1.jar:/opt/storm/lib/jetty-6.1.26.jar:/opt/storm/lib/ring-devel-0.3.11.jar:/opt/storm/lib/asm-4.0.jar:/opt/storm/lib/ring-jetty-adapter-0.3.11.jar:/opt/storm/lib/commons-codec-1.4.jar:/opt/storm/lib/ring-core-1.1.5.jar:/opt/storm/lib/joda-time-2.0.jar:/opt/storm/lib/commons-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:/opt/storm/conf:/app/storm/supervisor/stormdist/gf_topology-2-1399643628/stormjar.jar
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:java.io.tmpdir=/tmp
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:java.compiler=<NA>
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:os.name=Linux
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:os.arch=amd64
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:os.version=2.6.32-431.el6.x86_64
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:user.name=storm
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client 
environment:user.home=/home/storm
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:user.dir=/
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:host.name=stormsingle1
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:java.version=1.7.0_55
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:java.vendor=Oracle Corporation
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.55.x86_64/jre
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:java.class.path=/opt/storm/lib/junit-3.8.1.jar:/opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/carbonite-1.3.2.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/clojure-1.4.0.jar:/opt/storm/lib/kryo-2.17.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/commons-logging-1.1.1.jar:/opt/storm/lib/clj-stacktrace-0.2.4.jar:/opt/storm/lib/httpcore-4.1.jar:/opt/storm/lib/hiccup-0.3.6.jar:/opt/storm/lib/netty-3.6.3.Final.jar:/opt/storm/lib/jgrapht-core-0.9.0.jar:/opt/storm/lib/commons-exec-1.1.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/ring-servlet-0.3.11.jar:/opt/storm/lib/clj-time-0.4.1.jar:/opt/storm/lib/objenesis-1.2.jar:/opt/storm/lib/jline-2.11.jar:/opt/storm/lib/meat-locker-0.3.1.jar:/opt/storm/lib/logback-classic-1.0.6.jar:/opt/storm/lib/json-simple-1.1.jar:/opt/storm/lib/curator-framework-1.0.1.jar:/opt/storm/lib/commons-io-1.4.jar:/opt/storm/lib/curator-client-1.0.1.jar:/opt/storm/lib/guava-13.0.jar:/opt/storm/lib/storm-core-0.9.1-incubating.jar:/opt/storm/lib/commons-fileupload-1.2.1.jar:/opt/storm/lib/logback-core-1.0.6.jar:/opt/storm/lib/math.numeric-tower-0.0.1.jar:/opt/storm/lib/snakeyaml-1.11.jar:/opt/storm/lib/tools.cli-0.2.2.jar:/opt/storm/lib/core.incubator-0.1.0.jar:/opt/storm/lib/zookeeper-3.3.3.jar:/opt/storm/lib/httpclient-4.1.1.jar:/opt/storm/lib/clout-1.0.1.jar:/opt/storm/lib/jetty-6.1.26.jar:/opt/storm/lib/ring-devel-0.3.11.jar:/opt/storm/lib/asm-4.0.jar:/opt/storm/lib/ring-jetty-adapter-0.3.11.jar:/opt/storm/lib/commons-codec-1.4.jar:/opt/storm/lib/ring-core-1.1.5.jar:/opt/storm/lib/joda-time-2.0.jar:/opt/storm/lib/commons-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:/opt/storm/conf:/app/storm/supervisor/stormdist/gf_topology-2-1399643628/stormjar.jar
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:java.io.tmpdir=/tmp
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:java.compiler=<NA>
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:os.name=Linux
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:os.arch=amd64
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:os.version=2.6.32-431.el6.x86_64
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:user.name=storm
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:user.home=/home/storm
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server 
environment:user.dir=/
2014-05-09 14:15:53 b.s.d.worker [INFO] Launching worker for 
gf_topology-2-1399643628 on fca61061-ffad-41a2-8f93-8c87cbbe358b:6701 
with id db592f15-cb7d-4fcd-8dff-9e2b4df3e973 and conf 
{"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
"topology.tick.tuple.freq.secs" nil, 
"topology.builtin.metrics.bucket.size.secs" 60, 
"topology.fall.back.on.java.serialization" true, 
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, 
"topology.skip.missing.kryo.registrations" false, 
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts" 
"-Xmx256m -Djava.net.preferIPv4Stack=true", 
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
"topology.trident.batch.emit.interval.millis" 500, 
"nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m 
-Djava.net.preferIPv4Stack=true", "java.library.path" 
"/usr/local/lib:/opt/local/lib:/usr/lib", 
"topology.executor.send.buffer.size" 1024, "storm.local.dir" 
"/app/storm", "storm.messaging.netty.buffer_size" 5242880, 
"supervisor.worker.start.timeout.secs" 120, 
"topology.enable.message.timeouts" true, 
"nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 
3600, "drpc.worker.threads" 64, 
"topology.worker.shared.thread.pool.size" 4, "nimbus.host" 
"stormsingle1", "storm.messaging.netty.min_wait_ms" 100, 
"storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, 
"topology.executor.receive.buffer.size" 1024, 
"transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", 
"storm.zookeeper.retry.intervalceiling.millis" 30000, 
"supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 
1, "storm.zookeeper.servers" ["stormsingle1"], 
"transactional.zookeeper.root" "/transactional", 
"topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, 
"topology.worker.childopts" nil, "drpc.queue.size" 128, 
"worker.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true", 
"supervisor.heartbeat.frequency.secs" 5, 
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 
3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" 
"-Xmx768m", "topology.receiver.buffer.size" 8, 
"task.heartbeat.frequency.secs" 3, "topology.tasks" nil, 
"storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" 
"backtype.storm.spout.SleepSpoutWaitStrategy", 
"nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" 
nil, "storm.zookeeper.retry.interval" 1000, 
"topology.sleep.spout.wait.strategy.time.ms" 1, 
"nimbus.topology.validator" 
"backtype.storm.nimbus.DefaultTopologyValidator", 
"supervisor.slots.ports" [6700 6701], "topology.debug" false, 
"nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, 
"topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, 
"topology.workers" 1, "supervisor.childopts" "-Xmx256m 
-Djava.net.preferIPv4Stack=true", "nimbus.thrift.port" 6627, 
"topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, 
"topology.tuple.serializer" 
"backtype.storm.serialization.types.ListDelegateSerializer", 
"topology.disruptor.wait.strategy" 
"com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 
30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
"backtype.storm.serialization.DefaultKryoFactory", 
"drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
"storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
"backtype.storm.security.auth.SimpleTransportPlugin", 
"topology.state.synchronization.timeout.secs" 60, 
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 
600, "storm.messaging.transport" 
"backtype.storm.messaging.netty.Context", "logviewer.appender.name" 
"A1", "storm.messaging.netty.max_wait_ms" 1000, 
"drpc.request.timeout.secs" 600, "storm.local.hostname" "stormsingle1", 
"storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" 
"-Xmx256m -Djava.net.preferIPv4Stack=true", "storm.cluster.mode" 
"distributed", "topology.optimize" true, "topology.max.task.parallelism" 
nil}
2014-05-09 14:15:53 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-05-09 14:15:53 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=stormsingle1:2181 sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@418d276b
2014-05-09 14:15:53 o.a.z.ClientCnxn [INFO] Opening socket connection to server stormsingle1/127.0.0.1:2181
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Socket connection established to stormsingle1/127.0.0.1:2181, initiating session
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Session establishment complete on server stormsingle1/127.0.0.1:2181, sessionid = 0x145dd2a3780004c, negotiated timeout = 20000
2014-05-09 14:15:54 b.s.zookeeper [INFO] Zookeeper state update: :connected:none
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] EventThread shut down
2014-05-09 14:15:54 o.a.z.ZooKeeper [INFO] Session: 0x145dd2a3780004c closed
2014-05-09 14:15:54 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-05-09 14:15:54 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=stormsingle1:2181/storm sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@36189ddf
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Opening socket connection to server stormsingle1/127.0.0.1:2181
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Socket connection established to stormsingle1/127.0.0.1:2181, initiating session
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Session establishment complete on server stormsingle1/127.0.0.1:2181, sessionid = 0x145dd2a3780004d, negotiated timeout = 20000

===================

I'm running only 2 workers, and the other one keeps replaying the same
batch over and over. For example, here's a debug line:

2014-05-09 15:23:26 b.s.d.task [INFO] Emitting: spout0 s1 [2:67, **FIELD1**,**FIELD2**,**FIELD3**,**FIELD4**]

I assume this means the 67th attempt at batch 2 (i.e., txid 2).
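If that reading is right, the first value is a Trident transaction attempt printed as txid:attemptId (my understanding of how storm.trident.topology.TransactionAttempt renders itself; worth confirming against the Storm source). A small hypothetical parser like the one below makes it easy to scan worker logs and count how many attempts each txid has gone through; AttemptLabel is an illustrative name, not a Storm class.

```java
// Hypothetical parser for the "txid:attempt" prefix seen in the debug line.
// Assumes the label has exactly two numeric parts separated by a colon.
class AttemptLabel {
    final long txid;
    final int attemptId;

    AttemptLabel(long txid, int attemptId) {
        this.txid = txid;
        this.attemptId = attemptId;
    }

    static AttemptLabel parse(String label) {
        String[] parts = label.trim().split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected txid:attempt, got " + label);
        }
        return new AttemptLabel(Long.parseLong(parts[0]), Integer.parseInt(parts[1]));
    }
}
```

A steadily growing attempt number for the same txid in successive log lines is what a replay loop would look like.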

Questions

 1. Why does my connection to ZooKeeper seem so tenuous? I've tried
    in-process ZooKeeper (in local mode), a single-node cluster VM
    (where the supervisors, workers, Nimbus, and ZK all live on the
    same VM), and multi-node VMs (where Nimbus and the supervisors are
    on separate VMs on the same host), both with ZK on its own VM and
    with a remote ZK cluster. I have made sure that the /etc/hosts
    entries all point to the right places, since I know that can be
    problematic.
 2. I strongly suspect that my batches take too long to complete, so
    the spout replays the same ginormous batch over and over again. How
    can I tell the Kafka spout to take, say, 100 messages per batch? Or
    am I misunderstanding how batching works in Storm?
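As far as I can tell, the Kafka spouts have no messages-per-batch setting. For the opaque Trident spout, each batch is one fetch per partition, bounded by the spout config's fetchSizeBytes field (1024 * 1024 bytes by default, if I recall storm-kafka's KafkaConfig correctly; check the KafkaConfig class in the jar you actually use). The knob is therefore bytes, and a message count has to be estimated from the average message size. A back-of-the-envelope sketch, with FetchSizeEstimate and its methods as hypothetical names:

```java
// Hypothetical estimator: relates a per-partition fetch size in bytes to an
// approximate message count. Average and maximum message sizes are inputs
// you have to measure; Kafka's per-message framing overhead is ignored, so
// real counts will be somewhat lower.
class FetchSizeEstimate {
    // Assumed default of the spout config's fetch size (1 MiB).
    static final int DEFAULT_FETCH_SIZE_BYTES = 1024 * 1024;

    // Approximate messages per partition per batch.
    static long messagesPerBatch(long fetchSizeBytes, long avgMessageBytes) {
        if (avgMessageBytes <= 0) {
            throw new IllegalArgumentException("avgMessageBytes must be positive");
        }
        return fetchSizeBytes / avgMessageBytes;
    }

    // Fetch size for roughly targetMessages per partition, but never below the
    // largest message: a fetch smaller than one message cannot make progress.
    static long fetchSizeFor(long targetMessages, long avgMessageBytes, long maxMessageBytes) {
        return Math.max(targetMessages * avgMessageBytes, maxMessageBytes);
    }
}
```

For example, with 500-byte messages the default 1 MiB fetch holds about 2097 messages per partition, while a target of 100 messages suggests a fetch size near 50,000 bytes, as long as no single message is larger than that.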

Thanks all.
Dave K


[1] https://groups.google.com/d/msg/storm-user/9qTq-6P9bys/xuCG3pz0yPUJ