Posted to user@storm.apache.org by Shane <sh...@gmail.com> on 2014/01/03 10:50:00 UTC
How can TransactionalSpoutBatchExecutor skip the tick tuple?
Hi,
I found that the TransactionalSpoutBatchExecutor source code looks like this:
    @Override
    public void execute(Tuple input) {
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        ......
    }
So if I use tick tuples to do something periodically, I get this error:
12139 [Thread-23-userflow_spout] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to backtype.storm.transactional.TransactionAttempt
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730) ~[storm-core-0.9.0.1.jar:na]
    at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403) ~[storm-core-0.9.0.1.jar:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:662) [na:1.6.0_45]
--
-----------------------------------------------------------------------
Yours sincerely,
Shane
Re: Re: How can TransactionalSpoutBatchExecutor skip the tick tuple?
Posted by "shane.lisy@gmail.com" <sh...@gmail.com>.
Yes, this is the right solution, and I have seen it in RollingCountBolt, part of the RollingTopWords example in storm-starter, which encapsulates this check in TupleHelpers:
    public static boolean isTickTuple(Tuple tuple) {
        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
    }
But what I mean is: when I need both tick tuples and a transactional batch bolt/committer bolt, must I override the Storm source code of backtype.storm.transactional.TransactionalSpoutBatchExecutor at line 37 (in Storm version 0.9.0.1) and add the check above? Is there any configuration that can help me do this?
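A note on the configuration question: in the storm-starter RollingCountBolt, the tick frequency is requested per component, by returning it from getComponentConfiguration(), rather than being set on the topology-wide Config. Assuming the ClassCastException arises because ticks were enabled topology-wide (nothing in this thread confirms that), scoping the setting to only the bolts that need it may keep tick tuples away from TransactionalSpoutBatchExecutor. The sketch below is a stand-in that runs without Storm on the classpath: TickConfigSketch and componentConfiguration are hypothetical names, and the string key is the value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: in a real bolt, the body of componentConfiguration()
// would live in an override of getComponentConfiguration().
class TickConfigSketch {
    // String value of backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.
    static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";

    // Builds a per-component configuration that asks for a tick tuple
    // every tickFrequencySecs seconds, for this component only.
    static Map<String, Object> componentConfiguration(int tickFrequencySecs) {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencySecs);
        return conf;
    }

    public static void main(String[] args) {
        // Print the configuration a bolt would return to request 5-second ticks.
        System.out.println(componentConfiguration(5));
    }
}
```

In a real bolt the same map would be returned from getComponentConfiguration(), and the topology-wide Config would leave the tick frequency unset.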
shane.lisy@gmail.com
From: Susheel Kumar Gadalay
Date: 2014-01-03 19:25
To: user
Subject: Re: How can TransactionalSpoutBatchExecutor skip the tick tuple?
You should do this in your bolt's execute method:
    if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
        // Processing for tick tuple
    } else {
        // Processing for non-tick tuple
    }
This needs the following import:
    import backtype.storm.Constants;
Re: How can TransactionalSpoutBatchExecutor skip the tick tuple?
Posted by Susheel Kumar Gadalay <sk...@gmail.com>.
You should do this in your bolt's execute method:
    if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
        // Processing for tick tuple
    } else {
        // Processing for non-tick tuple
    }
This needs the following import:
    import backtype.storm.Constants;
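The check above can be tried out without a Storm cluster. The following is a minimal sketch, not Storm's own code: FakeTuple is a hypothetical stand-in for backtype.storm.tuple.Tuple, the two string constants mirror the values of Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID, and a String cast stands in for the TransactionAttempt cast. It shows the guard running before the cast, so a tick tuple carrying a Long never reaches it.

```java
class TickGuardSketch {
    // Mirror the values of backtype.storm.Constants (assumed here, not imported).
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // Hypothetical stand-in for backtype.storm.tuple.Tuple.
    static final class FakeTuple {
        private final String sourceComponent;
        private final String sourceStreamId;
        private final Object[] values;

        FakeTuple(String sourceComponent, String sourceStreamId, Object... values) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
            this.values = values;
        }

        String getSourceComponent() { return sourceComponent; }
        String getSourceStreamId() { return sourceStreamId; }
        Object getValue(int i) { return values[i]; }
    }

    // Same test as in the reply: system component plus tick stream.
    static boolean isTickTuple(FakeTuple tuple) {
        return tuple.getSourceComponent().equals(SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(SYSTEM_TICK_STREAM_ID);
    }

    // Guard first, cast second: the cast only runs for non-tick tuples.
    static String execute(FakeTuple tuple) {
        if (isTickTuple(tuple)) {
            return "tick";
        }
        String attempt = (String) tuple.getValue(0); // stand-in for the TransactionAttempt cast
        return "batch:" + attempt;
    }

    public static void main(String[] args) {
        System.out.println(execute(new FakeTuple("__system", "__tick", 5L)));
        System.out.println(execute(new FakeTuple("userflow_spout", "default", "attempt-1")));
    }
}
```

Without the isTickTuple guard, the first call would throw a ClassCastException, because a Long cannot be cast to String, much like the error reported in the original question.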