Posted to user@storm.apache.org by Sam Mati <sm...@appnexus.com> on 2014/10/23 22:00:38 UTC

What is the purpose of timing out tuples?

Hi all.  I'm hoping somebody can explain this behavior to me because it seems pretty unexpected.

I'm seeing that timing out a tuple does nothing except call "fail" on the Spout.  The tuple itself is still processed through the Topology; acking or failing it simply has no effect.  Another problem is that the number of tuples in flight grows beyond topology.max.spout.pending: timed-out tuples no longer count as pending even though they still flow through the topology.  Unless I'm missing something, these two problems combined make timing out tuples at best utterly pointless, and at worst very problematic (it just throws more tuples into a topology that is already maxed out).

Here's my topology:
- I have a spout.  On nextTuple, it re-emits a failed tuple if one is queued; otherwise, it creates a new tuple.
- I have a bolt that takes 4 seconds to ack a tuple.
- topology.max.spout.pending = 5
- topology.message.timeout.secs = 5

I would expect 1 or 2 tuples to get acked and 3 or 4 tuples to time out; the Bolt would then process the resent tuples.  Over time, more and more tuples would be acked (though they would frequently time out).

What I'm seeing instead is that even though tuples have timed out, they are still processed by the Bolt.  I'm assuming there is a buffer/queue in front of the Bolt, and that timed-out tuples are not cleared from it.  Regardless, this leads to all tuples timing out, since the Bolt eventually processes only tuples that have already timed out.
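A back-of-the-envelope check of this configuration, as a plain-Java sketch with no Storm dependency. It assumes a single bolt executor serving tuples strictly first-in, first-out, and a burst of topology.max.spout.pending tuples emitted at the same instant. It also ignores the granularity of Storm's timeout check, so real cut-offs can come later than the nominal value (in the log below, tuple 1 is acked roughly 8 seconds after it was emitted). The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Back-of-the-envelope model, NOT Storm code: one bolt executor serving a
// burst of maxPending tuples strictly FIFO, each taking serviceSecs.
class PendingLatencyEstimate {

    // Seconds after the burst at which each tuple finishes processing.
    static List<Integer> completionTimes(int maxPending, int serviceSecs) {
        List<Integer> times = new ArrayList<>();
        for (int i = 0; i < maxPending; i++) {
            // waits for the i tuples ahead of it, then takes its own turn
            times.add((i + 1) * serviceSecs);
        }
        return times;
    }

    // How many of those tuples finish within the nominal timeout.
    static long withinTimeout(List<Integer> times, int timeoutSecs) {
        return times.stream().filter(t -> t <= timeoutSecs).count();
    }

    public static void main(String[] args) {
        // max.spout.pending = 5, 4 s per tuple, as in the topology above
        List<Integer> times = completionTimes(5, 4);
        System.out.println("completion times (s): " + times); // [4, 8, 12, 16, 20]
        System.out.println("finished within 5 s: " + withinTimeout(times, 5)); // 1
    }
}
```

With 5 pending tuples at 4 seconds each, the last tuple in a burst finishes about 20 seconds after emission, so only one of them can finish inside a 5-second timeout; the rest are replayed while their original copies are still queued at the bolt.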

I'm assuming, and hoping, that I'm missing something obvious here…

Two questions:
1.  Can I prevent Bolts from processing already-timed-out tuples?
2.  What is the point of timing out tuples?  It does nothing but call fail on the Spout even though the tuple will still be processed by the rest of the Topology!

Thanks,
-Sam


Spout:
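import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Storm 0.9.x package names (org.apache.storm.* from Storm 1.0 on)
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class SampleSpout extends BaseRichSpout {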
    private static Logger logger = LoggerFactory.getLogger(SampleSpout.class);

    SpoutOutputCollector collector;
    Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
    Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();

    int contentCounter;
    int curMsgId;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // msg-id is incremented every time we emit (including replays);
        // content is numbered only when a new tuple is created.
        declarer.declare(new Fields("msg-id", "content"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // either replay a failed tuple, or create a new one
        List<Object> tuple = null;
        if (replay_queue.size() > 0){
            tuple = replay_queue.poll();
        }else{
            tuple = new ArrayList<Object>();
            tuple.add(null);
            tuple.add("Content #" + contentCounter++);
        }

        // increment msgId and set it as the first item in the tuple
        int msgId = this.curMsgId++;
        tuple.set(0, msgId);
        logger.info("Emitting: " + tuple);
        // add this tuple to the 'pending' map, and emit it.
        pending_map.put(msgId, tuple);
        collector.emit(tuple, msgId);
        Utils.sleep(100);
    }

    @Override
    public void ack(Object msgId){
        // remove tuple from pending_map since it's no longer pending
        List<Object> acked_tuple = pending_map.remove(msgId);
        logger.info("Acked: " + acked_tuple);
    }

    @Override
    public void fail(Object msgId){
        // remove tuple from pending_map since it's no longer pending
        List<Object> failed_tuple = pending_map.remove(msgId);
        logger.info("Failed: " + failed_tuple);

        // put a copy into the replay queue
        ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
        replay_queue.add(copy);
    }
}
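The spout above replays every failed tuple indefinitely, which is what keeps feeding the cycle seen in the log. Below is a hedged, Storm-free sketch of the same pending/replay bookkeeping with a retry cap. MAX_RETRIES, the parked list, and keying failure counts by content are all assumptions for illustration: msg-id cannot be the key because SampleSpout assigns a new one on every replay, and a real spout would use a stable record id instead of the content string.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Storm-free model of SampleSpout's bookkeeping plus a hypothetical retry cap:
// content that fails more than MAX_RETRIES times is parked, not replayed.
class ReplayBookkeeping {
    static final int MAX_RETRIES = 3; // hypothetical limit

    private final Map<Integer, String> pending = new HashMap<>();  // msgId -> content
    private final Map<String, Integer> failures = new HashMap<>(); // content -> failure count
    private final Queue<String> replayQueue = new ArrayDeque<>();
    private final List<String> parked = new ArrayList<>();
    private int nextMsgId = 0;
    private int contentCounter = 0;

    // Mirrors nextTuple(): replay a failed tuple if one is queued, else create one.
    int emit() {
        String content = replayQueue.poll();
        if (content == null) {
            content = "Content #" + contentCounter++;
        }
        int msgId = nextMsgId++; // new msg-id on every emit, as in SampleSpout
        pending.put(msgId, content);
        return msgId;
    }

    void ack(int msgId) {
        String content = pending.remove(msgId);
        if (content != null) {
            failures.remove(content); // success resets the failure count
        }
    }

    void fail(int msgId) {
        String content = pending.remove(msgId);
        if (content == null) {
            return; // unknown or already handled
        }
        int count = failures.merge(content, 1, Integer::sum);
        if (count > MAX_RETRIES) {
            failures.remove(content);
            parked.add(content); // e.g. hand off to a dead-letter store instead
        } else {
            replayQueue.add(content);
        }
    }

    List<String> parked() { return parked; }

    int queuedForReplay() { return replayQueue.size(); }
}
```

With MAX_RETRIES = 3, a record is emitted at most four times (the original plus three replays) before it is parked, which bounds how much replay traffic a persistently slow or bad record can generate.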


Bolt:
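import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Storm 0.9.x package names (org.apache.storm.* from Storm 1.0 on)
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;

public class SamplePrintBolt extends BaseRichBolt {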

    private static Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);

    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    @Override
    public void execute(Tuple input) {
        logger.info("I see: " + input.getValues() + ". Will now sleep...");
        Utils.sleep(4000);
        logger.info("Done sleeping. Acking: "  + input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // doesn't emit
    }
}
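On question 1: the bolt above processes every tuple it dequeues, however stale. One workaround (not something Storm does for you, and not proposed in this thread) is a staleness check at the top of execute(). This sketch assumes the spout adds a wall-clock emit timestamp to every tuple, which SampleSpout does not do, and that spout and bolt clocks roughly agree; StaleTupleGuard is a hypothetical name.

```java
// Hypothetical helper for a bolt's execute(). Assumes the spout adds a
// wall-clock emit timestamp to every tuple (SampleSpout does not) and that
// spout and bolt clocks roughly agree.
class StaleTupleGuard {
    private final long timeoutMillis;

    StaleTupleGuard(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // True while the tuple is younger than the message timeout.
    boolean stillFresh(long emitTimeMillis, long nowMillis) {
        return nowMillis - emitTimeMillis < timeoutMillis;
    }
}
```

In execute(), a tuple for which stillFresh(...) returns false would be failed with collector.fail(input) and skipped, instead of sleeping 4 seconds on it. Failing rather than acking matters: Storm's effective timeout can fire later than the nominal value (tuple 1 above is acked about 8 seconds after emission), so a tuple this guard considers stale may not have been failed yet, and acking it would tell the spout it was processed when it was not.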


Main:
public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(5);
    conf.setMessageTimeoutSecs(5);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new SampleSpout());
    builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("local", conf, builder.createTopology());
}


Output:
30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
-- So far, so good… however, now it's time for things to time out.
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
-- Acking a timed-out tuple… this does nothing.
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep…
-- Why is it looking at tuple #3?  This has already failed.
45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
-- More timeouts…
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
-- Yet the Bolt processed tuple #5, which timed out about 14 seconds earlier…
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
58091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
58092 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
60085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep…
-- It's clear that the Bolt looks at tuples even if they have timed out.  Its queue will get longer and longer, and tuples will always time out.
65086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
65087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
86098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
86099 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
90100 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
90101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep…
-- The problem gets worse…  the Bolt is now looking at tuples that failed about 30 seconds ago.
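To quantify how stale the bolt's work gets, the log above can be parsed directly. This small sketch assumes the exact line format shown here (a leading timestamp in milliseconds, "Failed: [id," from the spout, "I see: [id," from the bolt) and lines in chronological order. For each msg-id, it reports how long after the spout's fail() the bolt started on that tuple; StalenessFromLog is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Parses log lines in the format above; only "Failed: [" (spout) and
// "I see: [" (bolt) lines are used, everything else is skipped.
class StalenessFromLog {
    private static final Pattern LINE =
        Pattern.compile("^(\\d+) .*?(Failed|I see): \\[(\\d+),");

    // msg-id -> milliseconds between the spout failing it and the bolt starting it
    static Map<Integer, Long> lagAfterFailure(List<String> lines) {
        Map<Integer, Long> failedAt = new HashMap<>();
        Map<Integer, Long> lag = new HashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue;
            }
            long ts = Long.parseLong(m.group(1));
            int msgId = Integer.parseInt(m.group(3));
            if (m.group(2).equals("Failed")) {
                failedAt.put(msgId, ts);
            } else if (failedAt.containsKey(msgId)) {
                // bolt picked up a tuple the spout had already failed
                lag.put(msgId, ts - failedAt.get(msgId));
            }
        }
        return lag;
    }
}
```

Run over the output above, it would report about 10 seconds for tuple 5 (failed at 40082, picked up at 50090) and about 29 seconds for tuple 16 (failed at 65086, picked up at 94101).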

Re: What is the purpose of timing out tuples?

Posted by 임정택 <ka...@gmail.com>.
AFAIK, Storm guarantees that each tuple is processed at least once, as long as
the spout can replay failed tuples.
If a tuple must be processed exactly once, Trident supports that.
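At-least-once means the same logical record can be processed more than once: in the log above, the bolt works on Content #5 as tuple 5 and again as tuple 7. A common way to live with this (a general technique, not one suggested in this thread) is to make processing idempotent. This is a minimal in-memory sketch under stated assumptions: the dedup key is the content string, standing in for a stable record id, since SampleSpout changes msg-id on every replay; state is unbounded and lost on restart; and with more than one bolt task, duplicates would need to reach the same task (e.g. fieldsGrouping on the key) or the seen-set would have to move to shared storage.

```java
import java.util.HashSet;
import java.util.Set;

// Minimal idempotency guard: remembers which logical records were already
// processed and skips repeats. Unbounded, in-memory, per bolt task.
class IdempotentProcessor {
    private final Set<String> seen = new HashSet<>();
    private int processedCount = 0;

    // Returns true if the record was processed now, false if it was a duplicate.
    boolean process(String recordKey) {
        if (!seen.add(recordKey)) {
            return false; // already done once; the bolt can just ack and move on
        }
        processedCount++; // stand-in for the real side effect
        return true;
    }

    int processedCount() { return processedCount; }
}
```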

On Friday, October 24, 2014, Sam Mati <sm...@appnexus.com> wrote:

>  Thanks for the response.
>
>  My takeaway is that "timing out" is a last-ditch effort, and that your
> timeout should be higher than the maximum amount of time you expect
> your topology to take.  Otherwise you risk a vicious cycle:  timed-out
> tuples are still processed, but are also re-emitted, causing more timeouts,
> causing more replays, etc.
>
>  I'm now wondering:  If all of my bolts always end in ack or fail, under
> what circumstances can tuples get "stuck"?  If a Worker gets killed,
> possibly?
>
>  One last question:  I frequently see it mentioned that Storm guarantees
> everything gets run once.  Please correct me if I'm wrong, but Storm
> doesn't guarantee this at all -- it's up to your Spout to keep track of
> each pending tuple and to replay failed ones (as my example Spout below
> does).  Am I missing something here?
>
>  Again, thanks for your clarification!
>
>  Best,
> -Sam
>
>   From: Michael Rose <michael@fullcontact.com>
> Reply-To: "user@storm.apache.org" <user@storm.apache.org>
> Date: Thursday, October 23, 2014 4:57 PM
> To: "user@storm.apache.org" <user@storm.apache.org>
> Subject: Re: What is the purpose of timing out tuples?
>
>   It's a last-ditch mechanism for replaying work that might have gotten
> stuck. Storm is an at-least-once processing system and doesn't aim to
> provide exactly-once / transactional behavior with base Storm. Trident aims
> to implement that on top of the underlying at-least-once system.
>
>  It's true that timed-out in-flight tuples will *not* be cleared.
> Controlling latencies within a topology is key to making Storm work. We
> have IO work isolated by Hystrix commands to ensure we're always coming in
> under our timeout period. We've experimented with using global streams to
> "kill" a particular tuple tree, essentially adding a unique key for it to a
> time-based cache so that each bolt drops it. Ultimately that wasn't really
> necessary; improving the consistency of external IO through circuit
> breaking was enough.
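The "kill" approach above is described only in prose. As a hypothetical illustration (names and structure are illustrative, not FullContact's code), the per-bolt cache part could look like this in plain Java. Delivering kill messages to every bolt task, for example on a separate stream subscribed with allGrouping, is assumed and not shown; the tree id is whatever identifier travels with the tuples (SampleSpout's msg-id field would do for that topology).

```java
import java.util.HashMap;
import java.util.Map;

// Per-bolt-task cache of killed tuple-tree ids, each remembered for ttlMillis.
class KillCache {
    private final long ttlMillis;
    private final Map<Object, Long> expiresAt = new HashMap<>();

    KillCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Called when a kill message for treeId arrives.
    void kill(Object treeId, long nowMillis) {
        expiresAt.put(treeId, nowMillis + ttlMillis);
    }

    // Called for every incoming data tuple; true means "drop it".
    boolean isKilled(Object treeId, long nowMillis) {
        expiresAt.values().removeIf(expiry -> expiry <= nowMillis); // evict expired ids
        return expiresAt.containsKey(treeId);
    }
}
```

The eviction scan runs on every lookup, which is fine for a sketch; a busy bolt would want a structure ordered by expiry time instead.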
>
>  If you take away nothing else, remember that Storm is at-least-once
> processing. Its goal is to ensure processing eventually happens for
> everything, no matter how many attempts it takes. It's up to you to
> remove bad input or park it.
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> michael@fullcontact.com
>
> On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <smati@appnexus.com> wrote:
>
>>  Hi all.  I'm hoping somebody can explain this behavior to me because it
>> seems pretty unexpected.
>>
>>  I'm seeing that timing out tuples does *nothing* except call "fail" on
>> the Spout.  The tuple itself will still be processed through the Topology,
>> except acking/failing will have no effect.  Another problem is that the
>> number of pending tuples will increase — timed out tuples do not count as
>> pending even though they will flow through the topology.  Unless I'm
>> missing something, these two combined problems make timing out tuples, at
>> best. utterly pointless, and at worst very problematic (as it will just
>> throw more tuples into a topology that is already maxed out).
>>
>>  Here's my topology:
>> - I have a spout.  On nextTuple, it either re-emits a tuple that has
>> failed, and if none are present, creates a new tuple.
>> - I have a bolt that takes 4 seconds to ack a tuple.
>> - topology.max.spout.pending = 5
>> - topology.message.timeout.secs = 5
>>
>>  I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to timeout
>> — then the Bolt would next process the *resent* tuples.  Over time, more
>> and more tuples would be acked (though they would frequently time out).
>>
>>  What I'm seeing instead is that even though tuples are timed-out, they
>> are still being processed by the Bolt.  I'm assuming there is buffer/queue
>> for the Bolt, and that timed-out tuples are not cleared from it.
>> Regardless, this leads to all tuples timing out, since the Bolt will
>> eventually only process tuples that have been timed out.
>>
>>  I'm assuming, and hoping, that I'm missing something obvious here…
>>
>>  Two questions:
>> 1.  Can I prevent Bolts from processing already-timed-out tuples?
>> 2.  What is the point of timing out tuples?  It does *nothing* but call
>> *fail* on the Spout even though the tuple will still be processed by the
>> rest of the Topology!
>>
>>  Thanks,
>> -Sam
>>
>>
>>  Spout:
>>  public class SampleSpout extends BaseRichSpout {
>>     private static Logger logger =
>> LoggerFactory.getLogger(SampleSpout.class);
>>
>>      SpoutOutputCollector collector;
>>     Map<Integer, List<Object>> pending_map = new HashMap<Integer,
>> List<Object>>();
>>     Queue<List<Object>> replay_queue = new
>> LinkedBlockingQueue<List<Object>>();
>>
>>      int contentCounter;
>>     int curMsgId;
>>
>>      @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // unique-id always increments each time we emit.
>>         // msg-id gets incremented only when new tuples are created.
>>        declarer.declare(new Fields("msg-id", "content"));
>>     }
>>
>>      @Override
>>     public void open(Map conf, TopologyContext context,
>> SpoutOutputCollector spoutOutputCollector) {
>>         collector = spoutOutputCollector;
>>     }
>>
>>      @Override
>>     public void nextTuple() {
>>         // either replay a failed tuple, or create a new one
>>         List<Object> tuple = null;
>>         if (replay_queue.size() > 0){
>>             tuple = replay_queue.poll();
>>         }else{
>>             tuple = new ArrayList<Object>();
>>             tuple.add(null);
>>             tuple.add("Content #" + contentCounter++);
>>         }
>>
>>          // increment msgId and set it as the first item in the tuple
>>         int msgId = this.curMsgId++;
>>         tuple.set(0, msgId);
>>         logger.info("Emitting: " + tuple);
>>         // add this tuple to the 'pending' map, and emit it.
>>         pending_map.put(msgId, tuple);
>>         collector.emit(tuple, msgId);
>>         Utils.sleep(100);
>>     }
>>
>>      @Override
>>     public void ack(Object msgId){
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> acked_tuple = pending_map.remove(msgId);
>>         logger.info("Acked: " + acked_tuple);
>>     }
>>
>>      @Override
>>     public void fail(Object msgId){
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> failed_tuple = pending_map.remove(msgId);
>>         logger.info("Failed: " + failed_tuple);
>>
>>          // put a copy into the replay queue
>>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>>         replay_queue.add(copy);
>>     }
>> }
>>
>>
>>  Bolt:
>>  public class SamplePrintBolt extends BaseRichBolt {
>>
>>      private static Logger logger =
>> LoggerFactory.getLogger(SamplePrintBolt.class);
>>
>>      OutputCollector collector;
>>
>>      @Override
>>     public void prepare(Map stormConf, TopologyContext context,
>> OutputCollector outputCollector) {
>>         collector = outputCollector;
>>     }
>>
>>      @Override
>>     public void execute(Tuple input) {
>>         logger.info("I see: " + input.getValues());
>>         Utils.sleep(4000);
>>         logger.info("Done sleeping. Acking: "  + input.getValues());
>>         collector.ack(input);
>>     }
>>
>>      @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // doesn't emit
>>     }
>> }
>>
>>
>>  Main:
>> public static void main(String[] args) throws Exception {
>>          Config conf = new Config();
>>         conf.setMaxSpoutPending(5);
>>         conf.setMessageTimeoutSecs(5);
>>
>>          TopologyBuilder builder = new TopologyBuilder();
>>         builder.setSpout("spout", new SampleSpout());
>>         builder.setBolt("bolt1", new
>> SamplePrintBolt()).shuffleGrouping("spout");
>>
>>          LocalCluster cluster = new LocalCluster();
>>         cluster.submitTopology("local", conf, builder.createTopology());
>>  }
>>
>>
>>  Output:
>>  30084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content
>> #0]
>> 30085 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content
>> #0]. Will now sleep...
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content
>> #1]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content
>> #2]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content
>> #3]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content
>> #4]
>> 34086 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [0, Content #0]
>> 34086 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content
>> #1]. Will now sleep...
>> 34087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
>> 34087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content
>> #5]
>> 38087 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [1, Content #1]
>> 38087 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content
>> #2]. Will now sleep...
>> 38089 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
>> 38089 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content
>> #6]
>> *-- So far, so good… however, now it's time for things to timeout.*
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
>> 40083 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
>> 40083 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content
>> #5]
>> 40084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content
>> #4]
>> 40084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content
>> #3]
>> 40085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content
>> #2]
>> 42088 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [2, Content #2]
>> *-- Acking a timed-out tuple… this does nothing.*
>> 42088 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content
>> #3]. Will now sleep…
>> *-- Why is it looking at tuple #3?  This has already failed.*
>> 45084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
>> 45085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content
>> #6]
>> 46089 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [3, Content #3]
>> 46089 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content
>> #4]. Will now sleep...
>> 50084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content
>> #2]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content
>> #5]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content
>> #4]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content
>> #3]
>> *-- More timeouts**…*
>> 50090 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [4, Content #4]
>> 50090 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content
>> #5]. Will now sleep...
>> 54091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [5, Content #5]
>> *-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…*
>> 54091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content
>> #6]. Will now sleep...
>> 55085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
>> 55085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content
>> #6]
>> 58091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [6, Content #6]
>> 58092 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content
>> #5]. Will now sleep...
>> 60085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content
>> #3]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content
>> #2]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content
>> #5]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content
>> #4]
>> 62093 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [7, Content #5]
>> 62093 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content
>> #4]. Will now sleep…
>> *-- It's clear that the Bolt looks at tuples even if they have
>> timed-out.  Its queue will get longer and longer and tuples will always
>> timeout.*
>> 65086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
>> 65087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content
>> #6]
>> 66094 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [8, Content #4]
>> 66094 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content
>> #3]. Will now sleep...
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content
>> #4]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content
>> #5]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content
>> #2]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content
>> #3]
>> 70095 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [9, Content #3]
>> 70095 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content
>> #2]. Will now sleep...
>> 74096 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [10, Content #2]
>> 74096 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content
>> #6]. Will now sleep...
>> 75088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
>> 75088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content
>> #6]
>> 78097 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [11, Content #6]
>> 78097 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content
>> #2]. Will now sleep...
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content
>> #3]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content
>> #2]
>> 80088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content
>> #5]
>> 80088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content
>> #4]
>> 82098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [12, Content #2]
>> 82098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content
>> #5]. Will now sleep...
>> 85088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
>> 85088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content
>> #6]
>> 86098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [13, Content #5]
>> 86099 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content
>> #4]. Will now sleep...
>> 90100 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [14, Content #4]
>> 90101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content
>> #3]. Will now sleep...
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content
>> #5]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content
>> #4]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content
>> #2]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content
>> #3]
>> 94101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [15, Content #3]
>> 94101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content
>> #6]. Will now sleep…
>>  *-- Problem gets exacerbated…  Bolt is now looking at tuples that
>> failed 30 seconds ago.*
>>
>
>

-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
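The quoted log shows the bolt spending 4 seconds on tuples whose timeout had already fired at the spout; Storm does not remove those queued tuples for you. One way to avoid the wasted work is a bolt-side staleness check. This is a minimal sketch under assumptions not in the original code: the spout adds a hypothetical emit-timestamp field to every tuple (SampleSpout does not), and spout and bolt clocks roughly agree. The class and method names are illustrative only.

```java
/**
 * Hypothetical bolt-side guard for skipping tuples that have most likely
 * already timed out at the spout. Assumes the spout stamps each tuple with
 * System.currentTimeMillis() at emit time and that worker clocks agree.
 */
final class StaleTupleFilter {
    private final long timeoutMillis;

    StaleTupleFilter(int messageTimeoutSecs) {
        this.timeoutMillis = messageTimeoutSecs * 1000L;
    }

    /** True once the tuple is older than topology.message.timeout.secs. */
    boolean alreadyTimedOut(long emittedAtMillis, long nowMillis) {
        return nowMillis - emittedAtMillis >= timeoutMillis;
    }

    /** True if work started now, taking expectedWorkMillis, would still beat the timeout. */
    boolean canFinishInTime(long emittedAtMillis, long nowMillis, long expectedWorkMillis) {
        return nowMillis - emittedAtMillis + expectedWorkMillis < timeoutMillis;
    }
}
```

Inside execute(), a bolt could run one of these checks and, for a stale tuple, call collector.fail(input) and return without sleeping. Failing is safer than acking here: if the tuple has not actually timed out yet, a fail still leads to a replay, whereas an ack would report work that never happened. In the quoted log the spout prints "Failed" roughly 10 seconds after the matching "Emitting" line despite the 5-second timeout, so this check errs on the side of skipping tuples a little early.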

Re: What is the purpose of timing out tuples?

Posted by Nathan Leung <nc...@gmail.com>.
Anchor your tuples, use message acks, and set the max spout pending:
https://storm.incubator.apache.org/apidocs/backtype/storm/Config.html#TOPOLOGY_MAX_SPOUT_PENDING

This will throttle data at the spout when the number of tuples emitted by
the spout but not yet fully acknowledged reaches the max value.
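A toy model of that throttling, in plain Java rather than Storm's internal code (class and method names are hypothetical): a spout task is only asked for another tuple while fewer than max of its message ids are outstanding, and an ack, a fail, or a timeout each frees one slot. That last point is also why, in Sam's log, timed-out tuples let new ones in while the stale copies are still queued at the bolt.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of per-spout-task throttling by max spout pending (not Storm's internals). */
final class PendingGate {
    private final int maxPending;
    // Not thread-safe: a spout's nextTuple/ack/fail run on one executor thread.
    private final Set<Object> inFlight = new HashSet<>();

    PendingGate(int maxPending) {
        this.maxPending = maxPending;
    }

    /** In this model, nextTuple() is only called while this is true. */
    boolean mayEmit() {
        return inFlight.size() < maxPending;
    }

    /** Records an emitted message id; returns false if the gate is closed. */
    boolean emit(Object msgId) {
        if (!mayEmit()) {
            return false;
        }
        return inFlight.add(msgId);
    }

    /** An ack, a fail, or a timeout all release the slot. */
    void complete(Object msgId) {
        inFlight.remove(msgId);
    }

    int pending() {
        return inFlight.size();
    }
}
```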


On Fri, Oct 24, 2014 at 2:13 PM, Sam Mati <sm...@appnexus.com> wrote:

>  Got it — timeout will catch any tuples that may be "lost", and shouldn't
> really be used for throttling purposes.
>
>  So, I'm curious, how is throttling best handled?  I assume Storm stops
> calling nextTuple() under some conditions, such as once some buffer is
> full… just looking for confirmation.
>
>  Thanks,
> -Sam
>
>   From: Michael Rose <mi...@fullcontact.com>
> Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
> Date: Friday, October 24, 2014 12:32 PM
>
> To: "user@storm.apache.org" <us...@storm.apache.org>
> Subject: Re: What is the purpose of timing out tuples?
>
>   So let's say your worker dies and your tuple tree is incomplete. The
> timeout will ensure that your tuple gets replayed.
>
>  However, let's say even worse happens: the worker hosting your acker goes
> down. Or even the whole cluster. As long as your upstream dependency is
> smart enough to not mark work as complete until ack or fail is called,
> it'll eventually be replayed. An upstream which supports these semantics is
> required to receive any guarantees from Storm. This can work with any
> traditional message queue, Redis, SQS, Kafka, and probably a host of other
> systems.
>
>  In one of our systems, our topologies pull from SQS. To cover the case
> where our topology is terminated abruptly, our SQS visibility timeout (the
> time before invisible work is marked visible again) is set slightly longer
> than our tuple timeout.
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> michael@fullcontact.com
>
> On Thu, Oct 23, 2014 at 4:11 PM, Sam Mati <sm...@appnexus.com> wrote:
>
>>  Thanks for the response.
>>
>>  My takeaway is that "timing out" is a last-ditch effort, and that your
>> timeout should be higher than the maximum amount of time you expect
>> your topology to take.  Otherwise you risk a vicious cycle:  timed-out
>> tuples are still processed, but are also re-emitted, causing more timeouts,
>> causing more replays, etc.
>>
>>  I'm now wondering:  If all of my bolts always end in ack or fail, under
>> what circumstances can tuples get "stuck"?  If a Worker gets killed,
>> possibly?
>>
>>  One last question:  I frequently see mentioned that Storm guarantees
>> everything gets run once.  Please correct me if I'm wrong, but Storm
>> doesn't guarantee this at all -- It's up to your Spout to keep track of
>> each pending tuple and to replay failed ones (as my example Spout below
>> does).  Am I missing something here?
>>
>>  Again, thanks for your clarification!
>>
>>  Best,
>> -Sam
>>
>>   From: Michael Rose <mi...@fullcontact.com>
>> Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
>> Date: Thursday, October 23, 2014 4:57 PM
>> To: "user@storm.apache.org" <us...@storm.apache.org>
>> Subject: Re: What is the purpose of timing out tuples?
>>
>>   It's a last-ditch mechanism for replaying work which might have gotten
>> stuck. Storm is an at-least-once processing system and doesn't aim to
>> provide exactly once / transactional behavior with base Storm. Trident aims
>> to implement that on top of the underlying at-least-once system.
>>
>>  Timed out in-flight tuples will *not* be cleared, this is true.
>> Controlling latencies within a topology is key to making Storm work. We
>> isolate IO work in Hystrix commands so that we always come in under our
>> timeout period. We've experimented with using global streams to "kill" a
>> particular tuple tree, essentially adding its unique ID to a time-based
>> cache at each bolt so the bolt drops it. Ultimately that wasn't necessary;
>> improving the consistency of external IO through circuit breaking was
>> enough.
>>
>>  If you take away nothing else, remember that Storm is at-least-once
>> processing. Its goal is to ensure processing eventually happens for
>> everything, no matter how many times it might take. It's up to you to
>> remove bad input or park it.
>>
>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> michael@fullcontact.com
>>
>> On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
>>
>>>  Hi all.  I'm hoping somebody can explain this behavior to me because
>>> it seems pretty unexpected.
>>>
>>>  I'm seeing that timing out tuples does *nothing* except call "fail" on
>>> the Spout.  The tuple itself will still be processed through the Topology,
>>> except acking/failing will have no effect.  Another problem is that the
>>> number of pending tuples will increase — timed out tuples do not count as
>>> pending even though they will flow through the topology.  Unless I'm
>>> missing something, these two combined problems make timing out tuples, at
>>> best, utterly pointless, and at worst very problematic (as it will just
>>> throw more tuples into a topology that is already maxed out).
>>>
>>>  Here's my topology:
>>> - I have a spout.  On nextTuple, it either re-emits a tuple that has
>>> failed, and if none are present, creates a new tuple.
>>> - I have a bolt that takes 4 seconds to ack a tuple.
>>> - topology.max.spout.pending = 5
>>> - topology.message.timeout.secs = 5
>>>
>>>  I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to
>>> timeout — then the Bolt would next process the *resent* tuples.  Over
>>> time, more and more tuples would be acked (though they would frequently
>>> time out).
>>>
>>>  What I'm seeing instead is that even though tuples are timed-out, they
>>> are still being processed by the Bolt.  I'm assuming there is buffer/queue
>>> for the Bolt, and that timed-out tuples are not cleared from it.
>>> Regardless, this leads to all tuples timing out, since the Bolt will
>>> eventually only process tuples that have been timed out.
>>>
>>>  I'm assuming, and hoping, that I'm missing something obvious here…
>>>
>>>  Two questions:
>>> 1.  Can I prevent Bolts from processing already-timed-out tuples?
>>> 2.  What is the point of timing out tuples?  It does *nothing* but call
>>> *fail* on the Spout even though the tuple will still be processed by
>>> the rest of the Topology!
>>>
>>>  Thanks,
>>> -Sam
>>>
>>>
>>>  Spout:
>>>  public class SampleSpout extends BaseRichSpout {
>>>     private static Logger logger =
>>> LoggerFactory.getLogger(SampleSpout.class);
>>>
>>>      SpoutOutputCollector collector;
>>>     Map<Integer, List<Object>> pending_map = new HashMap<Integer,
>>> List<Object>>();
>>>     Queue<List<Object>> replay_queue = new
>>> LinkedBlockingQueue<List<Object>>();
>>>
>>>      int contentCounter;
>>>     int curMsgId;
>>>
>>>      @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         // msg-id increments on every emit, including replays.
>>>         // "Content #n" numbering advances only when a new tuple is created.
>>>        declarer.declare(new Fields("msg-id", "content"));
>>>     }
>>>
>>>      @Override
>>>     public void open(Map conf, TopologyContext context,
>>> SpoutOutputCollector spoutOutputCollector) {
>>>         collector = spoutOutputCollector;
>>>     }
>>>
>>>      @Override
>>>     public void nextTuple() {
>>>         // either replay a failed tuple, or create a new one
>>>         List<Object> tuple = null;
>>>         if (replay_queue.size() > 0){
>>>             tuple = replay_queue.poll();
>>>         }else{
>>>             tuple = new ArrayList<Object>();
>>>             tuple.add(null);
>>>             tuple.add("Content #" + contentCounter++);
>>>         }
>>>
>>>          // increment msgId and set it as the first item in the tuple
>>>         int msgId = this.curMsgId++;
>>>         tuple.set(0, msgId);
>>>         logger.info("Emitting: " + tuple);
>>>         // add this tuple to the 'pending' map, and emit it.
>>>         pending_map.put(msgId, tuple);
>>>         collector.emit(tuple, msgId);
>>>         Utils.sleep(100);
>>>     }
>>>
>>>      @Override
>>>     public void ack(Object msgId){
>>>         // remove tuple from pending_map since it's no longer pending
>>>         List<Object> acked_tuple = pending_map.remove(msgId);
>>>         logger.info("Acked: " + acked_tuple);
>>>     }
>>>
>>>      @Override
>>>     public void fail(Object msgId){
>>>         // remove tuple from pending_map since it's no longer pending
>>>         List<Object> failed_tuple = pending_map.remove(msgId);
>>>         logger.info("Failed: " + failed_tuple);
>>>
>>>          // put a copy into the replay queue
>>>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>>>         replay_queue.add(copy);
>>>     }
>>> }
>>>
>>>
>>>  Bolt:
>>>  public class SamplePrintBolt extends BaseRichBolt {
>>>
>>>      private static Logger logger =
>>> LoggerFactory.getLogger(SamplePrintBolt.class);
>>>
>>>      OutputCollector collector;
>>>
>>>      @Override
>>>     public void prepare(Map stormConf, TopologyContext context,
>>> OutputCollector outputCollector) {
>>>         collector = outputCollector;
>>>     }
>>>
>>>      @Override
>>>     public void execute(Tuple input) {
>>>         logger.info("I see: " + input.getValues() + ". Will now sleep...");
>>>         Utils.sleep(4000);
>>>         logger.info("Done sleeping. Acking: "  + input.getValues());
>>>         collector.ack(input);
>>>     }
>>>
>>>      @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         // doesn't emit
>>>     }
>>> }
>>>
>>>
>>>  Main:
>>> public static void main(String[] args) throws Exception {
>>>          Config conf = new Config();
>>>         conf.setMaxSpoutPending(5);
>>>         conf.setMessageTimeoutSecs(5);
>>>
>>>          TopologyBuilder builder = new TopologyBuilder();
>>>         builder.setSpout("spout", new SampleSpout());
>>>         builder.setBolt("bolt1", new
>>> SamplePrintBolt()).shuffleGrouping("spout");
>>>
>>>          LocalCluster cluster = new LocalCluster();
>>>         cluster.submitTopology("local", conf, builder.createTopology());
>>>  }
>>>
>>>
>>>  Output:
>>>  30084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content
>>> #0]
>>> 30085 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content
>>> #0]. Will now sleep...
>>> 30097 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content
>>> #1]
>>> 30097 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content
>>> #2]
>>> 30097 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content
>>> #3]
>>> 30097 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content
>>> #4]
>>> 34086 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [0, Content #0]
>>> 34086 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content
>>> #1]. Will now sleep...
>>> 34087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
>>> 34087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content
>>> #5]
>>> 38087 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [1, Content #1]
>>> 38087 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content
>>> #2]. Will now sleep...
>>> 38089 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
>>> 38089 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content
>>> #6]
>>> *-- So far, so good… however, now it's time for things to timeout.*
>>> 40082 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
>>> 40082 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
>>> 40082 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
>>> 40083 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
>>> 40083 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content
>>> #5]
>>> 40084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content
>>> #4]
>>> 40084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content
>>> #3]
>>> 40085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content
>>> #2]
>>> 42088 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [2, Content #2]
>>> *-- Acking a timed-out tuple… this does nothing.*
>>> 42088 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content
>>> #3]. Will now sleep…
>>> *-- Why is it looking at tuple #3?  This has already failed.*
>>> 45084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
>>> 45085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content
>>> #6]
>>> 46089 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [3, Content #3]
>>> 46089 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content
>>> #4]. Will now sleep...
>>> 50084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content
>>> #2]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content
>>> #5]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content
>>> #4]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content
>>> #3]
>>> *-- More timeouts…*
>>> 50090 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [4, Content #4]
>>> 50090 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content
>>> #5]. Will now sleep...
>>> 54091 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [5, Content #5]
>>> *-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…*
>>> 54091 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content
>>> #6]. Will now sleep...
>>> 55085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
>>> 55085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content
>>> #6]
>>> 58091 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [6, Content #6]
>>> 58092 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content
>>> #5]. Will now sleep...
>>> 60085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content
>>> #3]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content
>>> #2]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content
>>> #5]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content
>>> #4]
>>> 62093 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [7, Content #5]
>>> 62093 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content
>>> #4]. Will now sleep…
>>> *-- It's clear that the Bolt looks at tuples even if they have
>>> timed-out.  Its queue will get longer and longer and tuples will always
>>> timeout.*
>>> 65086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
>>> 65087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content
>>> #6]
>>> 66094 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [8, Content #4]
>>> 66094 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content
>>> #3]. Will now sleep...
>>> 70087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
>>> 70087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
>>> 70087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content
>>> #4]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content
>>> #5]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content
>>> #2]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content
>>> #3]
>>> 70095 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [9, Content #3]
>>> 70095 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content
>>> #2]. Will now sleep...
>>> 74096 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [10, Content #2]
>>> 74096 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content
>>> #6]. Will now sleep...
>>> 75088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
>>> 75088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content
>>> #6]
>>> 78097 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [11, Content #6]
>>> 78097 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content
>>> #2]. Will now sleep...
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content
>>> #3]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content
>>> #2]
>>> 80088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content
>>> #5]
>>> 80088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content
>>> #4]
>>> 82098 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [12, Content #2]
>>> 82098 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content
>>> #5]. Will now sleep...
>>> 85088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
>>> 85088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content
>>> #6]
>>> 86098 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [13, Content #5]
>>> 86099 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content
>>> #4]. Will now sleep...
>>> 90100 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [14, Content #4]
>>> 90101 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content
>>> #3]. Will now sleep...
>>> 90216 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
>>> 90216 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
>>> 90216 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content
>>> #5]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content
>>> #4]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content
>>> #2]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content
>>> #3]
>>> 94101 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [15, Content #3]
>>> 94101 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content
>>> #6]. Will now sleep…
>>>  *-- Problem gets exacerbated…  Bolt is now looking at tuples that
>>> failed 30 seconds ago.*
>>>
>>
>>
>
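Michael's experiment with "killing" a tuple tree can be sketched as a per-bolt cache of dead tree ids with an expiry, so memory stays bounded. How the kill notice reaches every bolt (he mentions global streams) and what id identifies a tree are assumptions left out of this sketch, and the class name is hypothetical. His team ultimately did not need this, so treat it as optional.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Hypothetical per-bolt "kill list": ids of tuple trees that should be dropped,
 * each remembered only for ttlMillis so the cache cannot grow without bound.
 */
final class KillList {
    private final long ttlMillis;
    private final Map<String, Long> expiresAt = new HashMap<>();

    KillList(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Called when a kill notice for treeId arrives at this bolt. */
    void kill(String treeId, long nowMillis) {
        expiresAt.put(treeId, nowMillis + ttlMillis);
    }

    /** True if work belonging to treeId should be dropped here. */
    boolean isKilled(String treeId, long nowMillis) {
        evictExpired(nowMillis);
        return expiresAt.containsKey(treeId);
    }

    int size() {
        return expiresAt.size();
    }

    // Linear scan keeps the sketch simple; a real cache would evict more cleverly.
    private void evictExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = expiresAt.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= nowMillis) {
                it.remove();
            }
        }
    }
}
```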

Re: What is the purpose of timing out tuples?

Posted by Sam Mati <sm...@appnexus.com>.
Thanks again.

What else, besides max tuples pending, will cause the Spout to stop emitting?  What happens if it emits so many tuples that one or more subscribed bolts cannot fit them into their buffer?

Thanks
-Sam

From: Michael Rose <mi...@fullcontact.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: Friday, October 24, 2014 2:28 PM
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: Re: What is the purpose of timing out tuples?

Storm tracks the number of outstanding tuples per *spout instance* (task). If you have a max spout pending of 200 and 5 instances of your spout, you'll have at most 1000 tuples in flight.

This (somewhat crude) mechanism is how Storm throttles. There's currently no real backpressure other than the number of pending spout tuples.
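The arithmetic behind the 1000-tuple figure, as a tiny sketch: the cap applies per spout task, so it multiplies with spout parallelism. It counts spout tuples (tuple trees), not the extra tuples bolts emit downstream, so the number of tuples sitting in bolt queues can be larger. The class name is illustrative only.

```java
/** Upper bound on in-flight spout tuples when max spout pending applies per spout task. */
final class InFlightCap {
    static long maxInFlight(int maxSpoutPending, int spoutTasks) {
        // Widen to long before multiplying to avoid int overflow on large settings.
        return (long) maxSpoutPending * spoutTasks;
    }

    public static void main(String[] args) {
        System.out.println(maxInFlight(200, 5)); // Michael's example: prints 1000
        System.out.println(maxInFlight(5, 1));   // Sam's topology: prints 5
    }
}
```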

If your spout has nothing to return (you don't provide a tuple during nextTuple()) it'll sleep for 1ms (configurable) and try again.
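The "sleep when nextTuple() emits nothing" behaviour can be modelled as a simple loop. This is an illustration, not Storm's executor code; in Storm 0.9.x the pause comes from the configured spout wait strategy (by default, as far as I know, `SleepSpoutWaitStrategy` with `topology.sleep.spout.wait.strategy.time.ms` set to 1), which is worth confirming for your version. The sleeper is injected so the loop can be exercised without real delays, and all names here are hypothetical.

```java
import java.util.function.Supplier;

/** Toy spout loop: back off briefly whenever nextTuple() produced nothing. */
final class SpoutDriver {
    /** Injected so tests can skip real sleeping. */
    interface Sleeper {
        void sleep(long millis);
    }

    /** Real sleeper; restores the interrupt flag instead of swallowing it. */
    static final Sleeper THREAD_SLEEP = millis -> {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    };

    private final long waitMillis;
    private final Sleeper sleeper;
    private long backoffs = 0;

    SpoutDriver(long waitMillis, Sleeper sleeper) {
        this.waitMillis = waitMillis;
        this.sleeper = sleeper;
    }

    /** Runs a fixed number of rounds; nextTuple returns null when it has nothing to emit. */
    int run(Supplier<?> nextTuple, int rounds) {
        int emitted = 0;
        for (int i = 0; i < rounds; i++) {
            if (nextTuple.get() == null) {
                backoffs++;
                sleeper.sleep(waitMillis);
            } else {
                emitted++;
            }
        }
        return emitted;
    }

    long backoffs() {
        return backoffs;
    }
}
```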

FWIW, we have a spout wrapper that puts a Guava RateLimiter around nextTuple() and lets us change the rate dynamically (https://gist.github.com/Xorlev/1ac3642828e40f7e59aa); we use it to manually rate-limit spouts when necessary.
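A self-contained stand-in for such a wrapper, not the gist's code. It deliberately avoids Guava (whose `RateLimiter` provides `create`, `tryAcquire` and `setRate` for this purpose) and instead implements a minimal token bucket; the class name and the injected nanosecond clock are assumptions for the example. A wrapping spout would call the real spout's nextTuple() only when tryAcquire() returns true.

```java
import java.util.function.LongSupplier;

/** Minimal token bucket with an adjustable rate (a stand-in for Guava's RateLimiter). */
final class AdjustableRateGate {
    private final LongSupplier clockNanos;
    private double permitsPerSecond;
    private double tokens;
    private long lastRefillNanos;

    AdjustableRateGate(double permitsPerSecond, LongSupplier clockNanos) {
        this.clockNanos = clockNanos;
        this.permitsPerSecond = permitsPerSecond;
        this.tokens = 1.0; // allow one emit immediately
        this.lastRefillNanos = clockNanos.getAsLong();
    }

    /** Changes the rate at runtime; synchronized so a control thread can call it. */
    synchronized void setRate(double newPermitsPerSecond) {
        refill(); // credit elapsed time at the old rate first
        this.permitsPerSecond = newPermitsPerSecond;
    }

    /** Non-blocking: true if one tuple may be emitted now. */
    synchronized boolean tryAcquire() {
        refill();
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    private void refill() {
        long now = clockNanos.getAsLong();
        double elapsedSecs = (now - lastRefillNanos) / 1e9;
        lastRefillNanos = now;
        // Cap at one second's worth of permits (at least 1) so idle time can't build a huge burst.
        double capacity = Math.max(1.0, permitsPerSecond);
        tokens = Math.min(capacity, tokens + elapsedSecs * permitsPerSecond);
    }
}
```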

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Fri, Oct 24, 2014 at 12:13 PM, Sam Mati <sm...@appnexus.com> wrote:
Got it — timeout will catch any tuples that may be "lost", and shouldn't really be used for throttling purposes.

So, I'm curious, how is throttling best handled?  I assume Storm stops calling nextTuple() under some conditions, such as once some buffer is full… just looking for confirmation.

Thanks,
-Sam

From: Michael Rose <mi...@fullcontact.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: Friday, October 24, 2014 12:32 PM
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: Re: What is the purpose of timing out tuples?

So let's say your worker dies and your tuple tree is incomplete. The timeout will ensure that your tuple gets replayed.

However, let's say something even worse happens: the worker hosting your acker goes down, or even the whole cluster. As long as your upstream dependency is smart enough not to mark work as complete until ack or fail is called, it'll eventually be replayed. An upstream that supports these semantics is required to receive any guarantees from Storm. This works with any traditional message queue, Redis, SQS, Kafka, and probably a host of other systems.

In one of our systems, our topologies pull from SQS. To cover the case where a topology is terminated abruptly, our SQS visibility timeout (the time before invisible, in-flight work is marked visible again) is set slightly longer than our tuple timeout.
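The upstream contract described above can be sketched with a hypothetical in-memory queue that mimics SQS visibility timeouts (LeasedQueue and its methods are illustrative, not the SQS API). A message is deleted only when the spout's ack() runs, fail() makes it visible again immediately, and if the topology disappears the lease simply expires and the message is redelivered. The lease is sized slightly above topology.message.timeout.secs, as in Michael's setup, so Storm's own timeout-and-replay fires first while the topology is alive.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical queue with SQS-style visibility timeouts. Message IDs are assumed unique.
class LeasedQueue {
    private final long visibilityTimeoutMs;
    private final Deque<String> visible = new ArrayDeque<>();
    private final Map<String, Long> leasedUntil = new HashMap<>();

    LeasedQueue(long visibilityTimeoutMs) {
        this.visibilityTimeoutMs = visibilityTimeoutMs;
    }

    // Lease slightly longer than the tuple timeout, so Storm's fail()/replay wins while
    // the topology is alive and lease expiry only covers a dead topology.
    static long visibilityTimeoutFor(int messageTimeoutSecs, long marginMs) {
        return messageTimeoutSecs * 1000L + marginMs;
    }

    void send(String id) {
        visible.addLast(id);
    }

    // From the spout's nextTuple(): hand out a message and hide it.
    String receive(long nowMs) {
        expireLeases(nowMs);
        String id = visible.pollFirst();
        if (id != null) {
            leasedUntil.put(id, nowMs + visibilityTimeoutMs);
        }
        return id;
    }

    // From the spout's ack(): the work is complete, so delete it for good.
    void delete(String id) {
        leasedUntil.remove(id);
    }

    // From the spout's fail(): make it visible again right away.
    void release(String id) {
        if (leasedUntil.remove(id) != null) {
            visible.addLast(id);
        }
    }

    // Nobody acked or failed in time (e.g. the topology died): redeliver.
    private void expireLeases(long nowMs) {
        leasedUntil.entrySet().removeIf(e -> {
            boolean expired = e.getValue() <= nowMs;
            if (expired) {
                visible.addLast(e.getKey());
            }
            return expired;
        });
    }
}
```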


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Thu, Oct 23, 2014 at 4:11 PM, Sam Mati <sm...@appnexus.com> wrote:
Thanks for the response.

My takeaway is that "timing out" is a last-ditch effort, and that your timeout should be higher than the maximum amount of time you expect your topology to take. Otherwise you risk a vicious cycle: timed-out tuples are still processed, but are also re-emitted, causing more timeouts, causing more replays, etc.
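That rule of thumb can be made concrete with a rough estimate. Assuming a single bottleneck bolt stage that receives tuples evenly and processes one at a time per task, a full window of pending tuples queues up behind it, so the last tuple finishes after about (spout tasks × max spout pending ÷ bolt tasks) × per-tuple time. For the topology in this thread that is 1 × 5 ÷ 1 × 4 s = 20 s, four times the 5 s timeout. The helper below is hypothetical and ignores replays, which only make things worse.

```java
// Rough, hypothetical sizing helper (not a Storm API). Assumes one bottleneck bolt stage,
// tuples spread evenly over its tasks, one tuple at a time per task, and no replays.
final class TimeoutSizing {
    private TimeoutSizing() {}

    // Time for the last tuple of a full pending window to get through the bottleneck.
    static double worstCaseLatencySecs(int spoutTasks, int maxSpoutPending,
                                       int boltTasks, double secsPerTuple) {
        double tuplesPerBoltTask = Math.ceil((double) spoutTasks * maxSpoutPending / boltTasks);
        return tuplesPerBoltTask * secsPerTuple;
    }

    // The message timeout should exceed that worst case.
    static boolean timeoutLooksSafe(int messageTimeoutSecs, int spoutTasks, int maxSpoutPending,
                                    int boltTasks, double secsPerTuple) {
        return messageTimeoutSecs
                > worstCaseLatencySecs(spoutTasks, maxSpoutPending, boltTasks, secsPerTuple);
    }
}
```

With max spout pending lowered to 1, the same bolt needs only about 4 s per tree, under the 5 s timeout; running five bolt tasks instead gives the same estimate.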

I'm now wondering:  If all of my bolts always end in ack or fail, under what circumstances can tuples get "stuck"?  If a Worker gets killed, possibly?

One last question:  I frequently see mentioned that Storm guarantees everything gets run once.  Please correct me if I'm wrong, but Storm doesn't guarantee this at all -- It's up to your Spout to keep track of each pending tuple and to replay failed ones (as my example Spout below does).  Am I missing something here?

Again, thanks for your clarification!

Best,
-Sam

From: Michael Rose <mi...@fullcontact.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: Thursday, October 23, 2014 4:57 PM
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: Re: What is the purpose of timing out tuples?

It's a last-ditch mechanism for replaying work which might have gotten stuck. Storm is an at-least-once processing system and doesn't aim to provide exactly-once / transactional behavior with base Storm. Trident aims to implement that on top of the underlying at-least-once system.
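Because delivery is at-least-once, the usual companion is idempotent processing. A hypothetical sketch: a bolt keeps a bounded set of recently processed keys and skips repeats. In Sam's spout every replay gets a new msg-id but keeps its content, so the key here is the content, not the msg-id. This only dedupes within one bolt task's memory and is not exactly-once; stronger guarantees need something like Trident or idempotent writes to an external store.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded "recently seen" set for skipping replayed records in a bolt.
class RecentKeys {
    private final Map<String, Boolean> seen;

    RecentKeys(final int capacity) {
        // Insertion-ordered map that evicts its oldest key once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // True the first time a key is seen (do the work), false for a remembered repeat.
    boolean firstTime(String key) {
        return seen.put(key, Boolean.TRUE) == null;
    }
}
```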

Timed-out in-flight tuples will *not* be cleared, this is true. Controlling latencies within a topology is key to making Storm work. We have IO work isolated by Hystrix commands to ensure we're always coming in under our timeout period. We've experimented with using global streams to "kill" a particular tuple tree, essentially adding an identifier for the work to a time-based cache so that each bolt drops it. Ultimately that wasn't really necessary; we instead improved the consistency of external IO through circuit breaking.
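Sam's first question (can a bolt skip already-timed-out tuples?) has no built-in answer in base Storm, but a bolt can approximate it. A hypothetical sketch, assuming the spout adds an emit timestamp (say an "emitted-at-ms" field) to each tuple and that worker clocks are reasonably in sync: the bolt skips the work and fails the tuple once its age reaches the message timeout. Failing rather than acking matters because the timeout is not exact (in Sam's log, fail() arrives roughly 6 to 10 s after emit with a 5 s timeout), so a tuple judged stale may still be pending, and failing it gets it replayed instead of silently dropped.

```java
// Hypothetical stale-tuple check; base Storm does not provide this.
// Assumes each tuple carries its emit time and that clocks are roughly in sync.
final class StaleTupleFilter {
    enum Decision { PROCESS, SKIP_AND_FAIL }

    private final long timeoutMs;

    StaleTupleFilter(int messageTimeoutSecs) {
        this.timeoutMs = messageTimeoutSecs * 1000L;
    }

    Decision decide(long emittedAtMs, long nowMs) {
        long ageMs = nowMs - emittedAtMs;
        // Past the timeout the work is probably wasted. Fail rather than ack: if the
        // tree is in fact still pending, fail() triggers a replay instead of losing it.
        return ageMs >= timeoutMs ? Decision.SKIP_AND_FAIL : Decision.PROCESS;
    }
}
```

In a bolt's execute(), the check would read the timestamp (e.g. with input.getLongByField("emitted-at-ms"), using the hypothetical field name) and call collector.fail(input) on SKIP_AND_FAIL. Using the times from Sam's log, Content #3 was emitted at 30097 and reached the bolt at 42088, about 12 s old, so it would be skipped.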

If you take away nothing else, remember that Storm is at least once processing. Its goal is to ensure processing eventually happens for everything, no matter how many times it might take. It's up to you to remove bad input or park it.
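"Remove bad input or park it" could look like this in a spout's fail() handling. A hypothetical sketch: each record is replayed at most maxAttempts times and is then parked (for example, written to a dead-letter store) instead of being retried forever. Attempts are counted per content, because the example spout assigns a new msg-id on every replay.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical replay policy: retry a record a bounded number of times, then park it.
class ReplayOrPark {
    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();
    private final Queue<String> replayQueue = new ArrayDeque<>();
    private final List<String> parked = new ArrayList<>();

    ReplayOrPark(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Call on every emit, new or replayed.
    void onEmit(String content) {
        attempts.merge(content, 1, Integer::sum);
    }

    // Call from fail(): replay if attempts remain, otherwise park the record.
    void onFail(String content) {
        if (attempts.getOrDefault(content, 0) >= maxAttempts) {
            attempts.remove(content);
            parked.add(content);
        } else {
            replayQueue.add(content);
        }
    }

    // Call from ack(): the record is done, forget its attempt count.
    void onAck(String content) {
        attempts.remove(content);
    }

    // Next record to re-emit from nextTuple(), or null if none.
    String nextReplay() {
        return replayQueue.poll();
    }

    List<String> parked() {
        return parked;
    }
}
```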


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
Hi all.  I'm hoping somebody can explain this behavior to me because it seems pretty unexpected.

I'm seeing that timing out tuples does nothing except call "fail" on the Spout. The tuple itself will still be processed through the Topology, except acking/failing will have no effect. Another problem is that the number of tuples actually in flight will grow: timed-out tuples do not count as pending even though they will flow through the topology. Unless I'm missing something, these two combined problems make timing out tuples at best utterly pointless, and at worst very problematic (as it will just throw more tuples into a topology that is already maxed out).

Here's my topology:
- I have a spout. On nextTuple, it re-emits a tuple that has failed if there is one; otherwise it creates a new tuple.
- I have a bolt that takes 4 seconds to ack a tuple.
- topology.max.spout.pending = 5
- topology.message.timeout.secs = 5

I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to time out; then the Bolt would next process the resent tuples. Over time, more and more tuples would be acked (though they would frequently time out).

What I'm seeing instead is that even though tuples have timed out, they are still being processed by the Bolt. I'm assuming there is a buffer/queue for the Bolt, and that timed-out tuples are not cleared from it. Regardless, this leads to all tuples timing out, since the Bolt will eventually only process tuples that have already timed out.
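The behaviour described here can be reproduced with a toy simulation of this topology (one spout task, one bolt task, 4 s per tuple, max spout pending 5). It is a hypothetical model, not Storm code, and it simplifies Storm's timing: a tree times out exactly timeoutMs after emit and transfer is instantaneous. Timed-out tuples are never removed from the bolt's queue, their late acks are ignored, and each timeout frees a pending slot for another emit, so the bolt's backlog grows. With a 30 s timeout, nothing times out.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy model of Sam's topology: one spout task, one bolt task (hypothetical, not Storm code).
// Simplifications: a tree times out exactly timeoutMs after emit, the spout emits at most
// once per 100 ms, and transfer between spout and bolt is instantaneous.
final class TimeoutSimulation {
    static final class Result {
        int acked;        // acks for trees that were still pending
        int lateAcks;     // acks for trees that had already timed out (ignored)
        int failed;       // timeouts; each one frees a pending slot
        int maxBoltQueue; // longest backlog seen at the bolt
    }

    static Result run(int maxPending, long timeoutMs, long serviceMs, long durationMs) {
        Map<Integer, Long> pending = new HashMap<>();       // msgId -> emit time
        ArrayDeque<Integer> boltQueue = new ArrayDeque<>(); // never purged on timeout
        Result r = new Result();
        int nextMsgId = 0;
        Integer current = null;
        long busyUntil = 0;
        long nextEmitAt = 0;

        for (long t = 0; t < durationMs; t++) {
            // 1. Spout side: expire stale trees (fail() is called; the queued copy stays).
            for (Iterator<Long> it = pending.values().iterator(); it.hasNext(); ) {
                if (t - it.next() >= timeoutMs) {
                    it.remove();
                    r.failed++;
                }
            }
            // 2. Bolt side: finish the current tuple, then take the next one in FIFO order.
            if (current != null && t >= busyUntil) {
                if (pending.remove(current) != null) {
                    r.acked++;
                } else {
                    r.lateAcks++;
                }
                current = null;
            }
            if (current == null && !boltQueue.isEmpty()) {
                current = boltQueue.poll();
                busyUntil = t + serviceMs;
            }
            // 3. Spout side: emit (a replay or a new tuple, always with a fresh msgId)
            //    only while fewer than maxPending trees are pending.
            if (t >= nextEmitAt && pending.size() < maxPending) {
                pending.put(nextMsgId, t);
                boltQueue.add(nextMsgId++);
                nextEmitAt = t + 100;
                r.maxBoltQueue = Math.max(r.maxBoltQueue, boltQueue.size());
            }
        }
        return r;
    }
}
```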

I'm assuming, and hoping, that I'm missing something obvious here…

Two questions:
1.  Can I prevent Bolts from processing already-timed-out tuples?
2.  What is the point of timing out tuples?  It does nothing but call fail on the Spout even though the tuple will still be processed by the rest of the Topology!

Thanks,
-Sam


Spout:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class SampleSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(SampleSpout.class);

    SpoutOutputCollector collector;
    // msgId -> values of every tuple emitted but not yet acked or failed
    Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
    // copies of failed tuples, waiting to be re-emitted by nextTuple()
    Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();

    int contentCounter;
    int curMsgId;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // msg-id is fresh on every emit, including replays;
        // content only changes when a brand-new tuple is created.
        declarer.declare(new Fields("msg-id", "content"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // either replay a failed tuple, or create a new one
        List<Object> tuple = replay_queue.poll();
        if (tuple == null) {
            tuple = new ArrayList<Object>();
            tuple.add(null); // placeholder for msg-id, filled in below
            tuple.add("Content #" + contentCounter++);
        }

        // assign a new msgId and set it as the first item in the tuple
        int msgId = curMsgId++;
        tuple.set(0, msgId);
        logger.info("Emitting: " + tuple);
        // add this tuple to the 'pending' map, and emit it with msgId so Storm tracks it
        pending_map.put(msgId, tuple);
        collector.emit(tuple, msgId);
        Utils.sleep(100);
    }

    @Override
    public void ack(Object msgId) {
        // remove tuple from pending_map since it's no longer pending
        List<Object> acked_tuple = pending_map.remove(msgId);
        logger.info("Acked: " + acked_tuple);
    }

    @Override
    public void fail(Object msgId) {
        // remove tuple from pending_map since it's no longer pending
        List<Object> failed_tuple = pending_map.remove(msgId);
        logger.info("Failed: " + failed_tuple);
        if (failed_tuple == null) {
            return; // unknown msgId: nothing to replay
        }

        // put a copy into the replay queue; nextTuple() gives it a new msgId
        replay_queue.add(new ArrayList<Object>(failed_tuple));
    }
}


Bolt:
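import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;

public class SamplePrintBolt extends BaseRichBolt {

    private static final Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);

    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    @Override
    public void execute(Tuple input) {
        logger.info("I see: " + input.getValues() + ". Will now sleep...");
        Utils.sleep(4000); // simulate 4 seconds of work per tuple
        logger.info("Done sleeping. Acking: " + input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // doesn't emit
    }
}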


Main:
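import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

// Enclosing class name is hypothetical; the original snippet showed only main().
public class SampleTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(5);      // topology.max.spout.pending
        conf.setMessageTimeoutSecs(5);   // topology.message.timeout.secs

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new SampleSpout());
        builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("local", conf, builder.createTopology());
    }
}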


Output:
30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
-- So far, so good… however, now it's time for things to time out.
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
-- Acking a timed-out tuple… this does nothing.
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep...
-- Why is it looking at tuple #3?  This has already failed.
45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
-- More timeouts…
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
58091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
58092 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
60085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep...
-- It's clear that the Bolt looks at tuples even if they have timed out. Its queue will get longer and longer and tuples will always time out.
65086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
65087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
86098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
86099 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
90100 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
90101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep...
-- Problem gets exacerbated… Bolt is now looking at tuples that failed 30 seconds ago.




Re: What is the purpose of timing out tuples?

Posted by Michael Rose <mi...@fullcontact.com>.
Storm tracks the number of outstanding tuples per *spout instance*. If you
have a max spout pending of 200, and 5 instances of your spout, you'll have
1000 tuples in flight max.

This (somewhat crude) mechanism is how it throttles. There's currently no
real backpressure other than the number of pending spout tuples.

If your spout has nothing to return (you don't emit a tuple during
nextTuple()), it'll sleep for 1 ms (configurable) and try again.

FWIW, we have a spout wrapper that puts a Guava RateLimiter around
nextTuple, with a rate we can change dynamically
(https://gist.github.com/Xorlev/1ac3642828e40f7e59aa). We use it to
manually rate limit spouts when necessary.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Fri, Oct 24, 2014 at 12:13 PM, Sam Mati <sm...@appnexus.com> wrote:

>  Got it — timeout will catch any tuples that may be "lost", and shouldn't
> really be used for throttling purposes.
>
>  So, I'm curious, how is throttling best handled?  I assume Storm stops
> calling nextTuple() under some conditions, such as once some buffer is
> full… just looking for confirmation.
>
>  Thanks,
> -Sam
>
>   From: Michael Rose <mi...@fullcontact.com>
> Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
> Date: Friday, October 24, 2014 12:32 PM
>
> To: "user@storm.apache.org" <us...@storm.apache.org>
> Subject: Re: What is the purpose of timing out tuples?
>
>   So let's say your worker dies and your tuple tree is incomplete. The
> timeout will ensure that your tuple gets replayed.
>
>  However, let's say something even worse happens: the worker hosting your
> acker goes down, or even the whole cluster. As long as your upstream
> dependency is smart enough not to mark work as complete until ack or fail
> is called, it'll eventually be replayed. An upstream that supports these
> semantics is required to receive any guarantees from Storm. This works with
> any traditional message queue, Redis, SQS, Kafka, and probably a host of
> other systems.
>
>  In one of our systems, our topologies pull from SQS. To cover the case
> where a topology is terminated abruptly, our SQS visibility timeout (the
> time before invisible, in-flight work is marked visible again) is set
> slightly longer than our tuple timeout.
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> michael@fullcontact.com
>
> On Thu, Oct 23, 2014 at 4:11 PM, Sam Mati <sm...@appnexus.com> wrote:
>
>>  Thanks for the response.
>>
>>  My takeaway is that "timing out" is a last-ditch effort, and that your
>> timeout should be higher than the maximum amount of time you expect your
>> topology to take.  Otherwise you risk a vicious cycle:  timed-out tuples
>> are still processed, but are also re-emitted, causing more timeouts,
>> causing more replays, etc.
>>
>>  I'm now wondering:  If all of my bolts always end in ack or fail, under
>> what circumstances can tuples get "stuck"?  If a Worker gets killed,
>> possibly?
>>
>>  One last question:  I frequently see mentioned that Storm guarantees
>> everything gets run once.  Please correct me if I'm wrong, but Storm
>> doesn't guarantee this at all -- It's up to your Spout to keep track of
>> each pending tuple and to replay failed ones (as my example Spout below
>> does).  Am I missing something here?
>>
>>  Again, thanks for your clarification!
>>
>>  Best,
>> -Sam
>>
>>   From: Michael Rose <mi...@fullcontact.com>
>> Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
>> Date: Thursday, October 23, 2014 4:57 PM
>> To: "user@storm.apache.org" <us...@storm.apache.org>
>> Subject: Re: What is the purpose of timing out tuples?
>>
>>   It's a last-ditch mechanism for replaying work which might have gotten
>> stuck. Storm is an at-least-once processing system and doesn't aim to
>> provide exactly-once / transactional behavior with base Storm. Trident aims
>> to implement that on top of the underlying at-least-once system.
>>
>>  Timed-out in-flight tuples will *not* be cleared, this is true.
>> Controlling latencies within a topology is key to making Storm work. We
>> have IO work isolated by Hystrix commands to ensure we're always coming in
>> under our timeout period. We've experimented with using global streams to
>> "kill" a particular tuple tree, essentially adding an identifier for the
>> work to a time-based cache so that each bolt drops it. Ultimately that
>> wasn't really necessary; we instead improved the consistency of external
>> IO through circuit breaking.
>>
>>  If you take away nothing else, remember that Storm is at least once
>> processing. Its goal is to ensure processing eventually happens for
>> everything, no matter how many times it might take. It's up to you to
>> remove bad input or park it.
>>
>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> michael@fullcontact.com
>>
>> On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
>>
>>>  Hi all.  I'm hoping somebody can explain this behavior to me because
>>> it seems pretty unexpected.
>>>
>>>  I'm seeing that timing out tuples does *nothing* except call "fail" on
>>> the Spout.  The tuple itself will still be processed through the Topology,
>>> except acking/failing will have no effect.  Another problem is that the
>>> number of tuples actually in flight will grow: timed-out tuples do not
>>> count as pending even though they will flow through the topology.  Unless
>>> I'm missing something, these two combined problems make timing out tuples
>>> at best utterly pointless, and at worst very problematic (as it will just
>>> throw more tuples into a topology that is already maxed out).
>>>
>>>  Here's my topology:
>>> - I have a spout.  On nextTuple, it re-emits a tuple that has failed if
>>> there is one; otherwise it creates a new tuple.
>>> - I have a bolt that takes 4 seconds to ack a tuple.
>>> - topology.max.spout.pending = 5
>>> - topology.message.timeout.secs = 5
>>>
>>>  I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to
>>> time out; then the Bolt would next process the *resent* tuples.  Over
>>> time, more and more tuples would be acked (though they would frequently
>>> time out).
>>>
>>>  What I'm seeing instead is that even though tuples have timed out, they
>>> are still being processed by the Bolt.  I'm assuming there is a
>>> buffer/queue for the Bolt, and that timed-out tuples are not cleared from
>>> it.  Regardless, this leads to all tuples timing out, since the Bolt will
>>> eventually only process tuples that have already timed out.
>>>
>>>  I'm assuming, and hoping, that I'm missing something obvious here…
>>>
>>>  Two questions:
>>> 1.  Can I prevent Bolts from processing already-timed-out tuples?
>>> 2.  What is the point of timing out tuples?  It does *nothing* but call
>>> *fail* on the Spout even though the tuple will still be processed by
>>> the rest of the Topology!
>>>
>>>  Thanks,
>>> -Sam
>>>
>>>
>>>  Spout:
>>>  public class SampleSpout extends BaseRichSpout {
>>>     private static Logger logger =
>>> LoggerFactory.getLogger(SampleSpout.class);
>>>
>>>      SpoutOutputCollector collector;
>>>     Map<Integer, List<Object>> pending_map = new HashMap<Integer,
>>> List<Object>>();
>>>     Queue<List<Object>> replay_queue = new
>>> LinkedBlockingQueue<List<Object>>();
>>>
>>>      int contentCounter;
>>>     int curMsgId;
>>>
>>>      @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         // unique-id always increments each time we emit.
>>>         // msg-id gets incremented only when new tuples are created.
>>>        declarer.declare(new Fields("msg-id", "content"));
>>>     }
>>>
>>>      @Override
>>>     public void open(Map conf, TopologyContext context,
>>> SpoutOutputCollector spoutOutputCollector) {
>>>         collector = spoutOutputCollector;
>>>     }
>>>
>>>      @Override
>>>     public void nextTuple() {
>>>         // either replay a failed tuple, or create a new one
>>>         List<Object> tuple = null;
>>>         if (replay_queue.size() > 0){
>>>             tuple = replay_queue.poll();
>>>         }else{
>>>             tuple = new ArrayList<Object>();
>>>             tuple.add(null);
>>>             tuple.add("Content #" + contentCounter++);
>>>         }
>>>
>>>          // increment msgId and set it as the first item in the tuple
>>>         int msgId = this.curMsgId++;
>>>         tuple.set(0, msgId);
>>>         logger.info("Emitting: " + tuple);
>>>         // add this tuple to the 'pending' map, and emit it.
>>>         pending_map.put(msgId, tuple);
>>>         collector.emit(tuple, msgId);
>>>         Utils.sleep(100);
>>>     }
>>>
>>>      @Override
>>>     public void ack(Object msgId){
>>>         // remove tuple from pending_map since it's no longer pending
>>>         List<Object> acked_tuple = pending_map.remove(msgId);
>>>         logger.info("Acked: " + acked_tuple);
>>>     }
>>>
>>>      @Override
>>>     public void fail(Object msgId){
>>>         // remove tuple from pending_map since it's no longer pending
>>>         List<Object> failed_tuple = pending_map.remove(msgId);
>>>         logger.info("Failed: " + failed_tuple);
>>>
>>>          // put a copy into the replay queue
>>>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>>>         replay_queue.add(copy);
>>>     }
>>> }
>>>
>>>
>>>  Bolt:
>>>  public class SamplePrintBolt extends BaseRichBolt {
>>>
>>>      private static Logger logger =
>>> LoggerFactory.getLogger(SamplePrintBolt.class);
>>>
>>>      OutputCollector collector;
>>>
>>>      @Override
>>>     public void prepare(Map stormConf, TopologyContext context,
>>> OutputCollector outputCollector) {
>>>         collector = outputCollector;
>>>     }
>>>
>>>      @Override
>>>     public void execute(Tuple input) {
>>>         logger.info("I see: " + input.getValues());
>>>         Utils.sleep(4000);
>>>         logger.info("Done sleeping. Acking: "  + input.getValues());
>>>         collector.ack(input);
>>>     }
>>>
>>>      @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         // doesn't emit
>>>     }
>>> }
>>>
>>>
>>>  Main:
>>> public static void main(String[] args) throws Exception {
>>>          Config conf = new Config();
>>>         conf.setMaxSpoutPending(5);
>>>         conf.setMessageTimeoutSecs(5);
>>>
>>>          TopologyBuilder builder = new TopologyBuilder();
>>>         builder.setSpout("spout", new SampleSpout());
>>>         builder.setBolt("bolt1", new
>>> SamplePrintBolt()).shuffleGrouping("spout");
>>>
>>>          LocalCluster cluster = new LocalCluster();
>>>         cluster.submitTopology("local", conf, builder.createTopology());
>>>  }
>>>
>>>
>>>  Output:
>>>  30084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content
>>> #0]
>>> 30085 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content
>>> #0]. Will now sleep...
>>> 30097 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content
>>> #1]
>>> 30097 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content
>>> #2]
>>> 30097 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content
>>> #3]
>>> 30097 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content
>>> #4]
>>> 34086 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [0, Content #0]
>>> 34086 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content
>>> #1]. Will now sleep...
>>> 34087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
>>> 34087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content
>>> #5]
>>> 38087 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [1, Content #1]
>>> 38087 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content
>>> #2]. Will now sleep...
>>> 38089 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
>>> 38089 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content
>>> #6]
>>> *-- So far, so good… however, now it's time for things to timeout.*
>>> 40082 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
>>> 40082 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
>>> 40082 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
>>> 40083 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
>>> 40083 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content
>>> #5]
>>> 40084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content
>>> #4]
>>> 40084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content
>>> #3]
>>> 40085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content
>>> #2]
>>> 42088 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [2, Content #2]
>>> *-- Acking a timed-out tuple… this does nothing.*
>>> 42088 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content
>>> #3]. Will now sleep…
>>> *-- Why is it looking at tuple #3?  This has already failed.*
>>> 45084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
>>> 45085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content
>>> #6]
>>> 46089 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [3, Content #3]
>>> 46089 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content
>>> #4]. Will now sleep...
>>> 50084 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content
>>> #2]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content
>>> #5]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content
>>> #4]
>>> 50085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content
>>> #3]
>>> *-- More timeouts…*
>>> 50090 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [4, Content #4]
>>> 50090 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content
>>> #5]. Will now sleep...
>>> 54091 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [5, Content #5]
>>> *-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…*
>>> 54091 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content
>>> #6]. Will now sleep...
>>> 55085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
>>> 55085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content
>>> #6]
>>> 58091 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [6, Content #6]
>>> 58092 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content
>>> #5]. Will now sleep...
>>> 60085 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content
>>> #3]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content
>>> #2]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content
>>> #5]
>>> 60086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content
>>> #4]
>>> 62093 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [7, Content #5]
>>> 62093 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content
>>> #4]. Will now sleep…
>>> *-- It's clear that the Bolt looks at tuples even if they have
>>> timed-out.  Its queue will get longer and longer and tuples will always
>>> time out.*
>>> 65086 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
>>> 65087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content
>>> #6]
>>> 66094 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [8, Content #4]
>>> 66094 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content
>>> #3]. Will now sleep...
>>> 70087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
>>> 70087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
>>> 70087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content
>>> #4]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content
>>> #5]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content
>>> #2]
>>> 70088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content
>>> #3]
>>> 70095 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [9, Content #3]
>>> 70095 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content
>>> #2]. Will now sleep...
>>> 74096 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [10, Content #2]
>>> 74096 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content
>>> #6]. Will now sleep...
>>> 75088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
>>> 75088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content
>>> #6]
>>> 78097 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [11, Content #6]
>>> 78097 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content
>>> #2]. Will now sleep...
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content
>>> #3]
>>> 80087 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content
>>> #2]
>>> 80088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content
>>> #5]
>>> 80088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content
>>> #4]
>>> 82098 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [12, Content #2]
>>> 82098 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content
>>> #5]. Will now sleep...
>>> 85088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
>>> 85088 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content
>>> #6]
>>> 86098 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [13, Content #5]
>>> 86099 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content
>>> #4]. Will now sleep...
>>> 90100 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [14, Content #4]
>>> 90101 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content
>>> #3]. Will now sleep...
>>> 90216 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
>>> 90216 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
>>> 90216 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content
>>> #5]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content
>>> #4]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content
>>> #2]
>>> 90217 [Thread-10-spout] INFO
>>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content
>>> #3]
>>> 94101 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>>> Acking: [15, Content #3]
>>> 94101 [Thread-8-bolt1] INFO
>>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content
>>> #6]. Will now sleep…
>>>  *-- Problem gets exacerbated…  Bolt is now looking at tuples that have
>>> failed 30 seconds ago.*
>>>
>>
>>
>

Re: What is the purpose of timing out tuples?

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Throttling is best handled by setting topology.max.spout.pending. By default this is unset, meaning there is no limit to the number of in-flight tuple trees. For Storm’s spout/bolt API, this refers to the number of tuples; for Trident, it is the number of batches. Start high (~1000) for spouts/bolts, and low (~5) for Trident.

If Storm detects that topology.max.spout.pending has been reached, it will pause the spout (i.e. stop calling `nextTuple()`) until the number of pending tuples falls below that threshold again.
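To make the pausing behaviour concrete, here is a toy, single-threaded model of how a max-spout-pending limit gates calls to `nextTuple()`. It is a sketch under simplifying assumptions, not Storm's executor code; `PendingGate` and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the max-spout-pending gate (hypothetical class, not Storm's
// implementation): the "executor" only asks the spout for a new tuple while
// fewer than maxPending tuple trees are in flight.
class PendingGate {
    private final int maxPending;
    private final Set<Long> inFlight = new HashSet<>();
    private long nextId = 0;

    PendingGate(int maxPending) {
        this.maxPending = maxPending;
    }

    // One executor loop iteration: returns the id of the newly emitted
    // tuple, or -1 if the spout is paused (nextTuple() is not called).
    long maybeCallNextTuple() {
        if (inFlight.size() >= maxPending) {
            return -1; // limit reached: spout is paused
        }
        long id = nextId++;
        inFlight.add(id);
        return id;
    }

    // An ack or a fail (a timeout is reported as a fail) removes the tree
    // from the pending set, which reopens a slot for the spout.
    void complete(long id) {
        inFlight.remove(id);
    }

    int pending() {
        return inFlight.size();
    }
}
```

With `maxPending = 5`, five calls emit ids 0 to 4 and the sixth returns -1 until one of them completes. Because a timeout also frees a slot in this model, it matches what the original example shows: timed-out tuples still waiting in the bolt's queue no longer count against the limit, so the spout emits replacements on top of them.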

A good way to tell whether you are vulnerable to replays is to look at the complete latency of the topology in Storm UI. If it is at or above what you have set for topology.message.timeout.secs, you will likely get timeouts. Then it’s a matter of tuning: increase parallelism and/or capacity (more nodes), increase the timeout, or lower topology.max.spout.pending until your topology can handle the volume it’s receiving.
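As a rough back-of-envelope version of that check, assume tuples are spread evenly over `parallelism` executors, each executor drains its queue FIFO, and every tuple takes exactly the same service time (all simplifying assumptions). The worst-case complete latency is then about ceil(maxPending / parallelism) × service time. `TimeoutRisk` is a hypothetical helper, not part of Storm.

```java
// Back-of-envelope estimate (hypothetical helper). Assumes even distribution
// over executors, FIFO queues, and a fixed per-tuple service time; real
// topologies add transfer and queueing overhead on top of this.
class TimeoutRisk {
    // Time for the last of maxPending tuples to finish on its executor.
    static long worstCaseLatencyMs(int maxPending, long serviceMs, int parallelism) {
        long perExecutor = (maxPending + parallelism - 1) / parallelism; // ceiling division
        return perExecutor * serviceMs;
    }

    // "At or exceeding" the message timeout means timeouts are likely.
    static boolean likelyToTimeOut(int maxPending, long serviceMs,
                                   int parallelism, int timeoutSecs) {
        return worstCaseLatencyMs(maxPending, serviceMs, parallelism) >= timeoutSecs * 1000L;
    }
}
```

For the topology in this thread (max pending 5, a single bolt executor taking 4 s per tuple, 5 s timeout) the estimate is 20 s, far past the timeout. Lowering max pending to 1, or running 5 bolt executors, brings it down to 4 s.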

-Taylor


On Oct 24, 2014, at 2:13 PM, Sam Mati <sm...@appnexus.com> wrote:

> Got it — timeout will catch any tuples that may be "lost", and shouldn't really be used for throttling purposes.
> 
> So, I'm curious, how is throttling best handled?  I assume Storm stops calling nextTuple() under some conditions, such as once some buffer is full… just looking for confirmation.
> 
> Thanks,
> -Sam
> 
> From: Michael Rose <mi...@fullcontact.com>
> Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
> Date: Friday, October 24, 2014 12:32 PM
> To: "user@storm.apache.org" <us...@storm.apache.org>
> Subject: Re: What is the purpose of timing out tuples?
> 
> So let's say your worker dies and your tuple tree is incomplete. The timeout will ensure that your tuple gets replayed.
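A simplified model of the spout-side timeout bookkeeping may help here. Each emitted message id gets a deadline, and a periodic sweep reports ids whose tree never completed so that `fail()` can replay them. This is only a sketch of the idea; Storm's actual implementation differs, and `PendingDeadlines` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (hypothetical class) of timeout tracking for a spout.
class PendingDeadlines {
    private final long timeoutMs;
    private final Map<Object, Long> deadlines = new LinkedHashMap<>();

    PendingDeadlines(long timeoutMs) { this.timeoutMs = timeoutMs; }

    void emitted(Object msgId, long nowMs) {
        deadlines.put(msgId, nowMs + timeoutMs);
    }

    // Called when the tree completes. Returns false for ids that already
    // expired: a late ack is simply ignored.
    boolean acked(Object msgId) {
        return deadlines.remove(msgId) != null;
    }

    // Returns expired ids; these are what would be passed to spout.fail().
    // Nothing here reaches into bolt queues, so tuples already in flight
    // are still processed downstream.
    List<Object> sweep(long nowMs) {
        List<Object> expired = new ArrayList<>();
        Iterator<Map.Entry<Object, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Object, Long> e = it.next();
            if (e.getValue() <= nowMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

In this model a dead worker simply never acks, so the sweep eventually fails the tree. It also shows why "acking a timed-out tuple does nothing": by then the id is no longer tracked.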
> 
> However, let's say something even worse happens: the worker hosting your acker goes down, or even the whole cluster. As long as your upstream dependency is smart enough not to mark work as complete until ack or fail is called, it'll eventually be replayed. An upstream that supports these semantics is required to receive any guarantees from Storm. This can work with any traditional message queue, Redis, SQS, Kafka, and probably a host of other systems.
> 
> In one of our systems, our topologies pull from SQS. To cover the case where our topology is terminated abruptly, our SQS visibility timeout (the time before invisible work is marked visible again) is set slightly longer than our tuple timeout.
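The upstream semantics described above can be sketched with a toy in-memory queue that has SQS-style visibility timeouts. This is a hypothetical class, not the AWS SDK, and it assumes message bodies are unique. A received message is hidden rather than deleted; it is deleted only when the spout's `ack()` runs, and if nobody acks it the lease expires and the message becomes visible again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy queue with visibility timeouts (hypothetical; assumes unique messages).
class LeasedQueue {
    private final long visibilityMs;
    private final Deque<String> visible = new ArrayDeque<>();
    private final Map<String, Long> leasedUntil = new HashMap<>();

    LeasedQueue(long visibilityMs) { this.visibilityMs = visibilityMs; }

    void send(String msg) { visible.addLast(msg); }

    // What a spout's nextTuple() would call: take a message and hide it.
    String receive(long nowMs) {
        releaseExpired(nowMs);
        String msg = visible.pollFirst();
        if (msg != null) {
            leasedUntil.put(msg, nowMs + visibilityMs);
        }
        return msg;
    }

    // What a spout's ack() would call: only now is the work really done.
    void delete(String msg) { leasedUntil.remove(msg); }

    // Leases that ran out without a delete make the message visible again.
    private void releaseExpired(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = leasedUntil.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMs) {
                visible.addLast(e.getKey());
                it.remove();
            }
        }
    }
}
```

Choosing `visibilityMs` slightly above topology.message.timeout.secs, as described above, presumably lets Storm's own timeout and replay act first, so the queue only redelivers when the topology itself is gone rather than duplicating work that is still legitimately in flight.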
> 
> Michael Rose (@Xorlev)
> Senior Platform Engineer, FullContact
> michael@fullcontact.com
> 
> On Thu, Oct 23, 2014 at 4:11 PM, Sam Mati <sm...@appnexus.com> wrote:
>> Thanks for the response.
>> 
>> My takeaway is that "timing out" is a last-ditch effort, and that your timeout should be higher than the maximum amount of time you expect your topology to take.  Otherwise you risk a vicious cycle:  timed-out tuples are still processed, but are also re-emitted, causing more timeouts, causing more replays, etc.
>> 
>> I'm now wondering:  If all of my bolts always end in ack or fail, under what circumstances can tuples get "stuck"?  If a Worker gets killed, possibly?
>> 
>> One last question:  I frequently see it mentioned that Storm guarantees everything gets run once.  Please correct me if I'm wrong, but Storm doesn't guarantee this at all; it's up to your Spout to keep track of each pending tuple and to replay failed ones (as my example Spout below does).  Am I missing something here?
>> 
>> Again, thanks for your clarification!
>> 
>> Best,
>> -Sam
>> 
>> From: Michael Rose <mi...@fullcontact.com>
>> Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
>> Date: Thursday, October 23, 2014 4:57 PM
>> To: "user@storm.apache.org" <us...@storm.apache.org>
>> Subject: Re: What is the purpose of timing out tuples?
>> 
>> It's a last-ditch mechanism for replaying work which might have gotten stuck. Storm is an at-least-once processing system and doesn't aim to provide exactly-once / transactional behavior with base Storm. Trident aims to implement that on top of the underlying at-least-once system.
>> 
>> Timed-out in-flight tuples will *not* be cleared; this is true. Controlling latencies within a topology is key to making Storm work. We have IO work isolated by Hystrix commands to ensure we're always coming in under our timeout period. We've experimented with using global streams to "kill" a particular tuple tree, essentially adding a unique identifier for the work to a time-based cache so that each bolt drops it. Ultimately that wasn't really necessary; improving the consistency of external IO through circuit breaking was enough.
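The "kill a tuple tree" idea can be sketched as a small time-bounded cache that each bolt consults at the top of `execute()`. This assumes each tuple carries an application-level id field (for example the spout's `msg-id`), and that kill notices reach every bolt task on a separate stream; both are assumptions, and `KilledTreeCache` is a hypothetical name. Entries expire after a TTL so the cache cannot grow without bound.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-bolt cache of "killed" tree ids with time-based expiry.
class KilledTreeCache {
    private final long ttlMs;
    private final Map<Long, Long> killedAt = new HashMap<>();

    KilledTreeCache(long ttlMs) { this.ttlMs = ttlMs; }

    // Record a kill notice (e.g. received on a separate control stream).
    void kill(long rootId, long nowMs) { killedAt.put(rootId, nowMs); }

    // Checked at the start of execute(): if true, the bolt would ack the
    // tuple and skip the expensive work instead of processing it.
    boolean shouldDrop(long rootId, long nowMs) {
        evictOlderThan(nowMs - ttlMs);
        return killedAt.containsKey(rootId);
    }

    // Forget kill notices older than the TTL.
    private void evictOlderThan(long cutoffMs) {
        killedAt.values().removeIf(t -> t < cutoffMs);
    }

    int size() { return killedAt.size(); }
}
```

A TTL somewhat longer than topology.message.timeout.secs would seem a reasonable choice, since after that the tree has been failed anyway; as noted above, better control of external IO latency made this mechanism unnecessary in practice.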
>> 
>> If you take away nothing else, remember that Storm is at least once processing. Its goal is to ensure processing eventually happens for everything, no matter how many times it might take. It's up to you to remove bad input or park it.
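On the spout side, "remove bad input or park it" can be as simple as counting failures per input and stopping replays after a limit. The sketch below is hypothetical (`ReplayPolicy`, `maxAttempts`, and the parking list are illustrative names; in practice the parked items would go to a dead-letter store). It keys by a stable content id rather than by the per-emit message id, because in the example spout each replay gets a new msg-id while the content stays the same.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical retry-limit policy for a spout's fail()/ack() handlers.
class ReplayPolicy {
    enum Decision { REPLAY, PARK }

    private final int maxAttempts;
    private final Map<String, Integer> failures = new HashMap<>();
    private final List<String> parked = new ArrayList<>();

    ReplayPolicy(int maxAttempts) { this.maxAttempts = maxAttempts; }

    // Call from fail(): replay until maxAttempts failures, then park.
    Decision onFail(String contentId) {
        int n = failures.merge(contentId, 1, Integer::sum);
        if (n >= maxAttempts) {
            failures.remove(contentId);
            parked.add(contentId);
            return Decision.PARK;
        }
        return Decision.REPLAY;
    }

    // Call from ack(): a success clears the failure count.
    void onAck(String contentId) { failures.remove(contentId); }

    List<String> parked() { return parked; }
}
```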
>> 
>> Michael Rose (@Xorlev)
>> Senior Platform Engineer, FullContact
>> michael@fullcontact.com
>> 
>> On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
>>> Hi all.  I'm hoping somebody can explain this behavior to me because it seems pretty unexpected.
>>> 
>>> I'm seeing that timing out tuples does nothing except call "fail" on the Spout.  The tuple itself will still be processed through the Topology, except that acking/failing it will have no effect.  Another problem is that the number of tuples in flight will increase: timed-out tuples no longer count as pending even though they still flow through the topology.  Unless I'm missing something, these two problems combined make timing out tuples at best utterly pointless, and at worst very problematic (as it will just throw more tuples into a topology that is already maxed out).
>>> 
>>> Here's my topology:
>>> - I have a spout.  On nextTuple, it re-emits a tuple that has failed if one is available; otherwise it creates a new tuple.
>>> - I have a bolt that takes 4 seconds to ack a tuple.
>>> - topology.max.spout.pending = 5
>>> - topology.message.timeout.secs = 5 
>>> 
>>> I would expect 1 or 2 tuples to get acked and 3 or 4 tuples to time out, after which the Bolt would process the resent tuples.  Over time, more and more tuples would be acked (though they would frequently time out).
>>> 
>>> What I'm seeing instead is that even though tuples have timed out, they are still being processed by the Bolt.  I'm assuming there is a buffer/queue for the Bolt, and that timed-out tuples are not cleared from it.  Regardless, this leads to all tuples timing out, since the Bolt eventually processes only tuples that have already timed out.
>>> 
>>> I'm assuming, and hoping, that I'm missing something obvious here…
>>> 
>>> Two questions:
>>> 1.  Can I prevent Bolts from processing already-timed-out tuples?
>>> 2.  What is the point of timing out tuples?  It does nothing but call fail on the Spout even though the tuple will still be processed by the rest of the Topology!
>>> 
>>> Thanks,
>>> -Sam
>>> 
>>> 
>>> Spout:
>>> public class SampleSpout extends BaseRichSpout {
>>>     private static Logger logger = LoggerFactory.getLogger(SampleSpout.class);
>>> 
>>>     SpoutOutputCollector collector;
>>>     Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
>>>     Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();
>>> 
>>>     int contentCounter;
>>>     int curMsgId;
>>> 
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         // msg-id always increments each time we emit.
>>>         // contentCounter increments only when new tuples are created.
>>>         declarer.declare(new Fields("msg-id", "content"));
>>>     }
>>> 
>>>     @Override
>>>     public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
>>>         collector = spoutOutputCollector;
>>>     }
>>> 
>>>     @Override
>>>     public void nextTuple() {
>>>         // either replay a failed tuple, or create a new one
>>>         List<Object> tuple = null;
>>>         if (replay_queue.size() > 0){
>>>             tuple = replay_queue.poll();
>>>         }else{
>>>             tuple = new ArrayList<Object>();
>>>             tuple.add(null);
>>>             tuple.add("Content #" + contentCounter++);
>>>         }
>>> 
>>>         // increment msgId and set it as the first item in the tuple
>>>         int msgId = this.curMsgId++;
>>>         tuple.set(0, msgId);
>>>         logger.info("Emitting: " + tuple);
>>>         // add this tuple to the 'pending' map, and emit it.
>>>         pending_map.put(msgId, tuple);
>>>         collector.emit(tuple, msgId);
>>>         Utils.sleep(100);
>>>     }
>>> 
>>>     @Override
>>>     public void ack(Object msgId){
>>>         // remove tuple from pending_map since it's no longer pending
>>>         List<Object> acked_tuple = pending_map.remove(msgId);
>>>         logger.info("Acked: " + acked_tuple);
>>>     }
>>> 
>>>     @Override
>>>     public void fail(Object msgId){
>>>         // remove tuple from pending_map since it's no longer pending
>>>         List<Object> failed_tuple = pending_map.remove(msgId);
>>>         logger.info("Failed: " + failed_tuple);
>>> 
>>>         // put a copy into the replay queue
>>>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>>>         replay_queue.add(copy);
>>>     }
>>> }
>>> 
>>> 
>>> Bolt:
>>> public class SamplePrintBolt extends BaseRichBolt {
>>> 
>>>     private static Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);
>>> 
>>>     OutputCollector collector;
>>> 
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
>>>         collector = outputCollector;
>>>     }
>>> 
>>>     @Override
>>>     public void execute(Tuple input) {
>>>         logger.info("I see: " + input.getValues());
>>>         Utils.sleep(4000);
>>>         logger.info("Done sleeping. Acking: "  + input.getValues());
>>>         collector.ack(input);
>>>     }
>>> 
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         // doesn't emit
>>>     }
>>> }
>>> 
>>> 
>>> Main:
>>> public static void main(String[] args) throws Exception {
>>>         Config conf = new Config();
>>>         conf.setMaxSpoutPending(5);
>>>         conf.setMessageTimeoutSecs(5);
>>> 
>>>         TopologyBuilder builder = new TopologyBuilder();
>>>         builder.setSpout("spout", new SampleSpout());
>>>         builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");
>>> 
>>>         LocalCluster cluster = new LocalCluster();
>>>         cluster.submitTopology("local", conf, builder.createTopology());
>>> }
>>> 
>>> 
>>> Output:
>>> 30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
>>> 30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
>>> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
>>> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
>>> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
>>> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
>>> 34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
>>> 34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
>>> 34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
>>> 34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
>>> 38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
>>> 38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
>>> 38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
>>> 38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
>>> -- So far, so good… however, now it's time for things to timeout.
>>> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
>>> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
>>> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
>>> 40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
>>> 40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
>>> 40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
>>> 40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
>>> 40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
>>> 42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
>>> -- Acking a timed-out tuple… this does nothing.
>>> 42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep…
>>> -- Why is it looking at tuple #3?  This has already failed.
>>> 45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
>>> 45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
>>> 46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
>>> 46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
>>> 50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
>>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
>>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
>>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
>>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
>>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
>>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
>>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
>>> -- More timeouts…
>>> 50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
>>> 50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
>>> 54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
>>> -- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…
>>> 54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
>>> 55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
>>> 55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
>>> 58091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
>>> 58092 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
>>> 60085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
>>> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
>>> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
>>> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
>>> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
>>> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
>>> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
>>> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
>>> 62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
>>> 62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep…
>>> -- It's clear that the Bolt looks at tuples even if they have timed out.  Its queue will get longer and longer and tuples will always time out.
>>> 65086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
>>> 65087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
>>> 66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
>>> 66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
>>> 70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
>>> 70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
>>> 70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
>>> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
>>> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
>>> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
>>> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
>>> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
>>> 70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
>>> 70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
>>> 74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
>>> 74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
>>> 75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
>>> 75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
>>> 78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
>>> 78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
>>> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
>>> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
>>> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
>>> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
>>> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
>>> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
>>> 80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
>>> 80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
>>> 82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
>>> 82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
>>> 85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
>>> 85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
>>> 86098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
>>> 86099 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
>>> 90100 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
>>> 90101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
>>> 90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
>>> 90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
>>> 90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
>>> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
>>> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
>>> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
>>> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
>>> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
>>> 94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
>>> 94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep…
>>> -- Problem gets exacerbated…  Bolt is now looking at tuples that failed 30 seconds ago.
>> 
> 


Re: What is the purpose of timing out tuples?

Posted by Sam Mati <sm...@appnexus.com>.
Got it — timeout will catch any tuples that may be "lost", and shouldn't really be used for throttling purposes.

So, I'm curious, how is throttling best handled?  I assume Storm stops calling nextTuple() under some conditions, such as once some buffer is full… just looking for confirmation.

Thanks,
-Sam

From: Michael Rose <mi...@fullcontact.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: Friday, October 24, 2014 12:32 PM
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: Re: What is the purpose of timing out tuples?

So let's say your worker dies and your tuple tree is incomplete. The timeout will ensure that your tuple gets replayed.
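Why a dead worker leaves a tree incomplete can be made concrete. Storm's documentation on guaranteeing message processing describes the acker as tracking each spout tuple with a single 64-bit "ack val": every tuple id in the tree is XORed in once when the tuple is created and once when it is acked, so the value returns to zero only when every tuple has been acked. The sketch below is a simplified, hypothetical model of that bookkeeping; the class and method names are invented, not Storm's API, and real ids are random 64-bit values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of an acker's per-tree bookkeeping.
// Every tuple id is XORed into the tree's "ack val" twice: once when the
// tuple is created and once when it is acked, so a fully acked tree is 0.
class AckValTracker {
    private final Map<Long, Long> ackVals = new HashMap<>(); // root id -> ack val

    // The spout emits a root tuple with id tupleId for tree rootId.
    void rootEmitted(long rootId, long tupleId) {
        ackVals.put(rootId, tupleId);
    }

    // A bolt acks ackedId and reports the ids of any children anchored to it.
    void acked(long rootId, long ackedId, long... childIds) {
        long val = ackVals.get(rootId) ^ ackedId;
        for (long child : childIds) {
            val ^= child;
        }
        ackVals.put(rootId, val);
    }

    boolean isComplete(long rootId) {
        return ackVals.get(rootId) == 0L;
    }
}
```

If the worker that received a child tuple dies, that child's `acked` call never happens and `isComplete` stays false forever. In Storm, that is the case `topology.message.timeout.secs` exists for: the tree is eventually reported to the spout via `fail`.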

However, let's say even worse happens: the worker hosting your acker goes down. Or even the whole cluster. As long as your upstream dependency is smart enough to not mark work as complete until ack or fail is called, it'll eventually be replayed. An upstream which supports these semantics is required to receive any guarantees from Storm. This can work with any traditional message queue, Redis, SQS, Kafka, and probably a host of other systems.

In one of our systems, our topologies pull from SQS. In case our topology is terminated abruptly, our SQS visibility timeout (the time before invisible work is marked visible again) is set slightly longer than our tuple timeout.
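A minimal sketch of the upstream semantics described above, assuming an SQS-like queue with a visibility timeout. `LeasingQueue`, its methods, and the 30 s / 35 s numbers are hypothetical; this is an in-memory model, not the SQS client API. The spout would call `delete` from its `ack` method and `release` from its `fail` method; if the topology dies and neither is ever called, the lease simply expires and the message becomes visible again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of an upstream queue with SQS-style
// visibility timeouts. Not the SQS API; times are plain longs (seconds).
class LeasingQueue {
    private final Deque<String> visible = new ArrayDeque<>();
    private final Map<String, Long> leasedUntil = new HashMap<>();
    private final long visibilityTimeout;

    LeasingQueue(long visibilityTimeout) {
        this.visibilityTimeout = visibilityTimeout;
    }

    void send(String msg) {
        visible.addLast(msg);
    }

    // Returns expired leases to the visible queue, then leases the next message.
    String receive(long now) {
        expireLeases(now);
        String msg = visible.pollFirst();
        if (msg != null) {
            leasedUntil.put(msg, now + visibilityTimeout);
        }
        return msg;
    }

    // From the spout's ack(): the work is done, so delete it for good.
    void delete(String msg) {
        leasedUntil.remove(msg);
    }

    // From the spout's fail(): make it visible again right away.
    void release(String msg) {
        if (leasedUntil.remove(msg) != null) {
            visible.addLast(msg);
        }
    }

    // Leases nobody acked or failed (e.g. the topology died) expire on their own.
    private void expireLeases(long now) {
        leasedUntil.entrySet().removeIf(e -> {
            boolean expired = e.getValue() <= now;
            if (expired) {
                visible.addLast(e.getKey());
            }
            return expired;
        });
    }
}
```

With a tuple timeout of 30 s, a visibility timeout of 35 s means the queue does not hand the message out again while Storm itself may still be processing or replaying it.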


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Thu, Oct 23, 2014 at 4:11 PM, Sam Mati <sm...@appnexus.com> wrote:
Thanks for the response.

My takeaway is that "timing out" is a last-ditch effort, and that your timeout should be higher than the maximum amount of time you expect your topology to take.  Otherwise you risk a vicious cycle:  timed-out tuples are still processed, but are also re-emitted, causing more timeouts, causing more replays, etc.
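The "maximum amount of time" can be estimated before choosing the timeout. A rough sketch, assuming a single slow bolt stage that serves tuples one at a time in FIFO order and, for more than one executor, a perfectly even split (shuffle grouping only approximates this): the last of `maxPending` queued tuples waits for everything ahead of it, so its latency is about ceil(maxPending / executors) × serviceTime. The class name `TimeoutBudget` is hypothetical.

```java
// Rough lower bound for topology.message.timeout.secs, assuming one
// queue-bound bolt stage, FIFO service, and an even split across executors.
class TimeoutBudget {
    static double worstCaseLatencySecs(int maxPending, double serviceSecs, int boltExecutors) {
        // The busiest executor serves ceil(maxPending / executors) tuples back
        // to back; the last one finishes after all of them.
        int rounds = (maxPending + boltExecutors - 1) / boltExecutors;
        return rounds * serviceSecs;
    }

    public static void main(String[] args) {
        // Settings from this thread: max.spout.pending = 5, 4 s bolt, one executor.
        System.out.println(worstCaseLatencySecs(5, 4.0, 1)); // prints 20.0
    }
}
```

For the thread's settings that is 5 × 4 s = 20 s, which matches the log: [4, Content #4] is emitted at 30097 ms and acked at 50090 ms. Under this estimate a 5 s timeout is far too short; the timeout would need to sit comfortably above 20 s, or `topology.max.spout.pending` would need to shrink.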

I'm now wondering:  If all of my bolts always end in ack or fail, under what circumstances can tuples get "stuck"?  If a Worker gets killed, possibly?

One last question:  I frequently see mentioned that Storm guarantees everything gets run once.  Please correct me if I'm wrong, but Storm doesn't guarantee this at all -- It's up to your Spout to keep track of each pending tuple and to replay failed ones (as my example Spout below does).  Am I missing something here?

Again, thanks for your clarification!

Best,
-Sam

From: Michael Rose <mi...@fullcontact.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: Thursday, October 23, 2014 4:57 PM
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: Re: What is the purpose of timing out tuples?

It's a last-ditch mechanism for replaying work which might have gotten stuck. Storm is an at-least-once processing system and doesn't aim to provide exactly-once / transactional behavior with base Storm. Trident aims to implement that on top of the underlying at-least-once system.

Timed-out in-flight tuples will *not* be cleared, this is true. Controlling latencies within a topology is key to making Storm work. We have IO work isolated by Hystrix commands to ensure we're always coming in under our timeout period. We've experimented with using global streams to "kill" a particular tuple tree, essentially adding some unique work to a time-based cache to drop it at each bolt. It ultimately wasn't necessary once we improved the consistency of external IO through circuit breaking.
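A minimal sketch of the per-bolt "time-based cache" idea, assuming the spout broadcasts a kill notice (for example from its `fail` method) on a separate stream that every bolt subscribes to, and that each tuple carries its root id as a field. `KilledTreeCache` and its methods are hypothetical, not part of Storm. Entries expire after a TTL so the cache stays bounded.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-bolt cache of "killed" tuple-tree ids. A kill notice
// arriving on a broadcast stream is remembered for ttlMillis; execute()
// asks shouldDrop() before doing expensive work on a tuple.
class KilledTreeCache {
    private final Map<Long, Long> killedUntil = new HashMap<>(); // root id -> expiry
    private final long ttlMillis;

    KilledTreeCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void kill(long rootId, long nowMillis) {
        killedUntil.put(rootId, nowMillis + ttlMillis);
    }

    boolean shouldDrop(long rootId, long nowMillis) {
        // Evict expired entries so memory use stays bounded.
        killedUntil.values().removeIf(until -> until <= nowMillis);
        return killedUntil.containsKey(rootId);
    }
}
```

In `execute`, a bolt would call `shouldDrop(rootId, System.currentTimeMillis())` and skip the work for killed trees. As Michael notes, tightening latency with timeouts and circuit breakers made this unnecessary in practice.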

If you take away nothing else, remember that Storm is at least once processing. Its goal is to ensure processing eventually happens for everything, no matter how many times it might take. It's up to you to remove bad input or park it.
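One way to "park" input instead of replaying it forever, sketched under assumptions: failures are counted per stable business key (the thread's spout assigns a fresh msg-id on every replay, so the id alone cannot count attempts), and after `maxAttempts` the key goes to a dead-letter list instead of the replay queue. `ReplayPolicy` and `maxAttempts` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical spout-side bookkeeping: replay a failed key up to
// maxAttempts times, then park it for later inspection.
class ReplayPolicy {
    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();
    final Queue<String> replayQueue = new ArrayDeque<>();
    final List<String> parked = new ArrayList<>();

    ReplayPolicy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Call once per emit of `key`, including replays.
    void emitted(String key) {
        attempts.merge(key, 1, Integer::sum);
    }

    void acked(String key) {
        attempts.remove(key);
    }

    void failed(String key) {
        if (attempts.getOrDefault(key, 0) >= maxAttempts) {
            attempts.remove(key); // give up: park instead of replaying
            parked.add(key);
        } else {
            replayQueue.add(key);
        }
    }
}
```

A spout's `nextTuple` would poll `replayQueue` before creating new work, exactly as the sample spout does, and call `emitted` on every emit.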


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
Hi all.  I'm hoping somebody can explain this behavior to me because it seems pretty unexpected.

I'm seeing that timing out tuples does nothing except call "fail" on the Spout.  The tuple itself will still be processed through the Topology, except acking/failing will have no effect.  Another problem is that the number of pending tuples will increase: timed-out tuples do not count as pending even though they will flow through the topology.  Unless I'm missing something, these two combined problems make timing out tuples, at best, utterly pointless, and at worst very problematic (as it will just throw more tuples into a topology that is already maxed out).

Here's my topology:
- I have a spout.  On nextTuple, it either re-emits a tuple that has failed or, if none are present, creates a new tuple.
- I have a bolt that takes 4 seconds to ack a tuple.
- topology.max.spout.pending = 5
- topology.message.timeout.secs = 5

I would expect 1 or 2 tuples to get acked, and 3 or 4 tuples to time out; then the Bolt would next process the resent tuples.  Over time, more and more tuples would be acked (though they would frequently time out).

What I'm seeing instead is that even though tuples are timed out, they are still being processed by the Bolt.  I'm assuming there is a buffer/queue for the Bolt, and that timed-out tuples are not cleared from it.  Regardless, this leads to all tuples timing out, since the Bolt will eventually only process tuples that have been timed out.
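The cycle described above can be reproduced without Storm. This is a rough, hypothetical model, not Storm's scheduler: the spout keeps at most `maxPending` tuples pending, a timed-out tuple leaves the pending set immediately while its copy stays in the bolt's FIFO queue, and the spout refills freed slots right away. It ignores the spout's 100 ms sleep and transfer batching, models replays as fresh emits, and checks timeouts every millisecond, whereas Storm's timeout check is coarser (in the thread's log, tuples emitted at about 30 s fail at about 40 s).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical millisecond-step model: one spout capped at maxPending,
// one bolt with a FIFO queue and fixed service time, and a message timeout.
class TimeoutCycleSim {
    // Returns {usefulAcks, wastedAcks}; an ack is useful only if the spout
    // still considers that tuple pending (it has not timed out yet).
    static int[] run(int maxPending, long timeoutMs, long serviceMs, long durationMs) {
        Map<Integer, Long> pending = new LinkedHashMap<>(); // msgId -> deadline
        Deque<Integer> boltQueue = new ArrayDeque<>();       // msgIds waiting at the bolt
        Integer inService = null;
        long serviceDone = 0;
        int nextId = 0, useful = 0, wasted = 0;

        for (long now = 0; now <= durationMs; now++) {
            final long t = now;
            // 1. Timeouts: the spout forgets the tuple, but its copy stays queued.
            pending.values().removeIf(deadline -> deadline <= t);
            // 2. The bolt finishes its tuple; acking a timed-out tuple does nothing.
            if (inService != null && t >= serviceDone) {
                if (pending.remove(inService) != null) useful++; else wasted++;
                inService = null;
            }
            // 3. The spout emits until it is back at maxPending.
            while (pending.size() < maxPending) {
                pending.put(nextId, t + timeoutMs);
                boltQueue.addLast(nextId++);
            }
            // 4. An idle bolt takes the oldest queued tuple, stale or not.
            if (inService == null && !boltQueue.isEmpty()) {
                inService = boltQueue.pollFirst();
                serviceDone = t + serviceMs;
            }
        }
        return new int[] {useful, wasted};
    }

    public static void main(String[] args) {
        int[] tight = run(5, 5_000, 4_000, 60_000);  // settings from the thread
        int[] loose = run(5, 25_000, 4_000, 60_000); // timeout above 5 x 4 s
        System.out.println("timeout 5s:  useful=" + tight[0] + " wasted=" + tight[1]);
        System.out.println("timeout 25s: useful=" + loose[0] + " wasted=" + loose[1]);
    }
}
```

With the thread's settings, only the first ack in this model lands in time and every later ack is for a tuple the spout has already failed, while the bolt queue keeps growing. With a 25 s timeout, above the 5 × 4 s worst case, no ack in the run is wasted.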

I'm assuming, and hoping, that I'm missing something obvious here…

Two questions:
1.  Can I prevent Bolts from processing already-timed-out tuples?
2.  What is the point of timing out tuples?  It does nothing but call fail on the Spout even though the tuple will still be processed by the rest of the Topology!

Thanks,
-Sam


Spout:
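// Storm 0.9.x package names; Storm 1.0+ moved these to org.apache.storm.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class SampleSpout extends BaseRichSpout {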
    private static Logger logger = LoggerFactory.getLogger(SampleSpout.class);

    SpoutOutputCollector collector;
    Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
    Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();

    int contentCounter;
    int curMsgId;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // msg-id increments every time we emit (including replays).
        // The content counter increments only when a new tuple is created.
        declarer.declare(new Fields("msg-id", "content"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // either replay a failed tuple, or create a new one
        List<Object> tuple = null;
        if (replay_queue.size() > 0){
            tuple = replay_queue.poll();
        }else{
            tuple = new ArrayList<Object>();
            tuple.add(null);
            tuple.add("Content #" + contentCounter++);
        }

        // increment msgId and set it as the first item in the tuple
        int msgId = this.curMsgId++;
        tuple.set(0, msgId);
        logger.info("Emitting: " + tuple);
        // add this tuple to the 'pending' map, and emit it.
        pending_map.put(msgId, tuple);
        collector.emit(tuple, msgId);
        Utils.sleep(100);
    }

    @Override
    public void ack(Object msgId){
        // remove tuple from pending_map since it's no longer pending
        List<Object> acked_tuple = pending_map.remove(msgId);
        logger.info("Acked: " + acked_tuple);
    }

    @Override
    public void fail(Object msgId){
        // remove tuple from pending_map since it's no longer pending
        List<Object> failed_tuple = pending_map.remove(msgId);
        logger.info("Failed: " + failed_tuple);

        // put a copy into the replay queue
        ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
        replay_queue.add(copy);
    }
}


Bolt:
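import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;

public class SamplePrintBolt extends BaseRichBolt {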

    private static Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);

    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    @Override
    public void execute(Tuple input) {
        logger.info("I see: " + input.getValues());
        Utils.sleep(4000);
        logger.info("Done sleeping. Acking: "  + input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // doesn't emit
    }
}


Main:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class Main {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(5);
        conf.setMessageTimeoutSecs(5);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new SampleSpout());
        builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("local", conf, builder.createTopology());
    }
}


Output:
30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
-- So far, so good… however, now it's time for things to time out.
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
-- Acking a timed-out tuple… this does nothing.
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep…
-- Why is it looking at tuple #3?  This has already failed.
45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
-- More timeouts…
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
58091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
58092 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
60085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep…
-- It's clear that the Bolt looks at tuples even if they have timed out.  Its queue will get longer and longer and tuples will always time out.
65086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
65087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
86098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
86099 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
90100 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
90101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep…
-- Problem gets exacerbated…  Bolt is now looking at tuples that failed 30 seconds ago.



Re: What is the purpose of timing out tuples?

Posted by Michael Rose <mi...@fullcontact.com>.
So let's say your worker dies and your tuple tree is incomplete. The timeout
will ensure that your tuple gets replayed.

However, let's say even worse happens: the worker hosting your acker goes
down. Or even the whole cluster. As long as your upstream dependency is
smart enough to not mark work as complete until ack or fail is called,
it'll eventually be replayed. An upstream which supports these semantics is
required to receive any guarantees from Storm. This can work with any
traditional message queue, Redis, SQS, Kafka, and probably a host of other
systems.

In one of our systems, our topologies pull from SQS. In case our topology
is terminated abruptly, our SQS visibility timeout (the time before
invisible work is marked visible again) is set slightly longer than our
tuple timeout.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Thu, Oct 23, 2014 at 4:11 PM, Sam Mati <sm...@appnexus.com> wrote:

>  Thanks for the response.
>
>  My takeaway is that "timing out" is a last-ditch effort, and that your
> timeout should be higher than the maximum amount of time you expect your
> topology to take.  Otherwise you risk a vicious cycle:  timed-out tuples
> are still processed, but are also re-emitted, causing more timeouts,
> causing more replays, etc.
>
>  I'm now wondering:  If all of my bolts always end in ack or fail, under
> what circumstances can tuples get "stuck"?  If a Worker gets killed,
> possibly?
>
>  One last question:  I frequently see mentioned that Storm guarantees
> everything gets run once.  Please correct me if I'm wrong, but Storm
> doesn't guarantee this at all -- It's up to your Spout to keep track of
> each pending tuple and to replay failed ones (as my example Spout below
> does).  Am I missing something here?
>
>  Again, thanks for your clarification!
>
>  Best,
> -Sam
>
>   From: Michael Rose <mi...@fullcontact.com>
> Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
> Date: Thursday, October 23, 2014 4:57 PM
> To: "user@storm.apache.org" <us...@storm.apache.org>
> Subject: Re: What is the purpose of timing out tuples?
>
>   It's a last-ditch mechanism for replaying work which might have gotten
> stuck. Storm is an at-least-once processing system and doesn't aim to
> provide exactly once / transactional behavior with base Storm. Trident aims
> to implement that on top of the underlying at-least-once system.
>
>  Timed out in-flight tuples will *not* be cleared, this is true.
> Controlling latencies within a topology is key to making Storm work. We
> have IO work isolated by Hystrix commands to ensure we're always coming in
> under our timeout period. We've experimented with using global streams to
> "kill" a particular tuple tree, essentially adding some unique work to a
> time-based cache to drop it at each bolt. It ultimately wasn't necessary
> once we improved the consistency of external IO through circuit
> breaking.
>
>  If you take away nothing else, remember that Storm is at least once
> processing. Its goal is to ensure processing eventually happens for
> everything, no matter how many times it might take. It's up to you to
> remove bad input or park it.
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> michael@fullcontact.com
>
> On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
>
>>  Hi all.  I'm hoping somebody can explain this behavior to me because it
>> seems pretty unexpected.
>>
>>  I'm seeing that timing out tuples does *nothing* except call "fail" on
>> the Spout.  The tuple itself will still be processed through the Topology,
>> except acking/failing will have no effect.  Another problem is that the
>> number of pending tuples will increase — timed out tuples do not count as
>> pending even though they will flow through the topology.  Unless I'm
>> missing something, these two combined problems make timing out tuples, at
>> best, utterly pointless, and at worst very problematic (as it will just
>> throw more tuples into a topology that is already maxed out).
>>
>>  Here's my topology:
>> - I have a spout.  On nextTuple, it either re-emits a tuple that has
>> failed or, if none are present, creates a new tuple.
>> - I have a bolt that takes 4 seconds to ack a tuple.
>> - topology.max.spout.pending = 5
>> - topology.message.timeout.secs = 5
>>
>>  I would expect 1 or 2 tuples to get acked, and 3 or 4 tuples to time out
>> — then the Bolt would next process the *resent* tuples.  Over time, more
>> and more tuples would be acked (though they would frequently time out).
>>
>>  What I'm seeing instead is that even though tuples are timed-out, they
>> are still being processed by the Bolt.  I'm assuming there is a buffer/queue
>> for the Bolt, and that timed-out tuples are not cleared from it.
>> Regardless, this leads to all tuples timing out, since the Bolt will
>> eventually only process tuples that have been timed out.
>>
>>  I'm assuming, and hoping, that I'm missing something obvious here…
>>
>>  Two questions:
>> 1.  Can I prevent Bolts from processing already-timed-out tuples?
>> 2.  What is the point of timing out tuples?  It does *nothing* but call
>> *fail* on the Spout even though the tuple will still be processed by the
>> rest of the Topology!
>>
>>  Thanks,
>> -Sam
>>
>>
>>  Spout:
>>  public class SampleSpout extends BaseRichSpout {
>>     private static Logger logger =
>> LoggerFactory.getLogger(SampleSpout.class);
>>
>>      SpoutOutputCollector collector;
>>     Map<Integer, List<Object>> pending_map = new HashMap<Integer,
>> List<Object>>();
>>     Queue<List<Object>> replay_queue = new
>> LinkedBlockingQueue<List<Object>>();
>>
>>      int contentCounter;
>>     int curMsgId;
>>
>>      @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // msg-id increments every time we emit (including replays).
>>         // The content counter increments only when a new tuple is created.
>>         declarer.declare(new Fields("msg-id", "content"));
>>     }
>>
>>      @Override
>>     public void open(Map conf, TopologyContext context,
>> SpoutOutputCollector spoutOutputCollector) {
>>         collector = spoutOutputCollector;
>>     }
>>
>>      @Override
>>     public void nextTuple() {
>>         // either replay a failed tuple, or create a new one
>>         List<Object> tuple = null;
>>         if (replay_queue.size() > 0){
>>             tuple = replay_queue.poll();
>>         }else{
>>             tuple = new ArrayList<Object>();
>>             tuple.add(null);
>>             tuple.add("Content #" + contentCounter++);
>>         }
>>
>>          // increment msgId and set it as the first item in the tuple
>>         int msgId = this.curMsgId++;
>>         tuple.set(0, msgId);
>>         logger.info("Emitting: " + tuple);
>>         // add this tuple to the 'pending' map, and emit it.
>>         pending_map.put(msgId, tuple);
>>         collector.emit(tuple, msgId);
>>         Utils.sleep(100);
>>     }
>>
>>      @Override
>>     public void ack(Object msgId){
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> acked_tuple = pending_map.remove(msgId);
>>         logger.info("Acked: " + acked_tuple);
>>     }
>>
>>      @Override
>>     public void fail(Object msgId){
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> failed_tuple = pending_map.remove(msgId);
>>         logger.info("Failed: " + failed_tuple);
>>
>>          // put a copy into the replay queue
>>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>>         replay_queue.add(copy);
>>     }
>> }
>>
>>
>>  Bolt:
>>  public class SamplePrintBolt extends BaseRichBolt {
>>
>>      private static Logger logger =
>> LoggerFactory.getLogger(SamplePrintBolt.class);
>>
>>      OutputCollector collector;
>>
>>      @Override
>>     public void prepare(Map stormConf, TopologyContext context,
>> OutputCollector outputCollector) {
>>         collector = outputCollector;
>>     }
>>
>>      @Override
>>     public void execute(Tuple input) {
>>         logger.info("I see: " + input.getValues());
>>         Utils.sleep(4000);
>>         logger.info("Done sleeping. Acking: "  + input.getValues());
>>         collector.ack(input);
>>     }
>>
>>      @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // doesn't emit
>>     }
>> }
>>
>>
>>  Main:
>> public static void main(String[] args) throws Exception {
>>          Config conf = new Config();
>>         conf.setMaxSpoutPending(5);
>>         conf.setMessageTimeoutSecs(5);
>>
>>          TopologyBuilder builder = new TopologyBuilder();
>>         builder.setSpout("spout", new SampleSpout());
>>         builder.setBolt("bolt1", new
>> SamplePrintBolt()).shuffleGrouping("spout");
>>
>>          LocalCluster cluster = new LocalCluster();
>>         cluster.submitTopology("local", conf, builder.createTopology());
>>  }
>>
>>
>>  Output:
>>  30084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content
>> #0]
>> 30085 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content
>> #0]. Will now sleep...
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content
>> #1]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content
>> #2]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content
>> #3]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content
>> #4]
>> 34086 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [0, Content #0]
>> 34086 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content
>> #1]. Will now sleep...
>> 34087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
>> 34087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content
>> #5]
>> 38087 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [1, Content #1]
>> 38087 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content
>> #2]. Will now sleep...
>> 38089 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
>> 38089 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content
>> #6]
>> *-- So far, so good… however, now it's time for things to timeout.*
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
>> 40083 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
>> 40083 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content
>> #5]
>> 40084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content
>> #4]
>> 40084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content
>> #3]
>> 40085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content
>> #2]
>> 42088 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [2, Content #2]
>> *-- Acking a timed-out tuple… this does nothing.*
>> 42088 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content
>> #3]. Will now sleep…
>> *-- Why is it looking at tuple #3?  This has already failed.*
>> 45084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
>> 45085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content
>> #6]
>> 46089 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [3, Content #3]
>> 46089 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content
>> #4]. Will now sleep...
>> 50084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content
>> #2]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content
>> #5]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content
>> #4]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content
>> #3]
>> *-- More timeouts**…*
>> 50090 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [4, Content #4]
>> 50090 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content
>> #5]. Will now sleep...
>> 54091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [5, Content #5]
>> *-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…*
>> 54091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content
>> #6]. Will now sleep...
>> 55085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
>> 55085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content
>> #6]
>> 58091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [6, Content #6]
>> 58092 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content
>> #5]. Will now sleep...
>> 60085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content
>> #3]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content
>> #2]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content
>> #5]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content
>> #4]
>> 62093 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [7, Content #5]
>> 62093 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content
>> #4]. Will now sleep…
>> *-- It's clear that the Bolt looks at tuples even if they have
>> timed-out.  It's queue will get longer and longer and tuples will always
>> timeout.*
>> 65086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
>> 65087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content
>> #6]
>> 66094 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [8, Content #4]
>> 66094 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content
>> #3]. Will now sleep...
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content
>> #4]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content
>> #5]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content
>> #2]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content
>> #3]
>> 70095 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [9, Content #3]
>> 70095 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content
>> #2]. Will now sleep...
>> 74096 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [10, Content #2]
>> 74096 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content
>> #6]. Will now sleep...
>> 75088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
>> 75088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content
>> #6]
>> 78097 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [11, Content #6]
>> 78097 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content
>> #2]. Will now sleep...
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content
>> #3]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content
>> #2]
>> 80088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content
>> #5]
>> 80088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content
>> #4]
>> 82098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [12, Content #2]
>> 82098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content
>> #5]. Will now sleep...
>> 85088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
>> 85088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content
>> #6]
>> 86098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [13, Content #5]
>> 86099 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content
>> #4]. Will now sleep...
>> 90100 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [14, Content #4]
>> 90101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content
>> #3]. Will now sleep...
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content
>> #5]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content
>> #4]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content
>> #2]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content
>> #3]
>> 94101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [15, Content #3]
>> 94101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content
>> #6]. Will now sleep…
>>  *-- Problem gets exacerbated…  Bolt is now looking at tuples that have
>> failed 30 seconds ago.*
>>
>
>

Re: What is the purpose of timing out tuples?

Posted by Sam Mati <sm...@appnexus.com>.
Great suggestion, thank you.

From: Tom Brown <to...@gmail.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: Thursday, October 23, 2014 5:54 PM
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: Re: What is the purpose of timing out tuples?

You can always add your own timestamp to the tuple (propagate the parent timestamp if you're emitting an anchored one). Assuming all the machines have synchronized clocks, you can have your bolts silently drop tuples that were emitted and have since timed out. No need to fail them since the timeout mechanism already did.

--Tom
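A minimal sketch of Tom's timestamp idea. It assumes the spout adds an emit-time field (epoch milliseconds) to every tuple and that worker clocks are synchronized, as Tom notes. The class name `TupleDeadline` is hypothetical. It only decides whether a tuple is still worth processing. `canFinish` also rejects tuples that could not complete a known amount of work (4 s in Sam's bolt) before the timeout.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: decides whether a tuple stamped at emit time is still worth processing.
// Assumes synchronized clocks across workers.
final class TupleDeadline {
    private final long timeoutMillis;

    TupleDeadline(int messageTimeoutSecs) {
        this.timeoutMillis = TimeUnit.SECONDS.toMillis(messageTimeoutSecs);
    }

    /** True once the tuple is at least as old as topology.message.timeout.secs. */
    boolean isExpired(long emittedAtMillis, long nowMillis) {
        return nowMillis - emittedAtMillis >= timeoutMillis;
    }

    /** True if work expected to take expectedWorkMillis would still finish before the timeout. */
    boolean canFinish(long emittedAtMillis, long nowMillis, long expectedWorkMillis) {
        return nowMillis + expectedWorkMillis - emittedAtMillis < timeoutMillis;
    }
}
```

In Sam's topology this would mean three changes. First, declare a third field (say `"emitted-at"`, a hypothetical name). Second, set it to `System.currentTimeMillis()` in `nextTuple` on every emit, the same way `msg-id` is set, so each replay gets a fresh timestamp. Third, have `execute` read the field with `input.getLongByField("emitted-at")` and skip the work when `isExpired` or `!canFinish(...)` holds. Per Tom's note, there is no need to fail such a tuple.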

On Thu, Oct 23, 2014 at 3:13 PM, saiprasad mishra <sa...@gmail.com> wrote:
+1 to Michael Rose: a Hystrix circuit breaker (CB) is the way to go if REST APIs are invoked from the topology.

Timeout in topology becomes controllable :)


Regards
Sai
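Hystrix is a separate library, and the block below does not use its API. It is a hand-rolled, hypothetical `SimpleCircuitBreaker` that illustrates the idea Sai and Michael point at. After a few consecutive failures of a slow external call, the breaker fails fast for a cool-down period instead of letting the bolt block. This keeps per-tuple latency under `topology.message.timeout.secs`. The clock is injected so the behaviour can be checked deterministically.

```java
import java.util.function.LongSupplier;

// Hypothetical hand-rolled circuit breaker (not Hystrix's API) guarding a slow external call.
final class SimpleCircuitBreaker {
    private final int failureThreshold; // consecutive failures before the breaker opens
    private final long openMillis;      // how long to fail fast before allowing trial calls
    private final LongSupplier clock;   // injected so tests can control time
    private int consecutiveFailures = 0;
    private long openedAt = -1;         // -1 means the breaker is closed

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** True if the caller may attempt the external call now. */
    boolean allowRequest() {
        if (openedAt < 0) {
            return true; // closed: calls go through
        }
        // Open: fail fast until the cool-down has elapsed, then let trial calls through (half-open).
        return clock.getAsLong() - openedAt >= openMillis;
    }

    void recordSuccess() {
        consecutiveFailures = 0;
        openedAt = -1; // close the breaker again
    }

    void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = clock.getAsLong(); // (re)open, restarting the cool-down
        }
    }
}
```

In a bolt, `execute` would check `allowRequest()` before calling the external service. When the breaker is open, the bolt would fail or park the tuple immediately rather than wait on a call that is likely to time out anyway. Hystrix layers timeouts, thread-pool isolation and metrics on top of this basic open/half-open/closed cycle.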

On Thu, Oct 23, 2014 at 1:57 PM, Michael Rose <mi...@fullcontact.com> wrote:
It's a last-ditch mechanism for replaying work which might have gotten stuck. Storm is an at-least-once processing system and doesn't aim to provide exactly-once / transactional behavior with base Storm. Trident aims to implement that on top of the underlying at-least-once system.

Timed-out in-flight tuples will *not* be cleared; this is true. Controlling latencies within a topology is key to making Storm work. We have IO work isolated by Hystrix commands to ensure we're always coming in under our timeout period. We've experimented with using global streams to "kill" a particular tuple tree, essentially adding a unique identifier for that work to a time-based cache so that each bolt drops it. It ultimately wasn't necessary: we instead improved the consistency of external IO through circuit breaking.
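One way to picture the "kill" approach, as a sketch only. Some component broadcasts the root ID of a tuple tree that should be abandoned. Each bolt records that ID in a small time-based cache and drops matching tuples. How the broadcast happens is an assumption, not something the thread details; one option is a separate stream that every bolt subscribes to with `allGrouping`. `KilledTreeCache` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical per-bolt cache of "killed" tuple-tree root IDs that forgets entries after a TTL.
final class KilledTreeCache {
    private final long ttlMillis;
    private final Map<Object, Long> killedAt = new HashMap<>(); // rootId -> time the kill arrived

    KilledTreeCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Record a kill notice, e.g. when one arrives on the broadcast stream. */
    void kill(Object rootId, long nowMillis) {
        killedAt.put(rootId, nowMillis);
    }

    /** True if tuples belonging to rootId should be dropped instead of processed. */
    boolean isKilled(Object rootId, long nowMillis) {
        evictExpired(nowMillis);
        return killedAt.containsKey(rootId);
    }

    int size() {
        return killedAt.size();
    }

    // Linear scan; fine for the small number of entries this is meant to hold.
    private void evictExpired(long nowMillis) {
        Iterator<Map.Entry<Object, Long>> it = killedAt.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= ttlMillis) {
                it.remove();
            }
        }
    }
}
```

The TTL has to cover how long stale copies can sit in a bolt's input queue, not just the message timeout. In Sam's log, that wait reaches about 30 seconds.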

If you take away nothing else, remember that Storm is at least once processing. Its goal is to ensure processing eventually happens for everything, no matter how many times it might take. It's up to you to remove bad input or park it.
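Michael's last point, removing or parking bad input, can be sketched on the spout side as a retry cap. Sam's spout assigns a new msg-id on every replay, so the failure count below is keyed by payload (the `"Content #N"` string) rather than by message ID. `ReplayOrPark` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical spout-side helper: replay a failed payload up to maxFailures times, then park it.
final class ReplayOrPark<T> {
    private final int maxFailures;
    private final Map<T, Integer> failures = new HashMap<>(); // payload -> failures so far
    private final Deque<T> replayQueue = new ArrayDeque<>();
    private final Deque<T> parked = new ArrayDeque<>();

    ReplayOrPark(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    /** Call from Spout.fail(): queue the payload for replay, or park it once it has failed too often. */
    void onFail(T payload) {
        int n = failures.merge(payload, 1, Integer::sum);
        if (n >= maxFailures) {
            failures.remove(payload);
            parked.add(payload); // parked, not replayed; a real spout might write these to a dead-letter store
        } else {
            replayQueue.add(payload);
        }
    }

    /** Call from Spout.ack(): the payload finally succeeded, so forget its failure count. */
    void onAck(T payload) {
        failures.remove(payload);
    }

    /** Next payload to re-emit from nextTuple(), or null if there is none. */
    T nextReplay() {
        return replayQueue.poll();
    }

    int parkedCount() {
        return parked.size();
    }
}
```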


Michael Rose (@Xorlev, https://twitter.com/xorlev)
Senior Platform Engineer, FullContact (http://www.fullcontact.com/)
michael@fullcontact.com

On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
Hi all.  I'm hoping somebody can explain this behavior to me because it seems pretty unexpected.

I'm seeing that timing out tuples does nothing except call "fail" on the Spout.  The tuple itself will still be processed through the Topology, except that acking/failing it will have no effect.  Another problem is that the number of in-flight tuples keeps growing: timed-out tuples no longer count as pending even though they still flow through the topology.  Unless I'm missing something, these two problems combined make timing out tuples at best utterly pointless, and at worst very problematic (as it just throws more tuples into a topology that is already maxed out).

Here's my topology:
- I have a spout.  On nextTuple, it either re-emits a tuple that has failed or, if none are present, creates a new tuple.
- I have a bolt that takes 4 seconds to ack a tuple.
- topology.max.spout.pending = 5
- topology.message.timeout.secs = 5

I would expect 1 or 2 tuples to get acked and 3 or 4 tuples to time out, after which the Bolt would process the resent tuples.  Over time, more and more tuples would be acked (though they would frequently time out).
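Here is a back-of-envelope check of that expectation. It assumes the five pending tuples reach a single bolt executor at the same moment, are served first-in first-out, and travel with negligible latency. Under those assumptions, the k-th tuple in line (counting from 0) finishes after (k + 1) × 4 s. Only the first tuple fits inside a 5 s timeout. The timeout would need to be at least 5 × 4 = 20 s for all five to fit. The helper names are hypothetical.

```java
// Back-of-envelope model (hypothetical helper): all pending tuples arrive at once at a single
// bolt executor, are served first-in first-out, and transfer time is ignored.
final class BacklogMath {
    /** Seconds after emission at which the tuple at queue position k (0-based) finishes. */
    static double completionSecs(int k, double serviceSecs) {
        return (k + 1) * serviceSecs;
    }

    /** How many of maxSpoutPending simultaneously emitted tuples finish within the timeout. */
    static int tuplesWithinTimeout(int maxSpoutPending, double serviceSecs, double timeoutSecs) {
        int fits = 0;
        for (int k = 0; k < maxSpoutPending; k++) {
            if (completionSecs(k, serviceSecs) <= timeoutSecs) {
                fits++;
            }
        }
        return fits;
    }

    /** Smallest timeout under which even the last tuple in line finishes. */
    static double minTimeoutSecs(int maxSpoutPending, double serviceSecs) {
        return completionSecs(maxSpoutPending - 1, serviceSecs);
    }
}
```

The log below shows two clean acks before the first timeouts: tuple 0, and tuple 1 about 8 s after it was emitted. It also shows failures arriving in batches roughly every 5 seconds. So the timeout is evidently not enforced at exactly 5 s, and this arithmetic is only a rough guide. Its conclusion still holds: with these settings, either the timeout or `max.spout.pending` has to change.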

What I'm seeing instead is that even though tuples have timed out, they are still being processed by the Bolt.  I'm assuming there is a buffer/queue for the Bolt, and that timed-out tuples are not cleared from it.  Regardless, this leads to all tuples timing out, since the Bolt will eventually only process tuples that have already timed out.
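The feedback loop described here can be reproduced with a toy model that advances in one-second steps. This is a deliberate simplification, not Storm's implementation. The spout keeps at most `maxPending` tuples pending. A tuple stops counting as pending once it is 5 s old and is immediately replaced by a replay. The single bolt executor takes 4 s per tuple from a FIFO queue that is never purged. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (hypothetical, not Storm's implementation) of one spout and one slow bolt executor.
final class BacklogSimulation {
    static final class Result {
        int acked;   // completions whose tuple was still pending
        int wasted;  // completions of tuples that had already timed out
        int queued;  // copies still waiting in the bolt's input queue at the end
        int pending; // tuples the spout considers pending at the end
    }

    static Result run(int maxPending, int timeoutSecs, int serviceSecs, int horizonSecs) {
        Map<Integer, Integer> pending = new LinkedHashMap<>(); // msgId -> second it was emitted
        ArrayDeque<Integer> boltQueue = new ArrayDeque<>();     // never purged on timeout
        Result r = new Result();
        int nextId = 0;
        int busyWith = -1; // msgId the bolt is working on, -1 when idle
        int busyUntil = 0;

        for (int t = 0; t <= horizonSecs; t++) {
            final int now = t;
            // 1. The bolt finishes its tuple; the ack only matters if it is still pending.
            if (busyWith >= 0 && now >= busyUntil) {
                if (pending.remove(busyWith) != null) {
                    r.acked++;
                } else {
                    r.wasted++;
                }
                busyWith = -1;
            }
            // 2. Timeouts: the spout forgets the tuple, but its copy stays in boltQueue.
            pending.values().removeIf(emittedAt -> now - emittedAt >= timeoutSecs);
            // 3. The spout refills up to maxPending; a replay is just another emit here.
            while (pending.size() < maxPending) {
                pending.put(nextId, now);
                boltQueue.add(nextId++);
            }
            // 4. An idle bolt takes the next queued tuple, stale or not.
            if (busyWith < 0 && !boltQueue.isEmpty()) {
                busyWith = boltQueue.poll();
                busyUntil = now + serviceSecs;
            }
        }
        r.queued = boltQueue.size();
        r.pending = pending.size();
        return r;
    }
}
```

With Sam's settings, `run(5, 5, 4, 60)` ends with only the very first tuple acked. Every later completion arrives after its tuple has already timed out, and dozens of stale copies remain queued, while the pending count never exceeds 5. With a 30 s timeout the same model wastes no work.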

I'm assuming, and hoping, that I'm missing something obvious here…

Two questions:
1.  Can I prevent Bolts from processing already-timed-out tuples?
2.  What is the point of timing out tuples?  It does nothing but call fail on the Spout even though the tuple will still be processed by the rest of the Topology!

Thanks,
-Sam


Spout:
public class SampleSpout extends BaseRichSpout {
    private static Logger logger = LoggerFactory.getLogger(SampleSpout.class);

    SpoutOutputCollector collector;
    Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
    Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();

    int contentCounter;
    int curMsgId;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // msg-id increments on every emit, including replays.
        // The "Content #N" counter increments only when a new tuple is created.
        declarer.declare(new Fields("msg-id", "content"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // either replay a failed tuple, or create a new one
        List<Object> tuple = null;
        if (replay_queue.size() > 0){
            tuple = replay_queue.poll();
        }else{
            tuple = new ArrayList<Object>();
            tuple.add(null);
            tuple.add("Content #" + contentCounter++);
        }

        // increment msgId and set it as the first item in the tuple
        int msgId = this.curMsgId++;
        tuple.set(0, msgId);
        logger.info("Emitting: " + tuple);
        // add this tuple to the 'pending' map, and emit it.
        pending_map.put(msgId, tuple);
        collector.emit(tuple, msgId);
        Utils.sleep(100);
    }

    @Override
    public void ack(Object msgId){
        // remove tuple from pending_map since it's no longer pending
        List<Object> acked_tuple = pending_map.remove(msgId);
        logger.info("Acked: " + acked_tuple);
    }

    @Override
    public void fail(Object msgId){
        // remove tuple from pending_map since it's no longer pending
        List<Object> failed_tuple = pending_map.remove(msgId);
        logger.info("Failed: " + failed_tuple);

        // put a copy into the replay queue
        ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
        replay_queue.add(copy);
    }
}


Bolt:
public class SamplePrintBolt extends BaseRichBolt {

    private static Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);

    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    @Override
    public void execute(Tuple input) {
        logger.info("I see: " + input.getValues());
        Utils.sleep(4000);
        logger.info("Done sleeping. Acking: " + input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // doesn't emit
    }
}


Main:
public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(5);
        conf.setMessageTimeoutSecs(5);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new SampleSpout());
        builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("local", conf, builder.createTopology());
}


Output:
30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
-- So far, so good… however, now it's time for things to time out.
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
-- Acking a timed-out tuple… this does nothing.
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep…
-- Why is it looking at tuple #3?  This has already failed.
45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
-- More timeouts…
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
58091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
58092 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
60085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep…
-- It's clear that the Bolt looks at tuples even if they have timed out.  Its queue will get longer and longer and tuples will always time out.
65086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
65087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
86098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
86099 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
90100 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
90101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep…
-- Problem gets exacerbated…  Bolt is now looking at tuples that have failed 30 seconds ago.
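The backlog described above can be checked with simple arithmetic. The sketch below is a simplified model, not Storm's actual scheduling. It assumes all five pending tuples (topology.max.spout.pending = 5) are emitted at t = 0 and that the single bolt executor handles them strictly in FIFO order at 4 s each. It also treats the 5 s timeout as exact. Storm's own timeout detection is coarser: in the log, the first timeouts fire about 10 s after the emits, which is why tuple 1 was still acked after about 8 s. The class and method names are illustrative.

```java
import java.util.Arrays;

/**
 * Simplified model of the log: a batch of tuples is emitted at t = 0 and one bolt
 * executor processes them one at a time, in FIFO order.
 * Assumption: the timeout is treated as exact; Storm's real detection is coarser.
 */
public class BacklogAges {

    /** Age (ms since emit) of each tuple at the moment the bolt starts executing it. */
    public static long[] startAges(int batchSize, long serviceMs) {
        long[] ages = new long[batchSize];
        for (int i = 0; i < batchSize; i++) {
            // Tuple i waits for the i tuples ahead of it to finish.
            ages[i] = i * serviceMs;
        }
        return ages;
    }

    /** Number of tuples already older than the timeout before the bolt even starts them. */
    public static int staleAtStart(long[] ages, long timeoutMs) {
        int stale = 0;
        for (long age : ages) {
            if (age > timeoutMs) {
                stale++;
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        long[] ages = startAges(5, 4000);            // max.spout.pending = 5, 4 s per tuple
        System.out.println(Arrays.toString(ages));   // [0, 4000, 8000, 12000, 16000]
        System.out.println(staleAtStart(ages, 5000)); // 3  (message.timeout.secs = 5)
    }
}
```

Replayed tuples are appended behind this backlog, so they start even later. In the log, the replay [7, Content #5], emitted at 40083, is not picked up by the bolt until 58092.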




Re: What is the purpose of timing out tuples?

Posted by Tom Brown <to...@gmail.com>.
You can always add your own timestamp to the tuple (propagate the parent
timestamp if you're emitting an anchored one). Assuming all the machines
have synchronized clocks, your bolts can then silently drop any tuple whose
timestamp is older than the message timeout. There's no need to fail them,
since the timeout mechanism already has.
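A minimal sketch of this suggestion follows. It assumes the spout adds System.currentTimeMillis() as an extra tuple field and that every bolt copies that field into the tuples it emits. Only the expiry test is shown as runnable Java. The Storm wiring in the comment is hypothetical, and StaleTupleFilter is an illustrative name, not a Storm class.

```java
import java.util.concurrent.TimeUnit;

/**
 * Expiry test for the "carry a timestamp and drop stale tuples" idea.
 * Hypothetical Storm wiring (not compiled here):
 *   spout: collector.emit(new Values(System.currentTimeMillis(), payload), msgId);
 *   bolt:  long emittedAt = input.getLong(0);
 *          if (filter.isExpired(emittedAt, System.currentTimeMillis())) return; // drop silently
 *          ... do the work, copy emittedAt into any anchored emits, then ack ...
 * Assumes all worker hosts have synchronized clocks.
 */
public class StaleTupleFilter {
    private final long timeoutMillis;

    public StaleTupleFilter(int messageTimeoutSecs) {
        this.timeoutMillis = TimeUnit.SECONDS.toMillis(messageTimeoutSecs);
    }

    /** True if a tuple emitted at emittedAtMillis is older than the timeout at nowMillis. */
    public boolean isExpired(long emittedAtMillis, long nowMillis) {
        return nowMillis - emittedAtMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        StaleTupleFilter filter = new StaleTupleFilter(5); // topology.message.timeout.secs = 5
        // Timestamps from Sam's log: [1] and [3] were both emitted at 30097.
        System.out.println(filter.isExpired(30_097L, 34_086L)); // false: [1] is ~4 s old
        System.out.println(filter.isExpired(30_097L, 42_088L)); // true:  [3] is ~12 s old
    }
}
```

Checking at the start of execute() lets a stale backlog drain at the cost of one comparison per tuple instead of 4 s of work each. Because Storm's own timeout check is not exact, a bolt using this test may drop a tuple slightly before the spout's fail() fires. With a replaying spout like Sam's, the tuple is still replayed once the timeout is detected.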

--Tom

On Thu, Oct 23, 2014 at 3:13 PM, saiprasad mishra <saiprasadmishra@gmail.com> wrote:

> +1 to Michael Rose: a Hystrix circuit breaker is the way to go if REST APIs
> are invoked from the topology.
>
> *Timeout in topology becomes controllable :)*
>
>
> Regards
> Sai
>
> On Thu, Oct 23, 2014 at 1:57 PM, Michael Rose <mi...@fullcontact.com> wrote:
>
>> It's a last-ditch mechanism for replaying work which might have gotten
>> stuck. Storm is an at-least-once processing system and doesn't aim to
>> provide exactly-once / transactional behavior with base Storm. Trident aims
>> to implement that on top of the underlying at-least-once system.
>>
>> Timed-out in-flight tuples will *not* be cleared; this is true.
>> Controlling latencies within a topology is key to making Storm work. We
>> have IO work isolated by Hystrix commands to ensure we're always coming in
>> under our timeout period. We've experimented with using global streams to
>> "kill" a particular tuple tree, essentially adding some unique work to a
>> time-based cache to drop it at each bolt. It ultimately wasn't necessary,
>> since we instead improved the consistency of external IO through
>> circuit breaking.
>>
>> If you take away nothing else, remember that Storm is at-least-once
>> processing. Its goal is to ensure processing eventually happens for
>> everything, no matter how many times it might take. It's up to you to
>> remove bad input or park it.
>>
>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> michael@fullcontact.com
>>
>>
>>
>
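Michael's quoted experiment of "killing" a tuple tree by adding it to a time-based cache that every bolt consults can be sketched as below. Assumptions: each tuple tree carries a unique ID field, kill messages arrive on a separate stream broadcast to every bolt task (for example with allGrouping), and the timestamps passed in never decrease. KilledTreeCache is a hypothetical name, not a Storm class.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Per-bolt, time-based cache of "killed" tuple-tree IDs.
 * Hypothetical wiring: on a kill-stream tuple call kill(id, now); in execute()
 * call isKilled(id, now) and drop the tuple if it returns true.
 * Not thread-safe; a bolt executor calls execute() from a single thread.
 */
public class KilledTreeCache {
    private final long ttlMillis;
    // Insertion order == kill order, so the oldest entries are always at the head.
    private final LinkedHashMap<String, Long> killedAt = new LinkedHashMap<>();

    public KilledTreeCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void kill(String treeId, long nowMillis) {
        killedAt.remove(treeId);          // re-insert so the entry moves to the tail
        killedAt.put(treeId, nowMillis);
        evictExpired(nowMillis);
    }

    public boolean isKilled(String treeId, long nowMillis) {
        evictExpired(nowMillis);
        return killedAt.containsKey(treeId);
    }

    public int size() {
        return killedAt.size();
    }

    // Drop entries at least ttlMillis old so memory stays bounded.
    private void evictExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = killedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMillis - e.getValue() < ttlMillis) {
                break; // every later entry is newer, so stop here
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        KilledTreeCache cache = new KilledTreeCache(10_000);
        cache.kill("tree-42", 0);
        System.out.println(cache.isKilled("tree-42", 5_000));  // true: still within TTL
        System.out.println(cache.isKilled("tree-42", 10_000)); // false: evicted
        System.out.println(cache.size());                      // 0
    }
}
```

Evicting on access keeps memory bounded without a background thread. The TTL only needs to exceed how long a tuple can wait in a bolt's input queue.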

Re: What is the purpose of timing out tuples?

Posted by saiprasad mishra <sa...@gmail.com>.
+1 to Michael Rose: a Hystrix circuit breaker is the way to go if REST APIs
are invoked from the topology.
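To illustrate what a circuit breaker does here, the following is a minimal hand-rolled sketch in plain Java. It is not Hystrix's API, and the thresholds are arbitrary assumptions. After a run of consecutive failures the breaker opens, and callers fail fast for a cooldown period. That way a struggling REST dependency cannot tie up a bolt for longer than the message timeout.

```java
/**
 * Minimal circuit breaker (illustrative, NOT the Hystrix API).
 * Closed: calls allowed. After failureThreshold consecutive failures it opens
 * and rejects calls for openMillis; after that a trial call is let through
 * ("half-open"). A success closes it, a failure re-opens it.
 */
public class SimpleCircuitBreaker {
    private final int failureThreshold;
    private final long openMillis;
    private int consecutiveFailures = 0;
    private long openedAt = -1; // -1 means the breaker is closed

    public SimpleCircuitBreaker(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    /** Whether a call may be attempted at nowMillis. */
    public boolean allowRequest(long nowMillis) {
        if (openedAt < 0) {
            return true;
        }
        return nowMillis - openedAt >= openMillis; // half-open: allow a trial call
    }

    public void recordSuccess() {
        consecutiveFailures = 0;
        openedAt = -1;
    }

    public void recordFailure(long nowMillis) {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = nowMillis; // (re)open and restart the cooldown
        }
    }

    public static void main(String[] args) {
        SimpleCircuitBreaker breaker = new SimpleCircuitBreaker(3, 1_000);
        for (int i = 0; i < 3; i++) {
            breaker.recordFailure(0);
        }
        System.out.println(breaker.allowRequest(500));   // false: open, fail fast
        System.out.println(breaker.allowRequest(1_000)); // true: trial call allowed
    }
}
```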

*Timeout in topology becomes controllable :)*
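The timeout side can be sketched with java.util.concurrent alone. Again, this is not Hystrix's API. The idea is to run the blocking call on a separate thread, wait at most a fixed deadline, and otherwise return a fallback. The 200 ms deadline and the fallback value are illustrative assumptions. In a topology, the deadline would need to sit comfortably below topology.message.timeout.secs, including any time the tuple spends queued.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    /** Runs call on pool, waiting at most timeoutMillis; returns fallback on timeout or error. */
    public static <T> T callWithDeadline(ExecutorService pool, Callable<T> call,
                                         long timeoutMillis, T fallback) {
        Future<T> future = pool.submit(call);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the slow call; its result is no longer wanted
            return fallback;
        } catch (ExecutionException e) {
            return fallback;     // the call itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return fallback;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            String fast = callWithDeadline(pool, () -> "ok", 1000, "fallback");
            String slow = callWithDeadline(pool, () -> {
                Thread.sleep(4000); // stands in for a slow REST call
                return "late";
            }, 200, "fallback");
            System.out.println(fast + " " + slow); // ok fallback
        } finally {
            pool.shutdownNow();
        }
    }
}
```

cancel(true) only interrupts the worker thread. A client library that ignores interrupts keeps running in the background, so the pool should be bounded.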


Regards
Sai

On Thu, Oct 23, 2014 at 1:57 PM, Michael Rose <mi...@fullcontact.com> wrote:

> It's a last-ditch mechanism for replaying work which might have gotten
> stuck. Storm is an at-least-once processing system and doesn't aim to
> provide exactly-once / transactional behavior with base Storm. Trident aims
> to implement that on top of the underlying at-least-once system.
>
> Timed-out in-flight tuples will *not* be cleared; this is true.
> Controlling latencies within a topology is key to making Storm work. We
> have IO work isolated by Hystrix commands to ensure we're always coming in
> under our timeout period. We've experimented with using global streams to
> "kill" a particular tuple tree, essentially adding some unique work to a
> time-based cache to drop it at each bolt. It ultimately wasn't necessary,
> since we instead improved the consistency of external IO through
> circuit breaking.
>
> If you take away nothing else, remember that Storm is at-least-once
> processing. Its goal is to ensure processing eventually happens for
> everything, no matter how many times it might take. It's up to you to
> remove bad input or park it.
>
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> michael@fullcontact.com
>
> On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
>
>>  Hi all.  I'm hoping somebody can explain this behavior to me because it
>> seems pretty unexpected.
>>
>>  I'm seeing that timing out tuples does *nothing* except call "fail" on
>> the Spout.  The tuple itself will still be processed through the Topology,
>> except acking/failing will have no effect.  Another problem is that the
>> number of pending tuples will increase: timed-out tuples do not count as
>> pending even though they will flow through the topology.  Unless I'm
>> missing something, these two combined problems make timing out tuples, at
>> best, utterly pointless, and at worst very problematic (as it will just
>> throw more tuples into a topology that is already maxed out).
>>
>>  Here's my topology:
>>  - I have a spout.  On nextTuple, it either re-emits a tuple that has
>> failed, and if none are present, creates a new tuple.
>>  - I have a bolt that takes 4 seconds to ack a tuple.
>>  - topology.max.spout.pending = 5
>>  - topology.message.timeout.secs = 5
>>
>>  I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to time out;
>> then the Bolt would next process the *resent* tuples.  Over time, more
>> and more tuples would be acked (though they would frequently time out).
>>
>>  What I'm seeing instead is that even though tuples are timed out, they
>> are still being processed by the Bolt.  I'm assuming there is a buffer/queue
>> for the Bolt, and that timed-out tuples are not cleared from it.
>> Regardless, this leads to all tuples timing out, since the Bolt will
>> eventually only process tuples that have already timed out.
>>
>>  I'm assuming, and hoping, that I'm missing something obvious here…
>>
>>  Two questions:
>> 1.  Can I prevent Bolts from processing already-timed-out tuples?
>> 2.  What is the point of timing out tuples?  It does *nothing* but call
>> *fail* on the Spout even though the tuple will still be processed by the
>> rest of the Topology!
>>
>>  Thanks,
>> -Sam
>>
>>
>>  Spout:
>> public class SampleSpout extends BaseRichSpout {
>>     private static Logger logger = LoggerFactory.getLogger(SampleSpout.class);
>>
>>     SpoutOutputCollector collector;
>>     Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
>>     Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();
>>
>>     int contentCounter;
>>     int curMsgId;
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // msg-id increments on every emit, including replays of failed tuples;
>>         // the "Content #N" counter increments only when a new tuple is created.
>>         declarer.declare(new Fields("msg-id", "content"));
>>     }
>>
>>     @Override
>>     public void open(Map conf, TopologyContext context,
>>             SpoutOutputCollector spoutOutputCollector) {
>>         collector = spoutOutputCollector;
>>     }
>>
>>     @Override
>>     public void nextTuple() {
>>         // either replay a failed tuple, or create a new one
>>         List<Object> tuple = null;
>>         if (replay_queue.size() > 0) {
>>             tuple = replay_queue.poll();
>>         } else {
>>             tuple = new ArrayList<Object>();
>>             tuple.add(null);
>>             tuple.add("Content #" + contentCounter++);
>>         }
>>
>>         // increment msgId and set it as the first item in the tuple
>>         int msgId = this.curMsgId++;
>>         tuple.set(0, msgId);
>>         logger.info("Emitting: " + tuple);
>>         // add this tuple to the 'pending' map, and emit it.
>>         pending_map.put(msgId, tuple);
>>         collector.emit(tuple, msgId);
>>         Utils.sleep(100);
>>     }
>>
>>     @Override
>>     public void ack(Object msgId) {
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> acked_tuple = pending_map.remove(msgId);
>>         logger.info("Acked: " + acked_tuple);
>>     }
>>
>>     @Override
>>     public void fail(Object msgId) {
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> failed_tuple = pending_map.remove(msgId);
>>         logger.info("Failed: " + failed_tuple);
>>
>>         // put a copy into the replay queue
>>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>>         replay_queue.add(copy);
>>     }
>> }
>>
>>
>>  Bolt:
>> public class SamplePrintBolt extends BaseRichBolt {
>>
>>     private static Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);
>>
>>     OutputCollector collector;
>>
>>     @Override
>>     public void prepare(Map stormConf, TopologyContext context,
>>             OutputCollector outputCollector) {
>>         collector = outputCollector;
>>     }
>>
>>     @Override
>>     public void execute(Tuple input) {
>>         logger.info("I see: " + input.getValues());
>>         Utils.sleep(4000);
>>         logger.info("Done sleeping. Acking: " + input.getValues());
>>         collector.ack(input);
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // doesn't emit
>>     }
>> }
>>
>>
>>  Main:
>> public static void main(String[] args) throws Exception {
>>     Config conf = new Config();
>>     conf.setMaxSpoutPending(5);
>>     conf.setMessageTimeoutSecs(5);
>>
>>     TopologyBuilder builder = new TopologyBuilder();
>>     builder.setSpout("spout", new SampleSpout());
>>     builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");
>>
>>     LocalCluster cluster = new LocalCluster();
>>     cluster.submitTopology("local", conf, builder.createTopology());
>> }
>>
>>
>>  Output:
>> 30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
>> 30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
>> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
>> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
>> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
>> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
>> 34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
>> 34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
>> 34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
>> 34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
>> 38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
>> 38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
>> 38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
>> 38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
>> *-- So far, so good… however, now it's time for things to time out.*
>> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
>> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
>> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
>> 40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
>> 40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
>> 40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
>> 40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
>> 40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
>> 42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
>> *-- Acking a timed-out tuple… this does nothing.*
>> 42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep…
>> *-- Why is it looking at tuple #3?  This has already failed.*
>> 45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
>> 45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
>> 46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
>> 46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
>> 50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
>> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content
>> #2]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content
>> #5]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content
>> #4]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content
>> #3]
>> *-- More timeouts…*
>> 50090 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [4, Content #4]
>> 50090 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content
>> #5]. Will now sleep...
>> 54091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [5, Content #5]
>> *-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…*
>> 54091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content
>> #6]. Will now sleep...
>> 55085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
>> 55085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content
>> #6]
>> 58091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [6, Content #6]
>> 58092 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content
>> #5]. Will now sleep...
>> 60085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content
>> #3]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content
>> #2]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content
>> #5]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content
>> #4]
>> 62093 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [7, Content #5]
>> 62093 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content
>> #4]. Will now sleep…
>> *-- It's clear that the Bolt looks at tuples even if they have
>> timed out.  Its queue will get longer and longer and tuples will always
>> time out.*
>> 65086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
>> 65087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content
>> #6]
>> 66094 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [8, Content #4]
>> 66094 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content
>> #3]. Will now sleep...
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content
>> #4]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content
>> #5]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content
>> #2]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content
>> #3]
>> 70095 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [9, Content #3]
>> 70095 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content
>> #2]. Will now sleep...
>> 74096 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [10, Content #2]
>> 74096 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content
>> #6]. Will now sleep...
>> 75088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
>> 75088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content
>> #6]
>> 78097 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [11, Content #6]
>> 78097 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content
>> #2]. Will now sleep...
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content
>> #3]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content
>> #2]
>> 80088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content
>> #5]
>> 80088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content
>> #4]
>> 82098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [12, Content #2]
>> 82098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content
>> #5]. Will now sleep...
>> 85088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
>> 85088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content
>> #6]
>> 86098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [13, Content #5]
>> 86099 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content
>> #4]. Will now sleep...
>> 90100 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [14, Content #4]
>> 90101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content
>> #3]. Will now sleep...
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content
>> #5]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content
>> #4]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content
>> #2]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content
>> #3]
>> 94101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [15, Content #3]
>> 94101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content
>> #6]. Will now sleep…
>>  *-- Problem gets exacerbated…  Bolt is now looking at tuples that have
>> failed 30 seconds ago.*
>>
>
>

Re: What is the purpose of timing out tuples?

Posted by Sam Mati <sm...@appnexus.com>.
Thanks for the response.

My takeaway is that "timing out" is a last-ditch effort, and that your timeout should be higher than the maximum time you expect a tuple to take to get through your topology, including time spent waiting in bolt queues.  Otherwise you risk a vicious cycle:  timed-out tuples are still processed, but are also re-emitted, causing more timeouts, causing more replays, etc.
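To make that rule of thumb concrete, here is a rough back-of-the-envelope sketch in plain Java (not a Storm API). It assumes a single slow bolt stage, that each bolt executor handles one tuple at a time, and that the grouping spreads pending tuples evenly; the class name, method names and the 1.5x safety margin are made up for illustration. With the settings from this thread (max.spout.pending = 5, one bolt executor, 4 s per tuple), the last tuple of a full batch waits behind four others and finishes about 20 s after it was emitted, so a 5 s timeout cannot keep up.

```java
// Back-of-the-envelope bound for topology.message.timeout.secs (not a Storm API).
// Assumptions: one slow bolt stage, each executor processes one tuple at a
// time, and the grouping spreads pending tuples evenly across executors.
final class TimeoutEstimate {
    private TimeoutEstimate() {}

    /** Seconds the last tuple of a full batch of maxSpoutPending tuples needs to finish. */
    static int worstCaseLatencySecs(int maxSpoutPending, int boltExecutors, int secsPerTuple) {
        if (maxSpoutPending <= 0 || boltExecutors <= 0 || secsPerTuple <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        // Tuples lined up on the busiest executor (ceiling division).
        int tuplesPerExecutor = (maxSpoutPending + boltExecutors - 1) / boltExecutors;
        return tuplesPerExecutor * secsPerTuple;
    }

    /** Worst case scaled by a safety margin (e.g. 1.5), rounded up to whole seconds. */
    static int suggestedTimeoutSecs(int maxSpoutPending, int boltExecutors, int secsPerTuple, double margin) {
        if (margin < 1.0) {
            throw new IllegalArgumentException("margin must be at least 1.0");
        }
        int worst = worstCaseLatencySecs(maxSpoutPending, boltExecutors, secsPerTuple);
        return (int) Math.ceil(worst * margin);
    }
}
```

For example, worstCaseLatencySecs(5, 1, 4) gives 20 and suggestedTimeoutSecs(5, 1, 4, 1.5) gives 30. Queueing in other bolts, GC pauses and network hops would all push the real figure higher, so treat the result as a floor rather than a target.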

I'm now wondering:  If all of my bolts always end in ack or fail, under what circumstances can tuples get "stuck"?  If a Worker gets killed, possibly?

One last question:  I frequently see it mentioned that Storm guarantees everything gets run once.  Please correct me if I'm wrong, but Storm doesn't guarantee this at all -- it's up to your Spout to keep track of each pending tuple and to replay failed ones (as my example Spout below does).  Am I missing something here?
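One common way to live with at-least-once delivery is to make the bolt's side effect idempotent, so a replay that gets through does no harm. The sketch below is a hypothetical in-memory version; it keys on the "content" value because the sample spout assigns a fresh msg-id on every replay. In the captured log, Content #3 reaches the bolt as msg-ids 3, 9 and 15, so without a guard like this its work runs at least three times. A real topology would need a durable store, since this set is lost when a worker dies and grows without bound; that durable bookkeeping is roughly what Trident layers on top of at-least-once Storm.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotent sink: runs the side effect at most once per stable
// business key, no matter how many times a replayed tuple is delivered.
// In-memory only; a real deployment would need durable, bounded storage.
final class IdempotentSink {
    private final Set<String> seen = new HashSet<>();
    private int applied;

    /** Applies the side effect the first time a key is seen; returns whether it ran. */
    boolean process(String contentKey) {
        if (!seen.add(contentKey)) {
            return false; // duplicate delivery (e.g. a replay): skip the work
        }
        applied++;        // stand-in for the real side effect
        return true;
    }

    int appliedCount() {
        return applied;
    }
}
```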

Again, thanks for your clarification!

Best,
-Sam

From: Michael Rose <mi...@fullcontact.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: Thursday, October 23, 2014 4:57 PM
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: Re: What is the purpose of timing out tuples?

It's a last-ditch mechanism for replaying work which might have gotten stuck. Storm is an at-least-once processing system and doesn't aim to provide exactly-once / transactional behavior with base Storm. Trident aims to implement that on top of the underlying at-least-once system.

Timed-out in-flight tuples will *not* be cleared; this is true. Controlling latencies within a topology is key to making Storm work. We have IO work isolated by Hystrix commands to ensure we're always coming in under our timeout period. We've experimented with using global streams to "kill" a particular tuple tree, essentially adding an identifier for the work to a time-based cache so that each bolt drops it. Ultimately that wasn't necessary; we instead improved the consistency of external IO through circuit breaking.
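A rough plain-Java sketch of the "kill list" idea described above, with no Storm dependencies. It assumes some hypothetical control stream delivers the IDs of abandoned tuple trees to every bolt task and that each data tuple carries its tree ID; the class name, method names and TTL are illustrative only, not the approach FullContact actually shipped. Each bolt would call shouldDrop() at the top of execute() and skip (but still ack) matching tuples.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-bolt kill list: tree IDs announced on a control stream are
// remembered for ttlMillis, and data tuples carrying one of them are dropped.
final class KillList {
    private final long ttlMillis;
    private final Map<String, Long> expiresAt = new HashMap<>(); // treeId -> deadline

    KillList(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Called when a "kill" message for treeId arrives on the control stream. */
    void kill(String treeId, long nowMillis) {
        expiresAt.put(treeId, nowMillis + ttlMillis);
    }

    /** True if a data tuple belonging to treeId should be skipped. */
    boolean shouldDrop(String treeId, long nowMillis) {
        // Lazy O(n) eviction keeps the sketch short; a real cache would do better.
        expiresAt.values().removeIf(deadline -> deadline <= nowMillis);
        return expiresAt.containsKey(treeId);
    }
}
```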

If you take away nothing else, remember that Storm is at-least-once processing. Its goal is to ensure processing eventually happens for everything, no matter how many times it might take. It's up to you to remove bad input or park it.
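To "park" input rather than replay it forever, the spout's bookkeeping can cap the number of attempts. The sketch below is a plain-Java model of the pending map and replay queue in the SampleSpout further down, plus a failure counter; the class name, the maxAttempts cap and the parked list are hypothetical, and it assumes the payload string itself identifies a message across replays. In a real spout, the msgId returned by emit() would be passed to collector.emit(), and ack()/fail() would be called from the spout's own ack/fail callbacks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of spout-side replay bookkeeping with a retry cap.
// Assumes the payload string identifies a message across replays.
final class ReplayTracker {
    private final int maxAttempts;
    private final Map<Integer, String> pending = new HashMap<>();  // msgId -> payload
    private final Map<String, Integer> failures = new HashMap<>(); // payload -> failures so far
    private final Deque<String> replayQueue = new ArrayDeque<>();  // payloads to re-emit
    private final List<String> parked = new ArrayList<>();         // payloads we gave up on
    private int nextMsgId;

    ReplayTracker(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Next payload to replay, or null if the caller should emit fresh input. */
    String pollReplay() {
        return replayQueue.pollFirst();
    }

    /** Registers a payload as in flight and returns the msgId to emit it with. */
    int emit(String payload) {
        int msgId = nextMsgId++;
        pending.put(msgId, payload);
        return msgId;
    }

    void ack(int msgId) {
        String payload = pending.remove(msgId);
        if (payload != null) {
            failures.remove(payload); // success: forget earlier failures
        }
    }

    void fail(int msgId) {
        String payload = pending.remove(msgId);
        if (payload == null) {
            return; // unknown or already handled
        }
        int count = failures.merge(payload, 1, Integer::sum);
        if (count >= maxAttempts) {
            failures.remove(payload);
            parked.add(payload);          // give up: park for offline inspection
        } else {
            replayQueue.addLast(payload); // try again on a later emit
        }
    }

    List<String> parked() {
        return parked;
    }

    int pendingCount() {
        return pending.size();
    }
}
```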


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:
Hi all.  I'm hoping somebody can explain this behavior to me because it seems pretty unexpected.

I'm seeing that timing out tuples does nothing except call "fail" on the Spout.  The tuple itself will still be processed through the Topology, except acking/failing will have no effect.  Another problem is that the number of tuples actually in flight will increase: timed-out tuples no longer count as pending even though they will still flow through the topology.  Unless I'm missing something, these two combined problems make timing out tuples, at best, utterly pointless, and at worst very problematic (as it will just throw more tuples into a topology that is already maxed out).

Here's my topology:
- I have a spout.  On nextTuple, it either re-emits a tuple that has failed or, if none are present, creates a new tuple.
- I have a bolt that takes 4 seconds to ack a tuple.
- topology.max.spout.pending = 5
- topology.message.timeout.secs = 5

I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to time out; then the Bolt would next process the resent tuples.  Over time, more and more tuples would be acked (though they would frequently time out).

What I'm seeing instead is that even though tuples have timed out, they are still being processed by the Bolt.  I'm assuming there is a buffer/queue for the Bolt, and that timed-out tuples are not cleared from it.  Regardless, this leads to all tuples timing out, since the Bolt will eventually process only tuples that have already timed out.

I'm assuming, and hoping, that I'm missing something obvious here…

Two questions:
1.  Can I prevent Bolts from processing already-timed-out tuples?
2.  What is the point of timing out tuples?  It does nothing but call fail on the Spout even though the tuple will still be processed by the rest of the Topology!
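On question 1: Storm will not remove a queued tuple on its own, but a bolt can cheaply skip work that is already too old. A minimal sketch, assuming the spout adds a hypothetical emit-time field (in milliseconds) to each tuple and that worker clocks are roughly in sync. It is a heuristic rather than an exact mirror of the spout's view: judging by the log, tuples emitted around 30097 were failed around 40082, so the spout's timeout fires somewhat later than the configured 5 s.

```java
// Hypothetical staleness guard: skips tuples older than the message timeout,
// on the assumption that the spout has already failed (and may have replayed) them.
final class StalenessGuard {
    private final long timeoutMillis;
    private long skipped;

    StalenessGuard(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Returns false (and counts a skip) when the tuple is at least timeoutMillis old. */
    boolean shouldProcess(long emittedAtMillis, long nowMillis) {
        boolean stale = nowMillis - emittedAtMillis >= timeoutMillis;
        if (stale) {
            skipped++;
        }
        return !stale;
    }

    long skippedCount() {
        return skipped;
    }
}
```

Inside execute(), a check along the lines of reading the emit-time field with input.getLongByField(...) and calling collector.ack(input) before returning early would skip the stale tuple; acking it is harmless, as the log shows. With the log's numbers, tuple [3, Content #3] was emitted at 30097 and reached the bolt at 42088, about 12 s later, so it would be skipped.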

Thanks,
-Sam


Spout:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class SampleSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(SampleSpout.class);

    SpoutOutputCollector collector;
    // msg-id -> values of every tuple that is emitted but not yet acked or failed
    Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
    // copies of failed tuples, waiting to be re-emitted
    Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();

    int contentCounter;
    int curMsgId;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // msg-id increments on every emit, including replays.
        // The number in "content" increments only when a new tuple is created.
        declarer.declare(new Fields("msg-id", "content"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // either replay a failed tuple, or create a new one
        List<Object> tuple = replay_queue.poll();
        if (tuple == null) {
            tuple = new ArrayList<Object>();
            tuple.add(null); // placeholder, overwritten with the msg-id below
            tuple.add("Content #" + contentCounter++);
        }

        // increment msgId and set it as the first item in the tuple
        int msgId = this.curMsgId++;
        tuple.set(0, msgId);
        logger.info("Emitting: " + tuple);
        // add this tuple to the 'pending' map, and emit it.
        pending_map.put(msgId, tuple);
        collector.emit(tuple, msgId);
        Utils.sleep(100);
    }

    @Override
    public void ack(Object msgId) {
        // remove tuple from pending_map since it's no longer pending
        List<Object> acked_tuple = pending_map.remove(msgId);
        logger.info("Acked: " + acked_tuple);
    }

    @Override
    public void fail(Object msgId) {
        // remove tuple from pending_map since it's no longer pending
        List<Object> failed_tuple = pending_map.remove(msgId);
        logger.info("Failed: " + failed_tuple);
        if (failed_tuple == null) {
            return; // unknown msgId: nothing to replay
        }

        // put a copy into the replay queue
        replay_queue.add(new ArrayList<Object>(failed_tuple));
    }
}


Bolt:
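import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;

public class SamplePrintBolt extends BaseRichBolt {

    private static final Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);

    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    @Override
    public void execute(Tuple input) {
        logger.info("I see: " + input.getValues() + ". Will now sleep...");
        Utils.sleep(4000); // simulate 4 seconds of work per tuple
        logger.info("Done sleeping. Acking: " + input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // doesn't emit
    }
}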


Main:
public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(5);
    conf.setMessageTimeoutSecs(5);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new SampleSpout());
    builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("local", conf, builder.createTopology());
}


Output:
30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
-- So far, so good… however, now it's time for things to time out.
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
-- Acking a timed-out tuple… this does nothing.
42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep…
-- Why is it looking at tuple #3?  This has already failed.
45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
-- More timeouts…
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…
54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
58091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
58092 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
60085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep…
-- It's clear that the Bolt looks at tuples even if they have timed out.  Its queue will get longer and longer and tuples will always time out.
65086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
65087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
86098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
86099 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
90100 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
90101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep…
-- Problem gets exacerbated…  Bolt is now looking at tuples that have failed 30 seconds ago.
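The spiral in this log can be reproduced with a small discrete-time model. This is a toy written for illustration, not Storm code: one spout capped at maxPending in-flight tuples, one bolt executor working through a FIFO queue at procSecs per tuple, and a spout-side timeout that fails tuples without removing them from the bolt's queue. With the thread's settings (5 pending, 5 s timeout, 4 s per tuple) over a 100 s run, only the first ack arrives in time and the remaining 24 are wasted; with a 30 s timeout all 25 acks count. The model fires timeouts exactly on time, so it is slightly harsher than the real log, where the first two acks made it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model (not Storm code) of a spout with max pending, one slow bolt
// executor with a FIFO queue, and timeouts that fail tuples on the spout
// side only, leaving them queued at the bolt.
final class TimeoutSimulation {
    int usefulAcks; // acks that arrived while the tuple was still pending
    int wastedAcks; // acks for tuples the spout had already failed

    static TimeoutSimulation run(int maxPending, int timeoutSecs, int procSecs, int horizonSecs) {
        TimeoutSimulation sim = new TimeoutSimulation();
        Deque<Integer> boltQueue = new ArrayDeque<>();   // tuple ids waiting for the bolt
        Map<Integer, Integer> pending = new HashMap<>(); // id -> emit time, as the spout sees it
        int nextId = 0;
        Integer inBolt = null;                           // id currently being processed
        int busyUntil = 0;

        for (int t = 0; t <= horizonSecs; t++) {
            // 1. The bolt finishes its current tuple and acks it.
            if (inBolt != null && t >= busyUntil) {
                if (pending.remove(inBolt) != null) {
                    sim.usefulAcks++;
                } else {
                    sim.wastedAcks++;
                }
                inBolt = null;
            }
            // 2. Timed-out tuples are failed on the spout side only.
            final int now = t;
            pending.values().removeIf(emittedAt -> emittedAt + timeoutSecs <= now);
            // 3. The spout refills up to maxPending (fresh or replayed, it makes no difference here).
            while (pending.size() < maxPending) {
                pending.put(nextId, t);
                boltQueue.addLast(nextId);
                nextId++;
            }
            // 4. An idle bolt takes the next queued tuple, stale or not.
            if (inBolt == null && !boltQueue.isEmpty()) {
                inBolt = boltQueue.pollFirst();
                busyUntil = t + procSecs;
            }
        }
        return sim;
    }
}
```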


Re: What is the purpose of timing out tuples?

Posted by Michael Rose <mi...@fullcontact.com>.
It's a last-ditch mechanism for replaying work which might have gotten
stuck. Storm is an at-least-once processing system and doesn't aim to
provide exactly-once / transactional behavior with base Storm. Trident aims
to implement that on top of the underlying at-least-once system.

Timed-out in-flight tuples will *not* be cleared; this is true. Controlling
latencies within a topology is key to making Storm work. We have IO work
isolated by Hystrix commands to ensure we're always coming in under our
timeout period. We've experimented with using global streams to "kill" a
particular tuple tree, essentially adding an identifier for the work to a
time-based cache so that each bolt drops it. Ultimately that wasn't
necessary; we instead improved the consistency of external IO through
circuit breaking.

If you take away nothing else, remember that Storm is at-least-once
processing. Its goal is to ensure processing eventually happens for
everything, no matter how many times it might take. It's up to you to
remove bad input or park it.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com

On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <sm...@appnexus.com> wrote:

>  Hi all.  I'm hoping somebody can explain this behavior to me because it
> seems pretty unexpected.
>
>  I'm seeing that timing out tuples does *nothing* except call "fail" on
> the Spout.  The tuple itself will still be processed through the Topology,
> except acking/failing will have no effect.  Another problem is that the
> number of pending tuples will increase — timed out tuples do not count as
> pending even though they will flow through the topology.  Unless I'm
> missing something, these two combined problems make timing out tuples, at
> best, utterly pointless, and at worst very problematic (as it will just
> throw more tuples into a topology that is already maxed out).
>
>  Here's my topology:
>  - I have a spout.  On nextTuple, it either re-emits a tuple that has
> failed, and if none are present, creates a new tuple.
>  - I have a bolt that takes 4 seconds to ack a tuple.
>  - topology.max.spout.pending = 5
>  - topology.message.timeout.secs = 5
>
>  I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to time out
> — then the Bolt would next process the *resent* tuples.  Over time, more
> and more tuples would be acked (though they would frequently time out).
>
>  What I'm seeing instead is that even though tuples have timed out, they
> are still being processed by the Bolt.  I'm assuming there is a buffer/queue
> for the Bolt, and that timed-out tuples are not cleared from it.
> Regardless, this leads to all tuples timing out, since the Bolt will
> eventually only process tuples that have been timed out.
>
>  I'm assuming, and hoping, that I'm missing something obvious here…
>
>  Two questions:
> 1.  Can I prevent Bolts from processing already-timed-out tuples?
> 2.  What is the point of timing out tuples?  It does *nothing* but call
> *fail* on the Spout even though the tuple will still be processed by the
> rest of the Topology!
>
>  Thanks,
> -Sam
>
>
>  Spout:
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Queue;
> import java.util.concurrent.LinkedBlockingQueue;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> // Storm 0.9.x package names; Storm 1.0+ uses org.apache.storm.* instead.
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.utils.Utils;
>
> public class SampleSpout extends BaseRichSpout {
>     private static Logger logger = LoggerFactory.getLogger(SampleSpout.class);
>
>     SpoutOutputCollector collector;
>     // msgId -> tuple values, for tuples emitted but not yet acked or failed
>     Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
>     // copies of failed tuples waiting to be re-emitted
>     Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();
>
>     int contentCounter;
>     int curMsgId;
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         // msg-id increments on every emit, replays included.
>         // The "Content #N" counter increments only when a new tuple is created.
>         declarer.declare(new Fields("msg-id", "content"));
>     }
>
>     @Override
>     public void open(Map conf, TopologyContext context,
>                      SpoutOutputCollector spoutOutputCollector) {
>         collector = spoutOutputCollector;
>     }
>
>     @Override
>     public void nextTuple() {
>         // either replay a failed tuple, or create a new one
>         List<Object> tuple = replay_queue.poll(); // null when nothing is waiting
>         if (tuple == null) {
>             tuple = new ArrayList<Object>();
>             tuple.add(null); // placeholder for msg-id
>             tuple.add("Content #" + contentCounter++);
>         }
>
>         // increment msgId and set it as the first item in the tuple
>         int msgId = this.curMsgId++;
>         tuple.set(0, msgId);
>         logger.info("Emitting: " + tuple);
>         // add this tuple to the 'pending' map, and emit it.
>         pending_map.put(msgId, tuple);
>         collector.emit(tuple, msgId);
>         Utils.sleep(100);
>     }
>
>     @Override
>     public void ack(Object msgId) {
>         // remove tuple from pending_map since it's no longer pending
>         List<Object> acked_tuple = pending_map.remove(msgId);
>         logger.info("Acked: " + acked_tuple);
>     }
>
>     @Override
>     public void fail(Object msgId) {
>         // remove tuple from pending_map since it's no longer pending
>         List<Object> failed_tuple = pending_map.remove(msgId);
>         logger.info("Failed: " + failed_tuple);
>
>         // put a copy into the replay queue
>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>         replay_queue.add(copy);
>     }
> }
>
>
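>  Bolt:
> import java.util.Map;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> // Storm 0.9.x package names; Storm 1.0+ uses org.apache.storm.* instead.
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.utils.Utils;
>
> public class SamplePrintBolt extends BaseRichBolt {
>
>     private static Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);
>
>     OutputCollector collector;
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context,
>                         OutputCollector outputCollector) {
>         collector = outputCollector;
>     }
>
>     @Override
>     public void execute(Tuple input) {
>         logger.info("I see: " + input.getValues() + ". Will now sleep...");
>         Utils.sleep(4000); // simulate 4 s of work per tuple
>         logger.info("Done sleeping. Acking: " + input.getValues());
>         collector.ack(input);
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         // doesn't emit
>     }
> }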
>
>
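>  Main:
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.topology.TopologyBuilder;
>
> // Enclosing class name is illustrative; the original post showed only main().
> public class SampleTopology {
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         conf.setMaxSpoutPending(5);    // topology.max.spout.pending
>         conf.setMessageTimeoutSecs(5); // topology.message.timeout.secs
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("spout", new SampleSpout());
>         builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");
>
>         LocalCluster cluster = new LocalCluster();
>         cluster.submitTopology("local", conf, builder.createTopology());
>     }
> }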
>
>
>  Output:
> 30084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
> 30085 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
> 30097 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
> 34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
> 34086 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
> 34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
> 34087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
> 38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [1, Content #1]
> 38087 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
> 38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
> 38089 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
> *-- So far, so good… however, now it's time for things to time out.*
> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
> 40082 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
> 40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
> 40083 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
> 40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
> 40084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
> 40085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
> 42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
> *-- Acking a timed-out tuple… this does nothing.*
> 42088 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep…
> *-- Why is it looking at tuple #3?  This has already failed.*
> 45084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
> 45085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
> 46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
> 46089 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
> 50084 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
> 50085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
> *-- More timeouts…*
> 50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
> 50090 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
> 54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
> *-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…*
> 54091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
> 55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
> 55085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
> 58091 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
> 58092 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
> 60085 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
> 60086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
> 62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
> 62093 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep…
> *-- It's clear that the Bolt looks at tuples even if they have timed out.
> Its queue will get longer and longer, and tuples will always time out.*
> 65086 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
> 65087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
> 66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
> 66094 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
> 70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
> 70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
> 70087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
> 70088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
> 70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
> 70095 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
> 74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
> 74096 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
> 75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
> 75088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
> 78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
> 78097 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
> 80087 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
> 80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
> 80088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
> 82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
> 82098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
> 85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
> 85088 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
> 86098 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
> 86099 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
> 90100 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
> 90101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
> 90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
> 90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
> 90216 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
> 90217 [Thread-10-spout] INFO  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
> 94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
> 94101 [Thread-8-bolt1] INFO  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep…
> *-- Problem gets exacerbated…  Bolt is now looking at tuples that have failed 30 seconds ago.*
>
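Sam's first question asks whether a bolt can skip tuples that have already timed out. A timestamp-based variant avoids a kill stream entirely. The spout adds its emit time as an extra field, and the bolt compares the tuple's age plus its expected work time against the message timeout before doing anything expensive. This is a hedged plain-Java sketch. The `DeadlineCheck` class and the idea of an `emit-time` field are hypothetical, and the approach assumes the spout's and bolt's machines have reasonably synchronised clocks. In `execute()`, one might read the field with `input.getLongByField("emit-time")` and, when `shouldSkip` returns true, call `collector.fail(input)` and return. Failing rather than acking matters here: as the output above shows, Storm's expiry can fire later than the configured value, so a skipped tuple may not have expired yet. Acking it would mark unprocessed work as done, whereas failing it simply triggers a replay, which is consistent with at-least-once.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical bolt-side guard: decide whether a tuple can still finish before
 * topology.message.timeout.secs, given the emit time the spout stamped on it.
 */
public final class DeadlineCheck {
    private final long timeoutMillis;
    private final long safetyMarginMillis;

    public DeadlineCheck(int messageTimeoutSecs, long safetyMarginMillis) {
        this.timeoutMillis = TimeUnit.SECONDS.toMillis(messageTimeoutSecs);
        this.safetyMarginMillis = safetyMarginMillis;
    }

    /**
     * True if the tuple is already too old for expectedWorkMillis of processing
     * (plus the safety margin) to complete before the timeout.
     */
    public boolean shouldSkip(long emitTimeMillis, long nowMillis, long expectedWorkMillis) {
        long ageMillis = nowMillis - emitTimeMillis;
        return ageMillis + expectedWorkMillis + safetyMarginMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        DeadlineCheck check = new DeadlineCheck(5, 0);
        // Timestamps (ms) taken from the output above; the bolt's work takes 4000 ms.
        System.out.println(check.shouldSkip(30084, 30085, 4000)); // false: msg-id 0, 1 ms old
        System.out.println(check.shouldSkip(30097, 42088, 4000)); // true: msg-id 3, about 12 s old
    }
}
```

Skipping is cheap compared with the 4-second sleep, so under this scheme stale entries would drain from the bolt's queue quickly instead of each costing a full work cycle.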