Posted to user@storm.apache.org by Dave Katten <dk...@geoforce.com> on 2014/05/09 17:42:55 UTC
Storm/Kafka/Trident Topology Dies in local and cluster modes
Hi All,
I'm having trouble getting my Trident topology to process reliably when
using the OpaqueTridentKafkaSpout.
Local Mode
In local mode, I experience behavior similar to [1]: the topology runs
through 1-8 batches and then slows to a stop, ultimately logging
messages starting with:
633053 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] WARN
org.apache.zookeeper.server.NIOServerCnxn - EndOfStreamException: Unable
to read additional data from client sessionid 0x145dcad612b001b, likely
client has closed socket
then:
641356 [Thread-38-spout0-EventThread] INFO
com.netflix.curator.framework.state.ConnectionStateManager - State
change: SUSPENDED
641358 [ConnectionStateManager-0] WARN
com.netflix.curator.framework.state.ConnectionStateManager - There are
no ConnectionStateListeners registered.
And lastly:
675253 [main-EventThread] WARN backtype.storm.cluster - Received event
:disconnected::none: with disconnected Zookeeper.
677965 [main-EventThread] INFO
com.netflix.curator.framework.state.ConnectionStateManager - State
change: LOST
677965 [main-EventThread] WARN backtype.storm.cluster - Received event
:expired::none: with disconnected Zookeeper.
684088 [SyncThread:0] ERROR org.apache.zookeeper.server.NIOServerCnxn -
Unexpected Exception:
java.nio.channels.CancelledKeyException: null
at
sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
~[na:1.7.0_07]
at
sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
~[na:1.7.0_07]
at
org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:418)
[zookeeper-3.3.3.jar:3.3.3-1073969]
at
org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1509)
[zookeeper-3.3.3.jar:3.3.3-1073969]
at
org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:171)
[zookeeper-3.3.3.jar:3.3.3-1073969]
at
org.apache.zookeeper.server.SyncRequestProcessor.run(SyncRequestProcessor.java:135)
[zookeeper-3.3.3.jar:3.3.3-1073969]
918289 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] WARN
org.apache.zookeeper.server.NIOServerCnxn - EndOfStreamException: Unable
to read additional data from client sessionid 0x0, likely client has
closed socket
(I can put more in a gist if necessary)
============================
It appears that the spout loses its connection to the ZooKeeper used for
maintaining the transactional state, but I'm confused as to how that can
happen when, in local mode, Storm uses the in-process ZooKeeper.
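If it's relevant: the one knob I can think of here is raising the
ZooKeeper timeouts when submitting in local mode, on the guess that GC
pauses are outlasting the default session timeout. A sketch of what I
mean (values picked arbitrarily; the topology wiring is elided):

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import storm.trident.TridentTopology;

public class LocalRun {
    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();
        // ... spout and stream wiring elided (same as my real topology) ...

        Config conf = new Config();
        // Guess: GC pauses are outlasting the default ZK session timeout.
        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 60000);
        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 60000);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("gf_topology", conf, topology.build());
    }
}
```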
I have tried connecting to both Kafka 0.7 and 0.8 instances (using
storm-kafka and the Wurstmeister jars, respectively), and both end the
same way. Additionally, I've replaced the kafka spout with a
FixedBatchSpout, and that runs just fine (which is why I think this has
something to do with my kafka spout configuration).
As for my Kafka spout config, it's in JRuby, but it's a very basic
config with one static broker host. The only thing that's non-standard
is that I have a custom Scheme (written in Java) that parses the Kafka
message (which is JSON) and exposes the fields as named Values in the
tuple (so that I can do a groupBy immediately after a tuple comes off
the spout). But using the same Scheme, the FixedBatchSpout works fine.
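The Scheme is essentially this (simplified; the field names below are
placeholders for my real ones, and it uses json-simple, which is
already on the worker classpath shown in the logs further down):

```java
import java.util.List;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

// Simplified sketch of the custom Scheme; field names are placeholders.
public class MyJsonScheme implements Scheme {
    @Override
    public List<Object> deserialize(byte[] ser) {
        // Parse the raw Kafka message as JSON and emit its values as named fields.
        JSONObject obj = (JSONObject) JSONValue.parse(new String(ser));
        return new Values(obj.get("field1"), obj.get("field2"),
                          obj.get("field3"), obj.get("field4"));
    }

    @Override
    public Fields getOutputFields() {
        // These names line up with the groupBy done right after the spout.
        return new Fields("field1", "field2", "field3", "field4");
    }
}
```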
Cluster Mode
In cluster mode, I don't have the same crashes, but I do see in the log
where a worker reconnects to zookeeper and relaunches the worker. The
following occurs about 10 minutes into the deployment:
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:host.name=stormsingle1
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:java.version=1.7.0_55
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:java.vendor=Oracle Corporation
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.55.x86_64/jre
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:java.class.path=/opt/storm/lib/junit-3.8.1.jar:/opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/carbonite-1.3.2.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/clojure-1.4.0.jar:/opt/storm/lib/kryo-2.17.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/commons-logging-1.1.1.jar:/opt/storm/lib/clj-stacktrace-0.2.4.jar:/opt/storm/lib/httpcore-4.1.jar:/opt/storm/lib/hiccup-0.3.6.jar:/opt/storm/lib/netty-3.6.3.Final.jar:/opt/storm/lib/jgrapht-core-0.9.0.jar:/opt/storm/lib/commons-exec-1.1.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/ring-servlet-0.3.11.jar:/opt/storm/lib/clj-time-0.4.1.jar:/opt/storm/lib/objenesis-1.2.jar:/opt/storm/lib/jline-2.11.jar:/opt/storm/lib/meat-locker-0.3.1.jar:/opt/storm/lib/logback-classic-1.0.6.jar:/opt/storm/lib/json-simple-1.1.jar:/opt/storm/lib/curator-framework-1.0.1.jar:/opt/storm/lib/commons-io-1.4.jar:/opt/storm/lib/curator-client-1.0.1.jar:/opt/storm/lib/guava-13.0.jar:/opt/storm/lib/storm-core-0.9.1-incubating.jar:/opt/storm/lib/commons-fileupload-1.2.1.jar:/opt/storm/lib/logback-core-1.0.6.jar:/opt/storm/lib/math.numeric-tower-0.0.1.jar:/opt/storm/lib/snakeyaml-1.11.jar:/opt/storm/lib/tools.cli-0.2.2.jar:/opt/storm/lib/core.incubator-0.1.0.jar:/opt/storm/lib/zookeeper-3.3.3.jar:/opt/storm/lib/httpclient-4.1.1.jar:/opt/storm/lib/clout-1.0.1.jar:/opt/storm/lib/jetty-6.1.26.jar:/opt/storm/lib/ring-devel-0.3.11.jar:/opt/storm/lib/asm-4.0.jar:/opt/storm/lib/ring-jetty-adapter-0.3.11.jar:/opt/storm/lib/commons-codec-1.4.jar:/opt/storm/lib/ring-core-1.1.5.jar:/opt/storm/lib/joda-time-2.0.jar:/opt/storm/lib/commons-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:/opt/storm/conf:/app/storm/supervisor/stormdist/gf_topology-2-1399643628/stormjar.jar
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:java.io.tmpdir=/tmp
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:java.compiler=<NA>
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:os.name=Linux
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:os.arch=amd64
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:os.version=2.6.32-431.el6.x86_64
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:user.name=storm
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client
environment:user.home=/home/storm
2014-05-09 14:15:47 o.a.z.ZooKeeper [INFO] Client environment:user.dir=/
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:host.name=stormsingle1
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:java.version=1.7.0_55
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:java.vendor=Oracle Corporation
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.55.x86_64/jre
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:java.class.path=/opt/storm/lib/junit-3.8.1.jar:/opt/storm/lib/minlog-1.2.jar:/opt/storm/lib/tools.macro-0.1.0.jar:/opt/storm/lib/servlet-api-2.5-20081211.jar:/opt/storm/lib/carbonite-1.3.2.jar:/opt/storm/lib/compojure-1.1.3.jar:/opt/storm/lib/clojure-1.4.0.jar:/opt/storm/lib/kryo-2.17.jar:/opt/storm/lib/tools.logging-0.2.3.jar:/opt/storm/lib/slf4j-api-1.6.5.jar:/opt/storm/lib/log4j-over-slf4j-1.6.6.jar:/opt/storm/lib/commons-logging-1.1.1.jar:/opt/storm/lib/clj-stacktrace-0.2.4.jar:/opt/storm/lib/httpcore-4.1.jar:/opt/storm/lib/hiccup-0.3.6.jar:/opt/storm/lib/netty-3.6.3.Final.jar:/opt/storm/lib/jgrapht-core-0.9.0.jar:/opt/storm/lib/commons-exec-1.1.jar:/opt/storm/lib/servlet-api-2.5.jar:/opt/storm/lib/ring-servlet-0.3.11.jar:/opt/storm/lib/clj-time-0.4.1.jar:/opt/storm/lib/objenesis-1.2.jar:/opt/storm/lib/jline-2.11.jar:/opt/storm/lib/meat-locker-0.3.1.jar:/opt/storm/lib/logback-classic-1.0.6.jar:/opt/storm/lib/json-simple-1.1.jar:/opt/storm/lib/curator-framework-1.0.1.jar:/opt/storm/lib/commons-io-1.4.jar:/opt/storm/lib/curator-client-1.0.1.jar:/opt/storm/lib/guava-13.0.jar:/opt/storm/lib/storm-core-0.9.1-incubating.jar:/opt/storm/lib/commons-fileupload-1.2.1.jar:/opt/storm/lib/logback-core-1.0.6.jar:/opt/storm/lib/math.numeric-tower-0.0.1.jar:/opt/storm/lib/snakeyaml-1.11.jar:/opt/storm/lib/tools.cli-0.2.2.jar:/opt/storm/lib/core.incubator-0.1.0.jar:/opt/storm/lib/zookeeper-3.3.3.jar:/opt/storm/lib/httpclient-4.1.1.jar:/opt/storm/lib/clout-1.0.1.jar:/opt/storm/lib/jetty-6.1.26.jar:/opt/storm/lib/ring-devel-0.3.11.jar:/opt/storm/lib/asm-4.0.jar:/opt/storm/lib/ring-jetty-adapter-0.3.11.jar:/opt/storm/lib/commons-codec-1.4.jar:/opt/storm/lib/ring-core-1.1.5.jar:/opt/storm/lib/joda-time-2.0.jar:/opt/storm/lib/commons-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:/opt/storm/conf:/app/storm/supervisor/stormdist/gf_topology-2-1399643628/stormjar.jar
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:java.io.tmpdir=/tmp
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:java.compiler=<NA>
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:os.name=Linux
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:os.arch=amd64
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:os.version=2.6.32-431.el6.x86_64
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:user.name=storm
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:user.home=/home/storm
2014-05-09 14:15:47 o.a.z.s.ZooKeeperServer [INFO] Server
environment:user.dir=/
2014-05-09 14:15:53 b.s.d.worker [INFO] Launching worker for
gf_topology-2-1399643628 on fca61061-ffad-41a2-8f93-8c87cbbe358b:6701
with id db592f15-cb7d-4fcd-8dff-9e2b4df3e973 and conf
{"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
"topology.tick.tuple.freq.secs" nil,
"topology.builtin.metrics.bucket.size.secs" 60,
"topology.fall.back.on.java.serialization" true,
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000,
"topology.skip.missing.kryo.registrations" false,
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts"
"-Xmx256m -Djava.net.preferIPv4Stack=true",
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
"topology.trident.batch.emit.interval.millis" 500,
"nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m
-Djava.net.preferIPv4Stack=true", "java.library.path"
"/usr/local/lib:/opt/local/lib:/usr/lib",
"topology.executor.send.buffer.size" 1024, "storm.local.dir"
"/app/storm", "storm.messaging.netty.buffer_size" 5242880,
"supervisor.worker.start.timeout.secs" 120,
"topology.enable.message.timeouts" true,
"nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs"
3600, "drpc.worker.threads" 64,
"topology.worker.shared.thread.pool.size" 4, "nimbus.host"
"stormsingle1", "storm.messaging.netty.min_wait_ms" 100,
"storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil,
"topology.executor.receive.buffer.size" 1024,
"transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm",
"storm.zookeeper.retry.intervalceiling.millis" 30000,
"supervisor.enable" true, "storm.messaging.netty.server_worker_threads"
1, "storm.zookeeper.servers" ["stormsingle1"],
"transactional.zookeeper.root" "/transactional",
"topology.acker.executors" nil, "topology.transfer.buffer.size" 1024,
"topology.worker.childopts" nil, "drpc.queue.size" 128,
"worker.childopts" "-Xmx256m -Djava.net.preferIPv4Stack=true",
"supervisor.heartbeat.frequency.secs" 5,
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port"
3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts"
"-Xmx768m", "topology.receiver.buffer.size" 8,
"task.heartbeat.frequency.secs" 3, "topology.tasks" nil,
"storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy"
"backtype.storm.spout.SleepSpoutWaitStrategy",
"nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending"
nil, "storm.zookeeper.retry.interval" 1000,
"topology.sleep.spout.wait.strategy.time.ms" 1,
"nimbus.topology.validator"
"backtype.storm.nimbus.DefaultTopologyValidator",
"supervisor.slots.ports" [6700 6701], "topology.debug" false,
"nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60,
"topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10,
"topology.workers" 1, "supervisor.childopts" "-Xmx256m
-Djava.net.preferIPv4Stack=true", "nimbus.thrift.port" 6627,
"topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1,
"topology.tuple.serializer"
"backtype.storm.serialization.types.ListDelegateSerializer",
"topology.disruptor.wait.strategy"
"com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs"
30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory"
"backtype.storm.serialization.DefaultKryoFactory",
"drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1,
"storm.zookeeper.retry.times" 5, "storm.thrift.transport"
"backtype.storm.security.auth.SimpleTransportPlugin",
"topology.state.synchronization.timeout.secs" 60,
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs"
600, "storm.messaging.transport"
"backtype.storm.messaging.netty.Context", "logviewer.appender.name"
"A1", "storm.messaging.netty.max_wait_ms" 1000,
"drpc.request.timeout.secs" 600, "storm.local.hostname" "stormsingle1",
"storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts"
"-Xmx256m -Djava.net.preferIPv4Stack=true", "storm.cluster.mode"
"distributed", "topology.optimize" true, "topology.max.task.parallelism"
nil}
2014-05-09 14:15:53 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-05-09 14:15:53 o.a.z.ZooKeeper [INFO] Initiating client connection,
connectString=stormsingle1:2181 sessionTimeout=20000
watcher=com.netflix.curator.ConnectionState@418d276b
2014-05-09 14:15:53 o.a.z.ClientCnxn [INFO] Opening socket connection to
server stormsingle1/127.0.0.1:2181
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Socket connection
established to stormsingle1/127.0.0.1:2181, initiating session
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Session establishment
complete on server stormsingle1/127.0.0.1:2181, sessionid =
0x145dd2a3780004c, negotiated timeout = 20000
2014-05-09 14:15:54 b.s.zookeeper [INFO] Zookeeper state update:
:connected:none
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] EventThread shut down
2014-05-09 14:15:54 o.a.z.ZooKeeper [INFO] Session: 0x145dd2a3780004c closed
2014-05-09 14:15:54 c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-05-09 14:15:54 o.a.z.ZooKeeper [INFO] Initiating client connection,
connectString=stormsingle1:2181/storm sessionTimeout=20000
watcher=com.netflix.curator.ConnectionState@36189ddf
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Opening socket connection to
server stormsingle1/127.0.0.1:2181
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Socket connection
established to stormsingle1/127.0.0.1:2181, initiating session
2014-05-09 14:15:54 o.a.z.ClientCnxn [INFO] Session establishment
complete on server stormsingle1/127.0.0.1:2181, sessionid =
0x145dd2a3780004d, negotiated timeout = 20000
===================
I'm just running 2 workers, and the other one continues to run the
same batch over and over. E.g., here's a debug line:
2014-05-09 15:23:26 b.s.d.task [INFO] Emitting: spout0 s1 [2:67,
**FIELD1**,**FIELD2**,**FIELD3**,**FIELD4**]
which I assume translates to the 67th attempt at batch 2 (i.e., txid 2).
Questions
1. Why does my connection to ZooKeeper seem so tenuous? I've tried
   in-process ZK (in local mode), a single-node cluster VM (where
   supervisors, workers, Nimbus, and ZK all live on the same VM), and
   multi-node VMs (where Nimbus and the supervisors are on separate
   VMs on the same host), both with ZK on its own VM and against a
   remote ZK cluster. I have ensured that the /etc/hosts entries all
   point to the right things, which I know can be problematic.
2. I have a strong suspicion that my batches are taking too long to
   complete, and thus the spout replays the same ginormous batch over
   and over again. How can I tell the Kafka spout to take, say, 100
   msgs per batch? Or am I not understanding batching in Storm?
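For reference, here is roughly what my JRuby spout setup translates to
in Java (the topic name and scheme class are placeholders for my real
ones, and I actually use a static host list; ZkHosts is shown for
brevity). fetchSizeBytes and maxSpoutPending are the knobs I suspect
matter, though I may be misreading how batch sizing works:

```java
import backtype.storm.Config;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;

public class SpoutSetup {
    public static OpaqueTridentKafkaSpout buildSpout() {
        // One broker/ZK host, as described above (names are placeholders).
        ZkHosts hosts = new ZkHosts("stormsingle1:2181");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, "my_topic");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new MyJsonScheme());

        // My understanding (possibly wrong): Trident batch size is driven by
        // bytes fetched per partition, not by a message count, so shrinking
        // this should shrink batches.
        kafkaConfig.fetchSizeBytes = 64 * 1024;

        return new OpaqueTridentKafkaSpout(kafkaConfig);
    }

    public static Config topologyConf() {
        Config conf = new Config();
        conf.setMaxSpoutPending(2);      // limit in-flight batches
        conf.setMessageTimeoutSecs(120); // give slow batches time to complete
        return conf;
    }
}
```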
Thanks all.
Dave K
[1] https://groups.google.com/d/msg/storm-user/9qTq-6P9bys/xuCG3pz0yPUJ