Posted to user@storm.apache.org by Itai Frenkel <It...@forter.com> on 2014/07/17 20:42:06 UTC

Acking is delayed by 5 seconds (in disruptor queue ?)

Hello,

I have noticed that an ack takes 5 seconds to pass from the bolt to the spout (see debug log below). It is a simple topology with 1 spout, 1 bolt and 1 acker all running on the same worker. The spout and the bolt are ShellSpout and ShellBolt respectively.

It looks like the message is delayed in the LMAX Disruptor queue.
How can I reduce this delay to ~1ms?

Regards,
Itai


2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker __ack_ack [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138




Re: Acking is delayed by 5 seconds (in disruptor queue ?)

Posted by Michael Rose <mi...@fullcontact.com>.
Worth clarifying for anyone else in this thread that an LBQ
(LinkedBlockingQueue) separating production from consumption is not a default
thing in Storm; it's something we cooked up to prefetch elements from
slow/batching resources.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com
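The prefetch pattern Michael describes can be sketched roughly as follows. This is a minimal illustration only: `PrefetchSketch`, `ioLoop`, and `tryNext` are hypothetical names, not Storm or AWS SDK APIs. A dedicated IO thread batch-fetches messages and blocks on a bounded LinkedBlockingQueue, while the spout side polls it without blocking so acks/fails can still be serviced.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only -- PrefetchSketch, ioLoop and tryNext are
// hypothetical names, not Storm or AWS SDK APIs.
public class PrefetchSketch {
    private final BlockingQueue<String> buffer;

    public PrefetchSketch(int capacity) {
        // Bounded queue: the bound is what throttles the IO thread.
        this.buffer = new LinkedBlockingQueue<>(capacity);
    }

    // Runs on a dedicated IO thread: put() blocks while the buffer is at
    // capacity, which naturally pauses fetching until the spout clears room.
    public void ioLoop(List<String> batch) {
        for (String msg : batch) {
            try {
                buffer.put(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Called from nextTuple(): poll() returns null immediately when empty,
    // so the spout thread never blocks and can keep servicing acks/fails.
    public String tryNext() {
        return buffer.poll();
    }
}
```

The key design choice is that only the IO thread ever blocks; the spout's nextTuple() sees either a message or null and returns immediately either way.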


On Fri, Jul 18, 2014 at 3:16 PM, Itai Frenkel <It...@forter.com> wrote:

>  Got it! thanks.
>  ------------------------------
> *From:* Michael Rose <mi...@fullcontact.com>
> *Sent:* Saturday, July 19, 2014 12:10 AM
>
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>
>  1)
>
>  Let's say we have it set at a limit of 100 items. The LBQ currently has
> 97 items in it. The SQS client runs again, pulls 10 messages, and
> successfully inputs 3. The other 7 are blocked waiting for the queue to
> clear out. No new HTTP requests are made to SQS while this LBQ is full
> (essentially this is just called in a loop). nextTuple() eventually comes
> around, a slot frees up, one more blocked message is inserted, and so on
> until all of the blocked messages are inserted into the LBQ. At this
> point, we call out to the SQS client again to fetch another 10 (and
> hopefully the LBQ has not completely drained by the time the SQS client
> returns another 1-10 messages).
>
>  This model isn't picky about which SQS client is grabbing messages. SQS
> doesn't guarantee order (or single delivery) anyways. In one of our
> topologies, we have 8 spout instances consuming the same SQS queue for
> throughput purposes. Maybe I've misunderstood your question though. We
> normally don't have multiple topologies consuming the same queue, but
> depending on the data there's no reason we couldn't.
>
>  Also in this model, we don't use a blocking poll method; if the LBQ is
> empty, we skip emission and let Storm handle backoff if it wants to (see
> below).
>
>  2) By backoff I mean: your spout hasn't emitted in a while, so Storm is
> going to slow down on calling nextTuple() so as not to busy-wait your
> entire CPU. If you're at the maxSpoutPending limit, nextTuple is also not
> called until an ack or fail has been received (which is one of the reasons
> it's so important to avoid blocking in the spout as much as possible).
>
>  Storm will use the
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
> when you don't emit. By default, it waits only 1ms before calling again
> (not exponential, just fixed), which is enough to prevent 10-100k/s polling
> behavior but not significantly increase latency -- if you're ever not
> emitting, you can probably afford to sleep for 1ms. We actually have it set
> to 10ms, given that 99.9% of the time we'll have a message ready for
> processing.
>
>  An alternative to this is the
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/NothingEmptyEmitStrategy.java
> if you do see an impact on your throughput--but I've never needed this.
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> michael@fullcontact.com
>
>
> On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel <It...@forter.com> wrote:
>
>>  Thanks for the thorough and quick answer.
>>
>>
>>  Two follow-up questions. The context is performing low-latency stream
>> manipulation (hopefully processing a message the moment it arrives, or a
>> few milliseconds later).
>>
>>
>>  1. What happens if the LBQ contains 10 items, while the Storm topology
>> does not call nextTuple because of backoffs? Wouldn't it be better off
>> for another Amazon SQS client to handle these items? Or are you assuming
>> a single Storm topology is the sole handler of these items?
>>
>>
>>  2. If by backoff you mean the Storm topology cannot handle any more
>> messages, or maxSpoutPending is reached, then ignore this question. If by
>> backoff you mean exponential backoff, then I am worried about a message
>> arriving in the queue while nextTuple is not called for a long time (more
>> than a few milliseconds).
>>
>>
>>  Regards,
>>
>> Itai
>>  ------------------------------
>> *From:* Michael Rose <mi...@fullcontact.com>
>> *Sent:* Friday, July 18, 2014 11:27 PM
>>
>> *To:* user@storm.incubator.apache.org
>> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>>
>>   I have no experience with multilang spouts; however, my impression from
>> the docs is that you should be handling your own multiplexing if you're
>> writing a ShellSpout. Otherwise, if you block for 5 seconds emitting a
>> tuple, you cannot process an ack until that's done. I'd experiment with
>> that: if you change the sleep.spout.wait time to 500ms and you don't
>> block in your spout (instead returning "sync"), it should back off just as
>> it does with a normal spout (see
>> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java,
>> "sync" is a no-op).
>>
>> The post you linked to was mine, and for a long time that was true
>> (especially 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do
>> automatic backoffs when no tuples are emitted. The only time I've
> intentionally blocked in a spout after 0.8.0 is to control throughput (e.g.
>> only allow 10/s during development). I've never built a multilang spout
>> before.
>>
>>  Spouts, like bolts, run in a single-threaded context so blocking at all
>> prevents acks/fails/emits from being done until the thread is unblocked.
>> That is why it's best to have another thread dealing with IO and
>> asynchronously feeding a concurrent data structure the spout can utilize.
>> For example, in our internal Amazon SQS client, our IO thread continuously
>> fetches up to 10 messages per get and shoves them into a
>> LinkedBlockingQueue (until it's full; then it blocks the IO thread only
>> until the spout's emits clear up room).
>>
>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> michael@fullcontact.com
>>
>>
>> On Fri, Jul 18, 2014 at 1:34 PM, Itai Frenkel <It...@forter.com> wrote:
>>
>>>  So can you please explain this sentence from the multilang
>>> documentation?
>>>
>>>
>>>  "Also like ISpout, if you have no tuples to emit for a next, you
>>> should sleep for a small amount of time before syncing. ShellSpout will not
>>> automatically sleep for you"
>>>
>>> https://storm.incubator.apache.org/documentation/Multilang-protocol.html
>>>
>>>
>>>  I read it as: "Unless you sleep a small amount of time before syncing,
>>> the ShellSpout would serialize one "nextTuple" message every 1ms (see
>>> configuration below), which would consume many more CPU cycles"
>>>
>>> topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
>>> topology.sleep.spout.wait.strategy.time.ms: 1
>>>
>>>  You can also refer to the answer here, which refers to regular Spouts
>>> doing sleep as well:
>>>
>>> https://groups.google.com/forum/#!topic/storm-user/OSjaVgTK5m0
>>>
>>>
>>>  Regards,
>>>
>>> Itai
>>>
>>>
>>>
>>>  ------------------------------
>>> *From:* Michael Rose <mi...@fullcontact.com>
>>> *Sent:* Friday, July 18, 2014 10:18 PM
>>> *To:* user@storm.incubator.apache.org
>>> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>>>
>>>   Run your producer code in another thread to fill an LBQ, and poll it
>>> from nextTuple instead.
>>>
>>>  You should never be blocking yourself inside a spout.
>>>
>>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>> michael@fullcontact.com
>>>
>>>
>>> On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel <It...@forter.com> wrote:
>>>
>>>>  Hello again,
>>>>
>>>>
>>>>  Attached is a simplified reproduction (without the ShellSpout, but
>>>> the concepts are the same).
>>>>
>>>>
>>>>  It seems that ack() and nextTuple() are always called on the same
>>>> thread. That means that there is an inherent tradeoff.
>>>>
>>>> Either nextTuple returns immediately (and then the ShellSpout would
>>>> serialize a lot of nextTuple messages),
>>>>
>>>> or nextTuple sleeps a few ms, but then the ack is delayed.
>>>>
>>>>
>>>>  Is there a way around this limitation?
>>>>
>>>>
>>>>  Itai
>>>
>>
>
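The two wait-strategy settings quoted in this thread can be wired up roughly like this. This is a sketch: the two config keys are the real ones discussed above, but `SpoutWaitConfigSketch` is a hypothetical name and the plain `Map` stands in for setting them on `backtype.storm.Config` before topology submission.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the spout wait-strategy settings discussed in the thread, as raw
// key/value pairs. In a real topology you'd set these on
// backtype.storm.Config before submitting.
public class SpoutWaitConfigSketch {
    public static Map<String, Object> waitConfig(int sleepTimeMs) {
        Map<String, Object> conf = new HashMap<>();
        // Fixed-interval sleep when nextTuple() emits nothing (the default
        // strategy; 1 ms by default, 10 ms in Michael's setup).
        conf.put("topology.spout.wait.strategy",
                 "backtype.storm.spout.SleepSpoutWaitStrategy");
        conf.put("topology.sleep.spout.wait.strategy.time.ms", sleepTimeMs);
        return conf;
    }
}
```

Because the sleep interval is fixed rather than exponential, raising it trades a bounded amount of added latency for fewer idle nextTuple() calls.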

RE: Acking is delayed by 5 seconds (in disruptor queue ?)

Posted by Itai Frenkel <It...@forter.com>.
Got it! thanks.

________________________________
From: Michael Rose <mi...@fullcontact.com>
Sent: Saturday, July 19, 2014 12:10 AM
To: user@storm.incubator.apache.org
Subject: Re: Acking is delayed by 5 seconds (in disruptor queue ?)

1)

Lets say we have it set at a limit of 100 items. The LBQ currently has 97 items in it. The SQS client runs again, pulls 10 messages, and successfully inputs 3. The other 7 are blocked waiting for the queue to clear out. No new HTTP requests are made to SQS while this LBQ is full (essentially this is just called in a loop). nextTuple() eventually comes around, 6 blocked for insertion. etc. etc. until all 6 blocked messages are inserted into the LBQ. At this point, we call out to the SQS client again to fetch another 10 (and hopefully the LBQ has not completely drained by the time the SQS client returns another 1-10 messages).

This model isn't picky about which SQS client is grabbing messages. SQS doesn't guarantee order (or single delivery) anyways. In one of our topologies, we have 8 spout instances consuming the same SQS queue for throughput purposes. Maybe I've misunderstood your question though. We normally don't have multiple topologies consuming the same queue, but depending on the data there's no reason we couldn't.

Also in this model, we don't use a blocking poll method, if the LBQ is empty, we skip emission and let Storm handle backoff if it wants to (see below).

2) By backoff I mean, your spout hasn't emitted in a while, it's going to slow down on calling nextTuple() to not busy-wait your entire CPU. If you're at maxSpoutPending limit, nextTuple is also not called until an ack or fail has been received (which is one of the reasons its so important to not block in the spout as much as possible).

Storm will use the https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java when you don't emit. By default, it waits only 1ms before calling again (not exponential, just fixed), which is enough to prevent 10-100k/s polling behavior but not significantly increase latency -- if you're ever not emitting, you can probably afford to sleep for 1ms. We actually have it set to 10ms, given that 99.9% of the time we'll have a message ready for processing.

An alternative to this is the https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/NothingEmptyEmitStrategy.java if you do see an impact on your throughput--but I've never needed this.


Michael Rose (@Xorlev<https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact<http://www.fullcontact.com/>
michael@fullcontact.com<ma...@fullcontact.com>


On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel <It...@forter.com>> wrote:

Thanks for the thorough and quick answer.


Two follow up questions. The context is performing low latency stream manipulation (hopefully process a message the moment it arrives, maybe a few milliseconds later).


1. What happens if the LBQ contains 10 items, while the Storm topology does not call nextTuple because of backoffs ? Wouldn't it be better of for another Amazon SQS client to handle these items? Or are you assuming a single Storm topology is the sole handler of these items ?


2. If by backoff, you mean storm topology cannot handle any more messages, or maxSpoutPending is reached, then ignore this question. If by backoff you mean exponential backoff then I am worried about a message arriving to the queue and nextTuple is not called for a long time (more than a few milliseconds).


Regards,

Itai

________________________________
From: Michael Rose <mi...@fullcontact.com>>
Sent: Friday, July 18, 2014 11:27 PM

To: user@storm.incubator.apache.org<ma...@storm.incubator.apache.org>
Subject: Re: Acking is delayed by 5 seconds (in disruptor queue ?)

I have no experience with multilang spouts, however my impression from the docs is that you should be handling your own multiplexing if you're writing a shellspout. Otherwise if you block for 5 seconds emitting a tuple, you cannot process an ack until that's done. I'd experiment with that, if you change the sleep.spout.wait time to be 500ms and you don't block in your spout (instead returning "sync") it should back off just as it does with a normal spout (see https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java, "sync" is a no-op).

The post you linked to was mine, and for a long time that was true (especially 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do automatic backoffs when no tuples are emitted. The only time I've intentionally blocked in a spout after 0.8.0 is to control throughout (e.g. only allow 10/s during development). I've never built a multilang spout before.

Spouts, like bolts, run in a single-threaded context so blocking at all prevents acks/fails/emits from being done until the thread is unblocked. That is why it's best to have another thread dealing with IO and asynchronously feeding a concurrent data structure the spout can utilize. For example, in our internal Amazon SQS client our IO thread continuously fetches up to 10 messages per get and shoves them into a LinkedBlockingQueue (until full, then it blocks the IO thread only until the spout emits clear up room).


Michael Rose (@Xorlev<https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact<http://www.fullcontact.com/>
michael@fullcontact.com<ma...@fullcontact.com>


On Fri, Jul 18, 2014 at 1:34 PM, Itai Frenkel <It...@forter.com>> wrote:

So can you please explain this sentence from the multilang documentation?


"Also like ISpout, if you have no tuples to emit for a next, you should sleep for a small amount of time before syncing. ShellSpout will not automatically sleep for you"

https://storm.incubator.apache.org/documentation/Multilang-protocol.html


I read it as: "Unless you sleep a small amount of time before syncing, the ShellSpout would serialize one "nextTuple" message per 1ms (see configuration below) which would require much more CPU cycles"

topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms<http://topology.sleep.spout.wait.strategy.time.ms>: 1




You can also refer to the answer here, which refers to regular Spouts doing sleep as well:

https://groups.google.com/forum/#!topic/storm-user/OSjaVgTK5m0


Regards,

Itai



________________________________
From: Michael Rose <mi...@fullcontact.com>>
Sent: Friday, July 18, 2014 10:18 PM
To: user@storm.incubator.apache.org<ma...@storm.incubator.apache.org>
Subject: Re: Acking is delayed by 5 seconds (in disruptor queue ?)

Run your producer code in another thread to fill a LBQ, poll that with nextTuple instead.

You should never be blocking yourself inside a spout.


Michael Rose (@Xorlev<https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact<http://www.fullcontact.com/>
michael@fullcontact.com<ma...@fullcontact.com>


On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel <It...@forter.com>> wrote:

Hello again,


Attached is a simplified reproduction (without the ShellSpout, but the concepts are the same).


It seems that ack() and nextTuple() are always called on the same thread. That means that there is an inherent tradeoff.

Either nextTuple sleeps a few ms  (and then the ShellSpout would serialize alot of nextTuple messages)

or nextTuple can sleep but then the ack is delayed.


Is there a way around this limitation?


Itai

________________________________
From: Itai Frenkel <It...@forter.com>>
Sent: Thursday, July 17, 2014 9:42 PM
To: user@storm.incubator.apache.org<ma...@storm.incubator.apache.org>
Subject: Acking is delayed by 5 seconds (in disruptor queue ?)

Hello,

I have noticed that an ack takes 5 seconds to pass from the bolt to the spout (see debug log below). It is a simple topology with 1 spout, 1 bolt and 1 acker all running on the same worker. The spout and the bolt are ShellSpout and ShellBolt respectively.

It looks like the message is delayed in the LMAX disruptor​ queue.
How can I reduce this delay to ~1ms ?

Regards,
Itai


2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker __ack_ack [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138







Re: Acking is delayed by 5 seconds (in disruptor queue ?)

Posted by Michael Rose <mi...@fullcontact.com>.
1)

Lets say we have it set at a limit of 100 items. The LBQ currently has 97
items in it. The SQS client runs again, pulls 10 messages, and successfully
inputs 3. The other 7 are blocked waiting for the queue to clear out. No
new HTTP requests are made to SQS while this LBQ is full (essentially this
is just called in a loop). nextTuple() eventually comes around, 6 blocked
for insertion. etc. etc. until all 6 blocked messages are inserted into the
LBQ. At this point, we call out to the SQS client again to fetch another 10
(and hopefully the LBQ has not completely drained by the time the SQS
client returns another 1-10 messages).

This model isn't picky about which SQS client is grabbing messages. SQS
doesn't guarantee order (or single delivery) anyways. In one of our
topologies, we have 8 spout instances consuming the same SQS queue for
throughput purposes. Maybe I've misunderstood your question though. We
normally don't have multiple topologies consuming the same queue, but
depending on the data there's no reason we couldn't.

Also in this model, we don't use a blocking poll method, if the LBQ is
empty, we skip emission and let Storm handle backoff if it wants to (see
below).

2) By backoff I mean, your spout hasn't emitted in a while, it's going to
slow down on calling nextTuple() to not busy-wait your entire CPU. If
you're at maxSpoutPending limit, nextTuple is also not called until an ack
or fail has been received (which is one of the reasons its so important to
not block in the spout as much as possible).

Storm will use the
https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
when you don't emit. By default, it waits only 1ms before calling again
(not exponential, just fixed), which is enough to prevent 10-100k/s polling
behavior but not significantly increase latency -- if you're ever not
emitting, you can probably afford to sleep for 1ms. We actually have it set
to 10ms, given that 99.9% of the time we'll have a message ready for
processing.

An alternative to this is the
https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/NothingEmptyEmitStrategy.java
if you do see an impact on your throughput--but I've never needed this.

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com


On Fri, Jul 18, 2014 at 2:41 PM, Itai Frenkel <It...@forter.com> wrote:

>  Thanks for the thorough and quick answer.
>
>
>  Two follow up questions. The context is performing low latency stream
> manipulation (hopefully process a message the moment it arrives, maybe a
> few milliseconds later).
>
>
>  1. What happens if the LBQ contains 10 items, while the Storm topology
> does not call nextTuple because of backoffs ? Wouldn't it be better of for
> another Amazon SQS client to handle these items? Or are you assuming a
> single Storm topology is the sole handler of these items ?
>
>
>  2. If by backoff, you mean storm topology cannot handle any more
> messages, or maxSpoutPending is reached, then ignore this question. If by
> backoff you mean exponential backoff then I am worried about a message
> arriving to the queue and nextTuple is not called for a long time (more
> than a few milliseconds).
>
>
>  Regards,
>
> Itai
>  ------------------------------
> *From:* Michael Rose <mi...@fullcontact.com>
> *Sent:* Friday, July 18, 2014 11:27 PM
>
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>
>  I have no experience with multilang spouts, however my impression from
> the docs is that you should be handling your own multiplexing if you're
> writing a shellspout. Otherwise if you block for 5 seconds emitting a
> tuple, you cannot process an ack until that's done. I'd experiment with
> that, if you change the sleep.spout.wait time to be 500ms and you don't
> block in your spout (instead returning "sync") it should back off just as
> it does with a normal spout (see
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java,
> "sync" is a no-op).
>
> The post you linked to was mine, and for a long time that was true
> (especially 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do
> automatic backoffs when no tuples are emitted. The only time I've
> intentionally blocked in a spout after 0.8.0 is to control throughout (e.g.
> only allow 10/s during development). I've never built a multilang spout
> before.
>
>  Spouts, like bolts, run in a single-threaded context so blocking at all
> prevents acks/fails/emits from being done until the thread is unblocked.
> That is why it's best to have another thread dealing with IO and
> asynchronously feeding a concurrent data structure the spout can utilize.
> For example, in our internal Amazon SQS client our IO thread continuously
> fetches up to 10 messages per get and shoves them into a
> LinkedBlockingQueue (until full, then it blocks the IO thread only until
> the spout emits clear up room).
>
>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> michael@fullcontact.com
>
>
> On Fri, Jul 18, 2014 at 1:34 PM, Itai Frenkel <It...@forter.com> wrote:
>
>>  So can you please explain this sentence from the multilang
>> documentation?
>>
>>
>>  "Also like ISpout, if you have no tuples to emit for a next, you should
>> sleep for a small amount of time before syncing. ShellSpout will not
>> automatically sleep for you"
>>
>> https://storm.incubator.apache.org/documentation/Multilang-protocol.html
>>
>>
>>  I read it as: "Unless you sleep a small amount of time before syncing,
>> the ShellSpout would serialize one "nextTuple" message per 1ms (see
>> configuration below) which would require much more CPU cycles"
>>
>> topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
>> topology.sleep.spout.wait.strategy.time.ms: 1
>>
>> You can also refer to the answer here, which refers to regular Spouts
>> doing sleep as well:
>>
>> https://groups.google.com/forum/#!topic/storm-user/OSjaVgTK5m0
>>
>>
>>  Regards,
>>
>> Itai
>>
>>
>>
>>  ------------------------------
>> *From:* Michael Rose <mi...@fullcontact.com>
>> *Sent:* Friday, July 18, 2014 10:18 PM
>> *To:* user@storm.incubator.apache.org
>> *Subject:* Re: Acking is delayed by 5 seconds (in disruptor queue ?)
>>
>>   Run your producer code in another thread to fill a LBQ, poll that with
>> nextTuple instead.
>>
>>  You should never be blocking yourself inside a spout.
>>
>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>> michael@fullcontact.com
>>
>>
>> On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel <It...@forter.com> wrote:
>>
>>>  Hello again,
>>>
>>>
>>>  Attached is a simplified reproduction (without the ShellSpout, but the
>>> concepts are the same).
>>>
>>>
>>>  It seems that ack() and nextTuple() are always called on the same
>>> thread. That means that there is an inherent tradeoff.
>>>
>>> Either nextTuple sleeps a few ms  (and then the ShellSpout would
>>> serialize alot of nextTuple messages)
>>>
>>> or nextTuple can sleep but then the ack is delayed.
>>>
>>>
>>>  Is there a way around this limitation?
>>>
>>>
>>>  Itai
>>>  ------------------------------
>>> *From:* Itai Frenkel <It...@forter.com>
>>> *Sent:* Thursday, July 17, 2014 9:42 PM
>>> *To:* user@storm.incubator.apache.org
>>> *Subject:* Acking is delayed by 5 seconds (in disruptor queue ?)
>>>
>>>    Hello,
>>>
>>>  I have noticed that an ack takes 5 seconds to pass from the bolt to
>>> the spout (see debug log below). It is a simple topology with 1 spout, 1
>>> bolt and 1 acker all running on the same worker. The spout and the bolt are
>>> ShellSpout and ShellBolt respectively.
>>>
>>>  It looks like the message is delayed in the LMAX disruptor​ queue.
>>>  How can I reduce this delay to ~1ms ?
>>>
>>>  Regards,
>>>  Itai
>>>
>>>
>>>  2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to
>>> tuple 2759481868963667531
>>> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack
>>> [-357211617823660063 -3928495599512172728]
>>> 2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to
>>> tuple 2759481868963667531
>>> 2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message
>>> source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063
>>> -3928495599512172728]
>>> 2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker
>>> __ack_ack [-357211617823660063]
>>> 2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message
>>> source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
>>> 2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138
>>>
>>>
>>>
>>>
>>
>

RE: Acking is delayed by 5 seconds (in disruptor queue ?)

Posted by Itai Frenkel <It...@forter.com>.
Thanks for the thorough and quick answer.


Two follow up questions. The context is performing low latency stream manipulation (hopefully process a message the moment it arrives, maybe a few milliseconds later).


1. What happens if the LBQ contains 10 items, while the Storm topology does not call nextTuple because of backoffs ? Wouldn't it be better of for another Amazon SQS client to handle these items? Or are you assuming a single Storm topology is the sole handler of these items ?


2. If by backoff, you mean storm topology cannot handle any more messages, or maxSpoutPending is reached, then ignore this question. If by backoff you mean exponential backoff then I am worried about a message arriving to the queue and nextTuple is not called for a long time (more than a few milliseconds).


Regards,

Itai

________________________________
From: Michael Rose <mi...@fullcontact.com>
Sent: Friday, July 18, 2014 11:27 PM
To: user@storm.incubator.apache.org
Subject: Re: Acking is delayed by 5 seconds (in disruptor queue ?)

I have no experience with multilang spouts, however my impression from the docs is that you should be handling your own multiplexing if you're writing a shellspout. Otherwise if you block for 5 seconds emitting a tuple, you cannot process an ack until that's done. I'd experiment with that, if you change the sleep.spout.wait time to be 500ms and you don't block in your spout (instead returning "sync") it should back off just as it does with a normal spout (see https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java, "sync" is a no-op).

The post you linked to was mine, and for a long time that was true (especially 0.6 and 0.7). Since Storm 0.8, the spout wait strategy will do automatic backoffs when no tuples are emitted. The only time I've intentionally blocked in a spout after 0.8.0 is to control throughout (e.g. only allow 10/s during development). I've never built a multilang spout before.

Spouts, like bolts, run in a single-threaded context so blocking at all prevents acks/fails/emits from being done until the thread is unblocked. That is why it's best to have another thread dealing with IO and asynchronously feeding a concurrent data structure the spout can utilize. For example, in our internal Amazon SQS client our IO thread continuously fetches up to 10 messages per get and shoves them into a LinkedBlockingQueue (until full, then it blocks the IO thread only until the spout emits clear up room).


Michael Rose (@Xorlev<https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact<http://www.fullcontact.com/>
michael@fullcontact.com


On Fri, Jul 18, 2014 at 1:34 PM, Itai Frenkel <It...@forter.com> wrote:

So can you please explain this sentence from the multilang documentation?


"Also like ISpout, if you have no tuples to emit for a next, you should sleep for a small amount of time before syncing. ShellSpout will not automatically sleep for you"

https://storm.incubator.apache.org/documentation/Multilang-protocol.html


I read it as: "Unless you sleep a small amount of time before syncing, the ShellSpout would serialize one "nextTuple" message per 1ms (see configuration below), which would require many more CPU cycles."

topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
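The doc's advice can be sketched as a hypothetical multilang "next" handler (the buffer, emit, and sync callables here are illustrative stand-ins for whatever your multilang helper library provides):

```python
import queue
import time

def handle_next(buffer, emit, sync, idle_sleep_s=0.005):
    """Hypothetical multilang "next" handler. ShellSpout will not sleep
    for us: when the buffer is empty we sleep briefly ourselves before
    replying "sync", so an idle spout does not busy-loop on "next"
    messages. Returns True if a tuple was emitted."""
    try:
        msg = buffer.get_nowait()
    except queue.Empty:
        time.sleep(idle_sleep_s)  # small sleep only when there is nothing to emit
        sync()
        return False
    emit([msg])
    sync()
    return True
```

The sleep happens only on the empty path, so a busy spout still emits at full speed while an idle one stops flooding the JVM side with serialized "next" round-trips.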



You can also refer to the answer here, which refers to regular Spouts doing sleep as well:

https://groups.google.com/forum/#!topic/storm-user/OSjaVgTK5m0


Regards,

Itai



________________________________
From: Michael Rose <mi...@fullcontact.com>
Sent: Friday, July 18, 2014 10:18 PM
To: user@storm.incubator.apache.org
Subject: Re: Acking is delayed by 5 seconds (in disruptor queue ?)

Run your producer code in another thread to fill a LBQ, poll that with nextTuple instead.

You should never be blocking yourself inside a spout.


Michael Rose (@Xorlev<https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact<http://www.fullcontact.com/>
michael@fullcontact.com


On Fri, Jul 18, 2014 at 1:03 PM, Itai Frenkel <It...@forter.com> wrote:

Hello again,


Attached is a simplified reproduction (without the ShellSpout, but the concepts are the same).


It seems that ack() and nextTuple() are always called on the same thread, which implies an inherent tradeoff:

either nextTuple sleeps only a few ms (and then the ShellSpout serializes a lot of "nextTuple" messages),

or nextTuple sleeps longer, but then the ack is delayed.


Is there a way around this limitation?


Itai

________________________________
From: Itai Frenkel <It...@forter.com>
Sent: Thursday, July 17, 2014 9:42 PM
To: user@storm.incubator.apache.org
Subject: Acking is delayed by 5 seconds (in disruptor queue ?)

Hello,

I have noticed that an ack takes 5 seconds to pass from the bolt to the spout (see debug log below). It is a simple topology with 1 spout, 1 bolt and 1 acker all running on the same worker. The spout and the bolt are ShellSpout and ShellBolt respectively.

It looks like the message is delayed in the LMAX Disruptor queue.
How can I reduce this delay to ~1ms?

Regards,
Itai


2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Sent process to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting: bolt __ack_ack [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.t.ShellBolt [INFO] Shell msg: Bolt sent ack to tuple 2759481868963667531
2014-07-17 18:30:30 b.s.d.executor [INFO] Processing received message source: bolt:2, stream: __ack_ack, id: {}, [-357211617823660063 -3928495599512172728]
2014-07-17 18:30:30 b.s.d.task [INFO] Emitting direct: 3; __acker __ack_ack [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-357211617823660063]
2014-07-17 18:30:35 b.s.d.executor [INFO] Acking message 1138





