You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Chris Bedford <ch...@buildlackey.com> on 2014/06/19 10:08:37 UTC
When i set TOPOLOGY_TICK_TUPLE_FREQ_SECS into my Storm config, I get
"Non-system tuples should never be sent to __system bolt"
Hello, Storm experts:
I'm trying to use the system 'ticks' in Storm by setting
TOPOLOGY_TICK_TUPLE_FREQ_SECS and then acting on the
received tick tuples. However, I find that when I set
TOPOLOGY_TICK_TUPLE_FREQ_SECS into the config
of even a very very simple topology (one dummy spout that just emits to
nowhere), i get the following error:
"Non-system tuples should never be sent to __system bolt"
I see the following in the log:
10665 [Thread-21-__system] INFO backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __tick, id: {}, [1]
followed by this trace:
java.lang.RuntimeException: java.lang.RuntimeException: Non-system tuples
should never be sent to __system bolt.
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729)
~[storm-core-0.9.0-rc2.jar:na]
at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403)
~[storm-core-0.9.0-rc2.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
Caused by: java.lang.RuntimeException: Non-system tuples should never be
sent to __system bolt.
at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
~[storm-core-0.9.0-rc2.jar:na]
... 6 common frames omitted
I certainly did not wire anything to send tuples to the system bolt, as you
can see from the simple test topology i'm including below.
I'm wondering if there is any other set up that i need to do. I'm also
enclosing most of the log output (sorry it is long) just in case
that is helpful.
Thanks in advance to whoever can shed light on this...
- chris
>>>>> The CODE -- a testNG test in groovy... but it can be changed to
java with a few semicolons here and there ... >>>>
package com.example
import backtype.storm.Config
import backtype.storm.LocalCluster
import backtype.storm.spout.SpoutOutputCollector
import backtype.storm.task.TopologyContext
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.TopologyBuilder
import backtype.storm.topology.base.BaseRichSpout
import backtype.storm.tuple.Fields
import backtype.storm.tuple.Values
import backtype.storm.utils.Utils
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testng.annotations.Test
/*
* Author: cbedford
* Date: 6/18/14
* Time: 10:05 PM
*/
public class TickTest {
public static class DummySpout extends BaseRichSpout {
public static Logger LOG =
LoggerFactory.getLogger(DummySpout.class);
boolean _isDistributed;
SpoutOutputCollector _collector;
private static Integer count = 0;
public DummySpout() {
this(true);
}
public DummySpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
}
public void close() {}
public void nextTuple() {
Utils.sleep(100);
count += 1;
final Values values =
new Values(
"commandFoo",
"sourceFoo");
_collector.emit(values);
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
final Fields fields = new Fields("_command", "_source");
declarer.declare(fields);
}
@Override
public Map<String, Object> getComponentConfiguration() {
if (!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
return null;
}
}
}
private static Logger logger = LoggerFactory.getLogger(
TickTest.class.name)
@Test(enabled = true)
public void verifyClockTicks() {
protected final LocalCluster cluster = new LocalCluster();
Config config = new Config();
String topology_message_timeout_secs =
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
String topology_tick_tuple_freq_secs =
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
config.put(topology_message_timeout_secs, 200);
config.put(topology_tick_tuple_freq_secs, 1);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("dummy", new DummySpout())
config.setDebug(true);
cluster.submitTopology("test", config, builder.createTopology());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cluster.shutdown()
}
}
>>>>> The (more or less) full log output >>>
7953 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] WARN
org.apache.zookeeper.server.NIOServerCnxn - EndOfStreamException: Unable
to read additional data from client sessionid 0x146b32327350008, likely
client has closed socket
7981 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor
with id b8bcdc08-7580-41ff-b117-8009431d6e5c at host chris-ubuntu-64
8276 [main] INFO backtype.storm.daemon.nimbus - Received topology
submission for test with conf {"topology.max.task.parallelism" nil,
"topology.acker.executors" nil, "topology.kryo.register" nil,
"topology.kryo.decorators" (), "topology.name" "test", "storm.id"
"test-1-1403164765", "topology.debug" true, "topology.tick.tuple.freq.secs"
1, "topology.message.timeout.secs" 200}
8418 [main] INFO backtype.storm.daemon.nimbus - Activating test:
test-1-1403164765
8592 [main] INFO backtype.storm.scheduler.EvenScheduler - Available slots:
(["660a2e2b-ac98-411e-bfcd-705bcb033bca" 1]
["660a2e2b-ac98-411e-bfcd-705bcb033bca" 2]
["660a2e2b-ac98-411e-bfcd-705bcb033bca" 3]
["b8bcdc08-7580-41ff-b117-8009431d6e5c" 4]
["b8bcdc08-7580-41ff-b117-8009431d6e5c" 5]
["b8bcdc08-7580-41ff-b117-8009431d6e5c" 6])
8640 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment for
topology id test-1-1403164765:
#backtype.storm.daemon.common.Assignment{:master-code-dir
"/tmp/850b6bc1-af6c-40a6-a60e-5f5efc000218/nimbus/stormdist/test-1-1403164765",
:node->host {"660a2e2b-ac98-411e-bfcd-705bcb033bca" "chris-ubuntu-64"},
:executor->node+port {[1 1] ["660a2e2b-ac98-411e-bfcd-705bcb033bca" 1], [2
2] ["660a2e2b-ac98-411e-bfcd-705bcb033bca" 1]}, :executor->start-time-secs
{[2 2] 1403164765, [1 1] 1403164765}}
8881 [Thread-6] INFO backtype.storm.daemon.supervisor - Downloading code
for storm id test-1-1403164765 from
/tmp/850b6bc1-af6c-40a6-a60e-5f5efc000218/nimbus/stormdist/test-1-1403164765
8901 [Thread-6] INFO backtype.storm.daemon.supervisor - Extracting
resources from jar at
/home/chris/Dropbox/3rdparty/idea-IU-129.1359/plugins/testng/lib/testng-plugin.jar
to
/tmp/b26847ea-5082-4ef0-b48d-53c3b6d96a94/supervisor/stormdist/test-1-1403164765/resources
8976 [Thread-6] INFO backtype.storm.daemon.supervisor - Finished
downloading code for storm id test-1-1403164765 from
/tmp/850b6bc1-af6c-40a6-a60e-5f5efc000218/nimbus/stormdist/test-1-1403164765
9012 [Thread-7] INFO backtype.storm.daemon.supervisor - Launching worker
with assignment #backtype.storm.daemon.supervisor.LocalAssignment{:storm-id
"test-1-1403164765", :executors ([1 1] [2 2])} for this supervisor
660a2e2b-ac98-411e-bfcd-705bcb033bca on port 1 with id
80a0f02c-771f-46d3-91fd-3decdbd77ee3
9021 [Thread-7] INFO backtype.storm.daemon.worker - Launching worker for
test-1-1403164765 on 660a2e2b-ac98-411e-bfcd-705bcb033bca:1 with id
80a0f02c-771f-46d3-91fd-3decdbd77ee3 and conf {"dev.zookeeper.path"
"/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil,
"topology.builtin.metrics.bucket.size.secs" 60,
"topology.fall.back.on.java.serialization" true,
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
"topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m",
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
"topology.trident.batch.emit.interval.millis" 50,
"nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m",
"java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib",
"topology.executor.send.buffer.size" 1024, "storm.local.dir"
"/tmp/b26847ea-5082-4ef0-b48d-53c3b6d96a94",
"supervisor.worker.start.timeout.secs" 120,
"topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
"topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost",
"storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil,
"topology.executor.receive.buffer.size" 1024,
"transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm",
"storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable"
true, "storm.zookeeper.servers" ["localhost"],
"transactional.zookeeper.root" "/transactional", "topology.acker.executors"
nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
"drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
"supervisor.heartbeat.frequency.secs" 5,
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
"supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
"topology.tasks" nil, "topology.spout.wait.strategy"
"backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending"
nil, "storm.zookeeper.retry.interval" 1000, "
topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator"
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports"
(1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120,
"nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
"task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
"-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
"worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
"backtype.storm.serialization.types.ListDelegateSerializer",
"topology.disruptor.wait.strategy"
"com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30,
"storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory"
"backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port"
3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times"
5, "storm.thrift.transport"
"backtype.storm.security.auth.SimpleTransportPlugin",
"topology.state.synchronization.timeout.secs" 60,
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs"
600, "storm.messaging.transport" "backtype.storm.messaging.zmq",
"drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
"topology.optimize" true, "topology.max.task.parallelism" nil}
9022 [Thread-7] INFO
com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
9044 [Thread-7-EventThread] INFO backtype.storm.zookeeper - Zookeeper
state update: :connected:none
9051 [Thread-7] INFO
com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
9575 [Thread-7] INFO backtype.storm.daemon.executor - Loading executor
dummy:[2 2]
9590 [Thread-7] INFO backtype.storm.daemon.task - Emitting: dummy __system
["startup"]
9591 [Thread-7] INFO backtype.storm.daemon.executor - Loaded executor
tasks dummy:[2 2]
9631 [Thread-19-dummy] INFO backtype.storm.daemon.executor - Opening spout
dummy:(2)
9634 [Thread-7] INFO backtype.storm.daemon.executor - Finished loading
executor dummy:[2 2]
9646 [Thread-19-dummy] INFO backtype.storm.daemon.executor - Opened spout
dummy:(2)
9653 [Thread-7] INFO backtype.storm.daemon.executor - Loading executor
__system:[-1 -1]
9654 [Thread-19-dummy] INFO backtype.storm.daemon.executor - Activating
spout dummy:(2)
9654 [Thread-7] INFO backtype.storm.daemon.task - Emitting: __system
__system ["startup"]
9654 [Thread-7] INFO backtype.storm.daemon.executor - Loaded executor
tasks __system:[-1 -1]
9664 [Thread-7] INFO backtype.storm.daemon.executor - Finished loading
executor __system:[-1 -1]
9664 [Thread-21-__system] INFO backtype.storm.daemon.executor - Preparing
bolt __system:(-1)
9681 [Thread-7] INFO backtype.storm.daemon.executor - Loading executor
__acker:[1 1]
9682 [Thread-21-__system] INFO backtype.storm.daemon.executor - Prepared
bolt __system:(-1)
9682 [Thread-7] INFO backtype.storm.daemon.task - Emitting: __acker
__system ["startup"]
9683 [Thread-7] INFO backtype.storm.daemon.executor - Loaded executor
tasks __acker:[1 1]
9688 [Thread-7] INFO backtype.storm.daemon.executor - Finished loading
executor __acker:[1 1]
9688 [Thread-23-__acker] INFO backtype.storm.daemon.executor - Preparing
bolt __acker:(1)
9688 [Thread-7] INFO backtype.storm.daemon.worker - Launching
receive-thread for 660a2e2b-ac98-411e-bfcd-705bcb033bca:1
9691 [Thread-23-__acker] INFO backtype.storm.daemon.executor - Prepared
bolt __acker:(1)
9709 [Thread-7] INFO backtype.storm.daemon.worker - Worker has topology
config {"storm.id" "test-1-1403164765", "dev.zookeeper.path"
"/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" 1,
"topology.builtin.metrics.bucket.size.secs" 60,
"topology.fall.back.on.java.serialization" true,
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
"topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m",
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
"topology.trident.batch.emit.interval.millis" 50,
"nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m",
"java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib",
"topology.executor.send.buffer.size" 1024, "storm.local.dir"
"/tmp/b26847ea-5082-4ef0-b48d-53c3b6d96a94",
"supervisor.worker.start.timeout.secs" 120,
"topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
"topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost",
"storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil,
"topology.executor.receive.buffer.size" 1024,
"transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm",
"storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable"
true, "storm.zookeeper.servers" ["localhost"],
"transactional.zookeeper.root" "/transactional", "topology.acker.executors"
nil, "topology.kryo.decorators" (), "topology.name" "test",
"topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
"drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
"supervisor.heartbeat.frequency.secs" 5,
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
"supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
"topology.tasks" nil, "topology.spout.wait.strategy"
"backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending"
nil, "storm.zookeeper.retry.interval" 1000, "
topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator"
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports"
(1 2 3), "topology.debug" true, "nimbus.task.launch.secs" 120,
"nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil,
"topology.message.timeout.secs" 200, "task.refresh.poll.secs" 10,
"topology.workers" 1, "supervisor.childopts" "-Xmx256m",
"nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
"worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
"backtype.storm.serialization.types.ListDelegateSerializer",
"topology.disruptor.wait.strategy"
"com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30,
"storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory"
"backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port"
3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times"
5, "storm.thrift.transport"
"backtype.storm.security.auth.SimpleTransportPlugin",
"topology.state.synchronization.timeout.secs" 60,
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs"
600, "storm.messaging.transport" "backtype.storm.messaging.zmq",
"drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
"topology.optimize" true, "topology.max.task.parallelism" nil}
9709 [Thread-7] INFO backtype.storm.daemon.worker - Worker
80a0f02c-771f-46d3-91fd-3decdbd77ee3 for storm test-1-1403164765 on
660a2e2b-ac98-411e-bfcd-705bcb033bca:1 has finished loading
9826 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
9928 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
10029 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
10130 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
10231 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
10331 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
10433 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
10534 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
10635 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
default [commandFoo, sourceFoo]
10665 [Thread-21-__system] INFO backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __tick, id: {}, [1]
10688 [Thread-21-__system] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Non-system tuples
should never be sent to __system bolt.
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729)
~[storm-core-0.9.0-rc2.jar:na]
at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403)
~[storm-core-0.9.0-rc2.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
Caused by: java.lang.RuntimeException: Non-system tuples should never be
sent to __system bolt.
at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
~[storm-core-0.9.0-rc2.jar:na]
... 6 common frames omitted
10689 [Thread-21-__system] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.RuntimeException: Non-system tuples
should never be sent to __system bolt.
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729)
~[storm-core-0.9.0-rc2.jar:na]
at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403)
~[storm-core-0.9.0-rc2.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
Caused by: java.lang.RuntimeException: Non-system tuples should never be
sent to __system bolt.
at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43)
~[storm-core-0.9.0-rc2.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
~[storm-core-0.9.0-rc2.jar:na]
... 6 common frames omitted
10721 [Thread-21-__system] INFO backtype.storm.util - Halting process:
("Worker died")
--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
Re: When i set TOPOLOGY_TICK_TUPLE_FREQ_SECS into my Storm config, I
get "Non-system tuples should never be sent to __system bolt"
Posted by Chris Bedford <ch...@buildlackey.com>.
Hi.
after poking around a bit I found that I went about things wrong. This
article helped me get my bearings:
http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/
The trick is to not set TOPOLOGY_TICK_TUPLE_FREQ_SECS globally, but instead
to set it in every bolt that you want to receive ticks, by implementing
getComponentConfiguration something like this>
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;
}
Hope this is helpful for other people seeking to incorporate ticks (the
clock kind) into their topologies.
On Thu, Jun 19, 2014 at 1:08 AM, Chris Bedford <ch...@buildlackey.com>
wrote:
>
>
> Hello, Storm experts:
>
> I'm trying to use the system 'ticks' in Storm by setting
> TOPOLOGY_TICK_TUPLE_FREQ_SECS and then acting on the
> received tick tuples. However, I find that when I set
> TOPOLOGY_TICK_TUPLE_FREQ_SECS into the config
> of even a very very simple topology (one dummy spout that just emits to
> nowhere), i get the following error:
>
> "Non-system tuples should never be sent to __system bolt"
>
>
> I see the following in the log:
>
>
> 10665 [Thread-21-__system] INFO backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __tick, id: {}, [1]
>
>
> followed by this trace:
>
> java.lang.RuntimeException: java.lang.RuntimeException: Non-system tuples
> should never be sent to __system bolt.
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729)
> ~[storm-core-0.9.0-rc2.jar:na]
> at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403)
> ~[storm-core-0.9.0-rc2.jar:na]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
> Caused by: java.lang.RuntimeException: Non-system tuples should never be
> sent to __system bolt.
> at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> ~[storm-core-0.9.0-rc2.jar:na]
> ... 6 common frames omitted
>
>
> I certainly did not wire anything to send tuples to the system bolt, as
> you can see from the simple test topology i'm including below.
> I'm wondering if there is any other set up that i need to do. I'm also
> enclosing most of the log output (sorry it is long) just in case
> that is helpful.
>
> Thanks in advance to whoever can shed light on this...
> - chris
>
>
> >>>>> The CODE -- a testNG test in groovy... but it can be changed to
> java with a few semicolons here and there ... >>>>
>
> package com.example
> import backtype.storm.Config
> import backtype.storm.LocalCluster
> import backtype.storm.spout.SpoutOutputCollector
> import backtype.storm.task.TopologyContext
> import backtype.storm.topology.OutputFieldsDeclarer
> import backtype.storm.topology.TopologyBuilder
> import backtype.storm.topology.base.BaseRichSpout
> import backtype.storm.tuple.Fields
> import backtype.storm.tuple.Values
> import backtype.storm.utils.Utils
> import org.slf4j.Logger
> import org.slf4j.LoggerFactory
> import org.testng.annotations.Test
> /*
> * Author: cbedford
> * Date: 6/18/14
> * Time: 10:05 PM
> */
> public class TickTest {
>
> public static class DummySpout extends BaseRichSpout {
> public static Logger LOG =
> LoggerFactory.getLogger(DummySpout.class);
> boolean _isDistributed;
> SpoutOutputCollector _collector;
>
>
> private static Integer count = 0;
>
> public DummySpout() {
> this(true);
> }
>
> public DummySpout(boolean isDistributed) {
> _isDistributed = isDistributed;
> }
>
> public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
> _collector = collector;
> }
>
> public void close() {}
>
> public void nextTuple() {
> Utils.sleep(100);
> count += 1;
> final Values values =
> new Values(
> "commandFoo",
> "sourceFoo");
> _collector.emit(values);
> }
>
> public void ack(Object msgId) {
>
> }
>
> public void fail(Object msgId) {
>
> }
>
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> final Fields fields = new Fields("_command", "_source");
> declarer.declare(fields);
> }
>
> @Override
> public Map<String, Object> getComponentConfiguration() {
> if (!_isDistributed) {
> Map<String, Object> ret = new HashMap<String, Object>();
> ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
> return ret;
> } else {
> return null;
> }
> }
> }
>
> private static Logger logger = LoggerFactory.getLogger(
> TickTest.class.name)
>
>
> @Test(enabled = true)
> public void verifyClockTicks() {
>
> protected final LocalCluster cluster = new LocalCluster();
> Config config = new Config();
>
> String topology_message_timeout_secs =
> Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
> String topology_tick_tuple_freq_secs =
> Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
>
> config.put(topology_message_timeout_secs, 200);
> config.put(topology_tick_tuple_freq_secs, 1);
>
> TopologyBuilder builder = new TopologyBuilder();
>
> builder.setSpout("dummy", new DummySpout())
>
> config.setDebug(true);
> cluster.submitTopology("test", config, builder.createTopology());
> try {
> Thread.sleep(3000);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> cluster.shutdown()
> }
> }
>
>
>
>
>
>
> >>>>> The (more or less) full log output >>>
>
>
>
> 7953 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] WARN
> org.apache.zookeeper.server.NIOServerCnxn - EndOfStreamException: Unable
> to read additional data from client sessionid 0x146b32327350008, likely
> client has closed socket
> 7981 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor
> with id b8bcdc08-7580-41ff-b117-8009431d6e5c at host chris-ubuntu-64
> 8276 [main] INFO backtype.storm.daemon.nimbus - Received topology
> submission for test with conf {"topology.max.task.parallelism" nil,
> "topology.acker.executors" nil, "topology.kryo.register" nil,
> "topology.kryo.decorators" (), "topology.name" "test", "storm.id"
> "test-1-1403164765", "topology.debug" true, "topology.tick.tuple.freq.secs"
> 1, "topology.message.timeout.secs" 200}
> 8418 [main] INFO backtype.storm.daemon.nimbus - Activating test:
> test-1-1403164765
> 8592 [main] INFO backtype.storm.scheduler.EvenScheduler - Available
> slots: (["660a2e2b-ac98-411e-bfcd-705bcb033bca" 1]
> ["660a2e2b-ac98-411e-bfcd-705bcb033bca" 2]
> ["660a2e2b-ac98-411e-bfcd-705bcb033bca" 3]
> ["b8bcdc08-7580-41ff-b117-8009431d6e5c" 4]
> ["b8bcdc08-7580-41ff-b117-8009431d6e5c" 5]
> ["b8bcdc08-7580-41ff-b117-8009431d6e5c" 6])
> 8640 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment
> for topology id test-1-1403164765:
> #backtype.storm.daemon.common.Assignment{:master-code-dir
> "/tmp/850b6bc1-af6c-40a6-a60e-5f5efc000218/nimbus/stormdist/test-1-1403164765",
> :node->host {"660a2e2b-ac98-411e-bfcd-705bcb033bca" "chris-ubuntu-64"},
> :executor->node+port {[1 1] ["660a2e2b-ac98-411e-bfcd-705bcb033bca" 1], [2
> 2] ["660a2e2b-ac98-411e-bfcd-705bcb033bca" 1]}, :executor->start-time-secs
> {[2 2] 1403164765, [1 1] 1403164765}}
> 8881 [Thread-6] INFO backtype.storm.daemon.supervisor - Downloading code
> for storm id test-1-1403164765 from
> /tmp/850b6bc1-af6c-40a6-a60e-5f5efc000218/nimbus/stormdist/test-1-1403164765
> 8901 [Thread-6] INFO backtype.storm.daemon.supervisor - Extracting
> resources from jar at
> /home/chris/Dropbox/3rdparty/idea-IU-129.1359/plugins/testng/lib/testng-plugin.jar
> to
> /tmp/b26847ea-5082-4ef0-b48d-53c3b6d96a94/supervisor/stormdist/test-1-1403164765/resources
> 8976 [Thread-6] INFO backtype.storm.daemon.supervisor - Finished
> downloading code for storm id test-1-1403164765 from
> /tmp/850b6bc1-af6c-40a6-a60e-5f5efc000218/nimbus/stormdist/test-1-1403164765
> 9012 [Thread-7] INFO backtype.storm.daemon.supervisor - Launching worker
> with assignment #backtype.storm.daemon.supervisor.LocalAssignment{:storm-id
> "test-1-1403164765", :executors ([1 1] [2 2])} for this supervisor
> 660a2e2b-ac98-411e-bfcd-705bcb033bca on port 1 with id
> 80a0f02c-771f-46d3-91fd-3decdbd77ee3
> 9021 [Thread-7] INFO backtype.storm.daemon.worker - Launching worker for
> test-1-1403164765 on 660a2e2b-ac98-411e-bfcd-705bcb033bca:1 with id
> 80a0f02c-771f-46d3-91fd-3decdbd77ee3 and conf {"dev.zookeeper.path"
> "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil,
> "topology.builtin.metrics.bucket.size.secs" 60,
> "topology.fall.back.on.java.serialization" true,
> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
> "topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m",
> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
> "topology.trident.batch.emit.interval.millis" 50,
> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m",
> "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib",
> "topology.executor.send.buffer.size" 1024, "storm.local.dir"
> "/tmp/b26847ea-5082-4ef0-b48d-53c3b6d96a94",
> "supervisor.worker.start.timeout.secs" 120,
> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost",
> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil,
> "topology.executor.receive.buffer.size" 1024,
> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm",
> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable"
> true, "storm.zookeeper.servers" ["localhost"],
> "transactional.zookeeper.root" "/transactional", "topology.acker.executors"
> nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
> "supervisor.heartbeat.frequency.secs" 5,
> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
> "topology.tasks" nil, "topology.spout.wait.strategy"
> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending"
> nil, "storm.zookeeper.retry.interval" 1000, "
> topology.sleep.spout.wait.strategy.time.ms" 1,
> "nimbus.topology.validator"
> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports"
> (1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120,
> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
> "backtype.storm.serialization.types.ListDelegateSerializer",
> "topology.disruptor.wait.strategy"
> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30,
> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory"
> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port"
> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times"
> 5, "storm.thrift.transport"
> "backtype.storm.security.auth.SimpleTransportPlugin",
> "topology.state.synchronization.timeout.secs" 60,
> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs"
> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq",
> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
> "topology.optimize" true, "topology.max.task.parallelism" nil}
> 9022 [Thread-7] INFO
> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
> 9044 [Thread-7-EventThread] INFO backtype.storm.zookeeper - Zookeeper
> state update: :connected:none
> 9051 [Thread-7] INFO
> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
> 9575 [Thread-7] INFO backtype.storm.daemon.executor - Loading executor
> dummy:[2 2]
> 9590 [Thread-7] INFO backtype.storm.daemon.task - Emitting: dummy
> __system ["startup"]
> 9591 [Thread-7] INFO backtype.storm.daemon.executor - Loaded executor
> tasks dummy:[2 2]
> 9631 [Thread-19-dummy] INFO backtype.storm.daemon.executor - Opening
> spout dummy:(2)
> 9634 [Thread-7] INFO backtype.storm.daemon.executor - Finished loading
> executor dummy:[2 2]
> 9646 [Thread-19-dummy] INFO backtype.storm.daemon.executor - Opened spout
> dummy:(2)
> 9653 [Thread-7] INFO backtype.storm.daemon.executor - Loading executor
> __system:[-1 -1]
> 9654 [Thread-19-dummy] INFO backtype.storm.daemon.executor - Activating
> spout dummy:(2)
> 9654 [Thread-7] INFO backtype.storm.daemon.task - Emitting: __system
> __system ["startup"]
> 9654 [Thread-7] INFO backtype.storm.daemon.executor - Loaded executor
> tasks __system:[-1 -1]
> 9664 [Thread-7] INFO backtype.storm.daemon.executor - Finished loading
> executor __system:[-1 -1]
> 9664 [Thread-21-__system] INFO backtype.storm.daemon.executor - Preparing
> bolt __system:(-1)
> 9681 [Thread-7] INFO backtype.storm.daemon.executor - Loading executor
> __acker:[1 1]
> 9682 [Thread-21-__system] INFO backtype.storm.daemon.executor - Prepared
> bolt __system:(-1)
> 9682 [Thread-7] INFO backtype.storm.daemon.task - Emitting: __acker
> __system ["startup"]
> 9683 [Thread-7] INFO backtype.storm.daemon.executor - Loaded executor
> tasks __acker:[1 1]
> 9688 [Thread-7] INFO backtype.storm.daemon.executor - Finished loading
> executor __acker:[1 1]
> 9688 [Thread-23-__acker] INFO backtype.storm.daemon.executor - Preparing
> bolt __acker:(1)
> 9688 [Thread-7] INFO backtype.storm.daemon.worker - Launching
> receive-thread for 660a2e2b-ac98-411e-bfcd-705bcb033bca:1
> 9691 [Thread-23-__acker] INFO backtype.storm.daemon.executor - Prepared
> bolt __acker:(1)
> 9709 [Thread-7] INFO backtype.storm.daemon.worker - Worker has topology
> config {"storm.id" "test-1-1403164765", "dev.zookeeper.path"
> "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" 1,
> "topology.builtin.metrics.bucket.size.secs" 60,
> "topology.fall.back.on.java.serialization" true,
> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
> "topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m",
> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
> "topology.trident.batch.emit.interval.millis" 50,
> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m",
> "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib",
> "topology.executor.send.buffer.size" 1024, "storm.local.dir"
> "/tmp/b26847ea-5082-4ef0-b48d-53c3b6d96a94",
> "supervisor.worker.start.timeout.secs" 120,
> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost",
> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil,
> "topology.executor.receive.buffer.size" 1024,
> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm",
> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable"
> true, "storm.zookeeper.servers" ["localhost"],
> "transactional.zookeeper.root" "/transactional", "topology.acker.executors"
> nil, "topology.kryo.decorators" (), "topology.name" "test",
> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
> "supervisor.heartbeat.frequency.secs" 5,
> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
> "topology.tasks" nil, "topology.spout.wait.strategy"
> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending"
> nil, "storm.zookeeper.retry.interval" 1000, "
> topology.sleep.spout.wait.strategy.time.ms" 1,
> "nimbus.topology.validator"
> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports"
> (1 2 3), "topology.debug" true, "nimbus.task.launch.secs" 120,
> "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil,
> "topology.message.timeout.secs" 200, "task.refresh.poll.secs" 10,
> "topology.workers" 1, "supervisor.childopts" "-Xmx256m",
> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
> "backtype.storm.serialization.types.ListDelegateSerializer",
> "topology.disruptor.wait.strategy"
> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30,
> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory"
> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port"
> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times"
> 5, "storm.thrift.transport"
> "backtype.storm.security.auth.SimpleTransportPlugin",
> "topology.state.synchronization.timeout.secs" 60,
> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs"
> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq",
> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
> "topology.optimize" true, "topology.max.task.parallelism" nil}
> 9709 [Thread-7] INFO backtype.storm.daemon.worker - Worker
> 80a0f02c-771f-46d3-91fd-3decdbd77ee3 for storm test-1-1403164765 on
> 660a2e2b-ac98-411e-bfcd-705bcb033bca:1 has finished loading
> 9826 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 9928 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 10029 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 10130 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 10231 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 10331 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 10433 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 10534 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 10635 [Thread-19-dummy] INFO backtype.storm.daemon.task - Emitting: dummy
> default [commandFoo, sourceFoo]
> 10665 [Thread-21-__system] INFO backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __tick, id: {}, [1]
> 10688 [Thread-21-__system] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.RuntimeException: Non-system tuples
> should never be sent to __system bolt.
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729)
> ~[storm-core-0.9.0-rc2.jar:na]
> at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403)
> ~[storm-core-0.9.0-rc2.jar:na]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
> Caused by: java.lang.RuntimeException: Non-system tuples should never be
> sent to __system bolt.
> at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> ~[storm-core-0.9.0-rc2.jar:na]
> ... 6 common frames omitted
> 10689 [Thread-21-__system] ERROR backtype.storm.daemon.executor -
> java.lang.RuntimeException: java.lang.RuntimeException: Non-system tuples
> should never be sent to __system bolt.
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$fn__3495$fn__3507$fn__3554.invoke(executor.clj:729)
> ~[storm-core-0.9.0-rc2.jar:na]
> at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403)
> ~[storm-core-0.9.0-rc2.jar:na]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
> Caused by: java.lang.RuntimeException: Non-system tuples should never be
> sent to __system bolt.
> at backtype.storm.metric.SystemBolt.execute(SystemBolt.java:132)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$fn__3495$tuple_action_fn__3497.invoke(executor.clj:614)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.daemon.executor$mk_task_receiver$fn__3418.invoke(executor.clj:385)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43)
> ~[storm-core-0.9.0-rc2.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> ~[storm-core-0.9.0-rc2.jar:na]
> ... 6 common frames omitted
> 10721 [Thread-21-__system] INFO backtype.storm.util - Halting process:
> ("Worker died")
>
> --
> Chris Bedford
>
> Founder & Lead Lackey
> Build Lackey Labs: http://buildlackey.com
> Go Grails!: http://blog.buildlackey.com
>
>
>
--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com