You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Shane <sh...@gmail.com> on 2014/01/03 10:50:00 UTC

How can TransactionalSpoutBatchExecutor skip the tick tuple?

Hi,
I found TransactionalSpoutBatchExecutor source code like this:
 @Override
    public void execute(Tuple input) {
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
 ......
    }

so if I use tick tuple frequently to do someting , I get the error:

 12139 [Thread-23-userflow_spout] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long
cannot be cast to backtype.storm.transactional.TransactionAttempt
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
~[storm-core-0.9.0.1.jar:na]
at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
~[storm-core-0.9.0.1.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_45]
-- 
-----------------------------------------------------------------------
Yours sincerely,
Shane

Re: Re: How can TransactionalSpoutBatchExecutor skip the tick tuple?

Posted by "shane.lisy@gmail.com" <sh...@gmail.com>.
Yes, this is the right solution,and I have seen it in RollingCountBolt at the Storm-Start of the RollingTopWords  example, which encapsulate this process in the TupleHelpers 
  public static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
        Constants.SYSTEM_TICK_STREAM_ID);
  }

But I means that when I need both Tick Tuple and Transactional batch bolt/commiter bolt,should I mush override Storm backtype.storm.transactional.TransactionalSpoutBatchExecutor  source code  at line 37 in storm version 0.9.0.1, and add logic switch above ? Is the any configuration can help me to do this?




shane.lisy@gmail.com

From: Susheel Kumar Gadalay
Date: 2014-01-03 19:25
To: user
Subject: Re: How can TransactionalSpoutBatchExecutor skip the tick tuple?
You should do like this in your bolt execute method

if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
    tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
  // Processing for tick tuple
} else {
  // Processing for non tick tuple
}

import backtype.storm.Constants;



On 1/3/14, Shane <sh...@gmail.com> wrote:
> Hi,
> I found TransactionalSpoutBatchExecutor source code like this:
>  @Override
>     public void execute(Tuple input) {
>         TransactionAttempt attempt = (TransactionAttempt)
> input.getValue(0);
>  ......
>     }
>
> so if I use tick tuple frequently to do someting , I get the error:
>
>  12139 [Thread-23-userflow_spout] ERROR backtype.storm.daemon.executor -
> java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long
> cannot be cast to backtype.storm.transactional.TransactionAttempt
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
> ~[storm-core-0.9.0.1.jar:na]
> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
> ~[storm-core-0.9.0.1.jar:na]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> at java.lang.Thread.run(Thread.java:662) [na:1.6.0_45]
> --
> -----------------------------------------------------------------------
> Yours sincerely,
> Shane
>

Re: How can TransactionalSpoutBatchExecutor skip the tick tuple?

Posted by Susheel Kumar Gadalay <sk...@gmail.com>.
You should do like this in your bolt execute method

if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
    tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
  // Processing for tick tuple
} else {
  // Processing for non tick tuple
}

import backtype.storm.Constants;



On 1/3/14, Shane <sh...@gmail.com> wrote:
> Hi,
> I found TransactionalSpoutBatchExecutor source code like this:
>  @Override
>     public void execute(Tuple input) {
>         TransactionAttempt attempt = (TransactionAttempt)
> input.getValue(0);
>  ......
>     }
>
> so if I use tick tuple frequently to do someting , I get the error:
>
>  12139 [Thread-23-userflow_spout] ERROR backtype.storm.daemon.executor -
> java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long
> cannot be cast to backtype.storm.transactional.TransactionAttempt
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
> ~[storm-core-0.9.0.1.jar:na]
> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
> ~[storm-core-0.9.0.1.jar:na]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> at java.lang.Thread.run(Thread.java:662) [na:1.6.0_45]
> --
> -----------------------------------------------------------------------
> Yours sincerely,
> Shane
>