Posted to user@storm.apache.org by Kashyap Mhaisekar <ka...@gmail.com> on 2015/07/15 18:45:40 UTC

Realtime computations using storm - questions on performance

Hi,
We are attempting real-time distributed computation using Storm, and the
solution has only one problem - inter-bolt latency on the same machine or
across machines ranges between 2 and 250 ms. I am not able to figure out why.
Network latency is under 0.5 ms. By latency, I mean the time between an emit
from one bolt/spout and receiving the message in execute() of the next bolt.

I have a topology like the below -
A (spout) [emits a number, say 1000] -> B (bolt) [receives this number and
divides it into 10 emits of 100 each] -> C (bolt) [receives these emits and
divides each into 10 emits of 10 numbers] -> D (bolt) [does some computation
on the number and emits one message] -> E (bolt) [aggregates all the data and
confirms that all 1000 messages are processed]

Every bolt takes under 3 msec to complete, so I estimated that end-to-end
processing of the 1000 messages should take no more than 50 msec, including
any latencies.

*Observations*
1. The end-to-end time from spout A to bolt E takes 200 msec to 3 seconds.
My estimate was under 50 msec, given that each bolt and spout takes under 3
msec to execute, including any latencies.
2. I noticed that most of the time is spent between the emit from a
spout/bolt and execute() of the consuming bolt.
3. Network latency is under 0.5 msec.

I am not able to figure out why it takes so much time to get from a
spout/bolt to the next bolt. I understand that the spout/bolt buffers the
data into a queue and the subsequent bolt consumes from there.

*Infrastructure*
1. 5 VMs, each with 4 CPUs and 8 GB RAM. Each worker gets 1024 MB, and there
are 20 workers overall.

*Test*
1. The test was done with 25 messages to the spout, i.e. 25 messages sent to
the spout over a span of 5 seconds.

*Config values*
Config config = new Config();
config.put(Config.TOPOLOGY_WORKERS, 20);
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);

Please let me know if you have encountered similar issues and any steps you
have taken to mitigate the time taken between spout/bolt and another bolt.

Thanks
Kashyap

Re: Realtime computations using storm - questions on performance

Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Updates -
1. Increasing the buffer sizes and adjusting topology max spout pending
helped a bit. But the problem I see from the logs is that the time it takes
for the various executor threads to move messages from one queue to another
is abnormally long. Not sure how to reduce that.
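
For context on that queue hand-off: the executor queues are LMAX disruptor
ring buffers, and on the 0.9.x line the consumer wait strategy is
configurable. The snippet below is a hedged sketch, not something suggested
in this thread - the key name topology.disruptor.wait.strategy appears in
0.9.x defaults.yaml, but treat it as an assumption to verify against your
Storm version; a busier strategy trades CPU for lower hand-off latency.

// Sketch (assumed setting, see note above); Config is backtype.storm.Config.
Config config = new Config();
config.put("topology.disruptor.wait.strategy",
           "com.lmax.disruptor.YieldingWaitStrategy");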

Thanks
Kashyap

On Wed, Jul 22, 2015 at 3:36 PM, Kashyap Mhaisekar <ka...@gmail.com>
wrote:

> I kind of believe that the main thread which picks the data from the
> incoming queue is taking a long time. Did anyone face this?
>
> Execute and process latencies are under 3-8 ms, but the overall time taken
> for a message to get processed is close to 300 ms. This is where I don't
> understand what is happening - the case of the missing 290 ms.
>
> How is the overall time taken for a message computed? Is it the process
> latency at the Spout or a sum of process latencies at all the bolts?
>
> Thanks
> Kashyap
>
> On Sun, Jul 19, 2015 at 1:30 PM, Kashyap Mhaisekar <ka...@gmail.com>
> wrote:
>
>> I changed it to debug to find out the reason behind the increased times,
>> as I suspected buffer overflow. It was at info level.
>>
>> Regards
>> Kashyap
>> On Jul 19, 2015 1:18 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>
>> Are your logs on debug level? Try changing to info.
>> On Jul 19, 2015 1:32 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:
>>
>>> Thanks Nathan.
>>> The increased time between bolts could be due to -
>>> 1. Buffer overflow
>>> 2. GC activity
>>> 3. Low parallelism
>>> 4. Latency between machines (around 0.3 ms)
>>>
>>> Debug logs indicate that queue capacity and queue population are within
>>> limits, so that is probably not the cause.
>>>
>>> For GC, I see GC/MarkSweepCompact and GC/Copy hovering at 500 ms. Am not
>>> sure if this number is good or bad. Still figuring out...
>>>
>>> Parallelism does not seem to be a problem as capacity is under 0.3-0.5
>>> for all bolts.
>>>
>>> Do you know of any other reasons based on experience?
>>>
>>> Thanks for the time
>>>
>>> Thanks
>>> Kashyap
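
The GC numbers above are easier to interpret with per-worker GC logging; a
minimal sketch, assuming backtype.storm.Config on a 0.9.x cluster, with the
heap size and JVM flags purely illustrative:

// Sketch: TOPOLOGY_WORKER_CHILDOPTS is appended to each worker JVM's options,
// so GC activity shows up in the worker logs.
Config config = new Config();
config.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
           "-Xmx1024m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps");
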
>>> On Jul 19, 2015 02:29, "Nathan Leung" <nc...@gmail.com> wrote:
>>>
>>>> Generally, binary search combined with observation of the system
>>>> (whether it meets throughput/latency targets) is a good approach.
>>>> On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Nathan,
>>>>> The bolts extend BaseBasicBolt and also, in the spout, I am explicitly
>>>>> emitting a msgId, hence tuples should be tagged and anchored.
>>>>> What I see is this -
>>>>> 1. The logic execution in the bolt takes not more than 1 ms (start of
>>>>> execute() to end of execute())
>>>>> 2. The time is being spent *between* the bolts
>>>>> 3. The thread dumps all show the LMAX disruptor at
>>>>> com.lmax.disruptor.BlockingWaitStrategy.waitFor(), where the maximum
>>>>> CPU time is being spent.
>>>>>
>>>>> Is there a pattern with which the buffer sizes need to be tuned? :(
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>> On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <
>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the clarification regarding task IDs, Nathan; I was under
>>>>>> that false impression as the site docs are a bit misleading. Thanks for
>>>>>> pointing that out!
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>> Kindly yours,
>>>>>>
>>>>>> Andrew Grammenos
>>>>>>
>>>>>> -- PGP PKey --
>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>
>>>>>> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <nc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> If your tuples are reliable (spout emit with message id) and
>>>>>>> anchored (emit from bolt anchored to input tuple), then you have to answer
>>>>>>> that yourself. If not, then your output queue size is not constrained by
>>>>>>> the framework and you may still have high latency.
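
For readers unfamiliar with the two terms, a hedged sketch of what "reliable"
and "anchored" look like with the backtype.storm 0.9.x API; the class name and
values are made up for illustration. A reliable spout emit carries a message
id, e.g. collector.emit(new Values(1000), msgId), while a BaseBasicBolt
anchors every emit to its input tuple and acks the input automatically when
execute() returns:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ComputeBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        long n = input.getLong(0);
        collector.emit(new Values(n * 2)); // anchored to 'input' by the framework
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result"));
    }
}
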
>>>>>>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nathan,
>>>>>>>> My max spout pending is set to 1. Now, is my problem with latency or
>>>>>>>> with throughput?
>>>>>>>>
>>>>>>>> Thank you!
>>>>>>>> Kashyap
>>>>>>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> If your tuples are anchored max spout pending indirectly affects
>>>>>>>>> how many tuples are generated ;).
>>>>>>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Nathan. One question though - Are there any best practices
>>>>>>>>>> when tuples are getting generated in the topology and not really
>>>>>>>>>> controllable via Max Spout Pending?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Kashyap
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Also I would argue that this is not important unless your
>>>>>>>>>>> application is especially latency sensitive or your queue is so long that
>>>>>>>>>>> it is causing in flight tuples to timeout.
>>>>>>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Sorry for a brief response.. The number of tuples in flight
>>>>>>>>>>>> absolutely affects your max latency.  You need to tune your topology max
>>>>>>>>>>>> spout pending.  Lower value will reduce your end to end latency, but if
>>>>>>>>>>>> it's too low it may affect throughput.  I've posted to the group about this
>>>>>>>>>>>> before; if you do a search you may find some posts where I've discussed
>>>>>>>>>>>> this in more detail.
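
A minimal sketch of the knob being described here, using backtype.storm.Config;
the value is purely illustrative and would have to be found by the kind of
binary-search tuning mentioned elsewhere in the thread:

Config config = new Config();
// Cap the number of unacked tuples outstanding per spout task.
// Only takes effect when spout emits carry message ids (reliable emits).
config.setMaxSpoutPending(100);
// Equivalent: config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 100);
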
>>>>>>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Nathan,
>>>>>>>>>>>>> Thanks. Have been running on a bare bones topology as
>>>>>>>>>>>>> suggested. I am inclined to believe that the no. of messages in the
>>>>>>>>>>>>> topology at that point in time is affecting the "latency".
>>>>>>>>>>>>>
>>>>>>>>>>>>> Am trying to now figure out how the topology should be
>>>>>>>>>>>>> structured when the no. of transient tuples in the topology is very high.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Topology is structured as follows -
>>>>>>>>>>>>> Consumer (a java program that sends a message to the storm cluster) ->
>>>>>>>>>>>>> A (spout) [emits a number, say 100] -> B (bolt) [emits 100 messages] ->
>>>>>>>>>>>>> C (bolt) [passes the message along to the next bolt] -> D (bolt) [passes
>>>>>>>>>>>>> the message along to the next bolt] -> E (bolt) [aggregates all the data
>>>>>>>>>>>>> and confirms that all 100 messages are processed]
>>>>>>>>>>>>>
>>>>>>>>>>>>> What I observed is as follows -
>>>>>>>>>>>>> 1. The time taken for end-to-end processing of a message (from sending
>>>>>>>>>>>>> the message to the Storm cluster until the aggregation is complete) is
>>>>>>>>>>>>> directly dependent on the volume of messages entering storm and also on
>>>>>>>>>>>>> the no. of emits done by spout A.
>>>>>>>>>>>>> Test 1: 100 sequential messages to storm with B emitting 100 tuples per
>>>>>>>>>>>>> message (100x100=10000); there are 10000 tuples emitted and the time
>>>>>>>>>>>>> taken to aggregate the 100 is 15 ms to 100 ms.
>>>>>>>>>>>>> Test 2: 100 sequential messages to storm with B emitting 1000 tuples per
>>>>>>>>>>>>> message (100x1000=100000); there are 100000 tuples emitted and the time
>>>>>>>>>>>>> taken to aggregate the 100 is 4 seconds to 10 seconds.
>>>>>>>>>>>>> 2. Strange thing is - the more parallelism I add, the worse the times
>>>>>>>>>>>>> get. Am trying to figure out if the memory per worker is a constraint,
>>>>>>>>>>>>> but this is the first time I am seeing this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Question - How should the use case be handled wherein the no. of
>>>>>>>>>>>>> tuples in the topology could increase exponentially?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you again,
>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Direct grouping, as it is shown in the storm docs, means that you
>>>>>>>>>>>>>>>> have to have a specific task id and use "direct streams", which is
>>>>>>>>>>>>>>>> error prone, probably increases latency, and might introduce
>>>>>>>>>>>>>>>> redundancy problems, as the producer of a tuple needs to know the id
>>>>>>>>>>>>>>>> of the task the tuple has to go to; so imagine a scenario where the
>>>>>>>>>>>>>>>> receiving task fails for some reason and the producer can't relay
>>>>>>>>>>>>>>>> tuples until it receives the re-spawned task's id.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Nathan, I am using direct grouping because the application I am
>>>>>>>>>>>>>>>>> working on has to be able to send tuples directly to specific tasks -
>>>>>>>>>>>>>>>>> in general, to control the data flow. Can you please explain to me
>>>>>>>>>>>>>>>>> why you would not recommend direct grouping? Is there any particular
>>>>>>>>>>>>>>>>> reason in the architecture of Storm?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <ncleung@gmail.com
>>>>>>>>>>>>>>>>> >:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I would not recommend direct grouping unless you have a
>>>>>>>>>>>>>>>>>> good reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>>>>>>>>>> distribution which makes it easier to characterize its performance.  Local
>>>>>>>>>>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>>>>>>>>>>> However you have to be careful in certain cases to avoid task starvation
>>>>>>>>>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout task,
>>>>>>>>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>>>>>>>>>> your key distribution, etc.
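
A sketch of how the groupings being compared are declared, using
backtype.storm.topology.TopologyBuilder; NumberSpout, ComputeBolt,
AggregateBolt and the requestId field are placeholders, not code from this
thread:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers", new NumberSpout(), 1);
// Prefer a consumer task in the same worker when one exists, else shuffle:
builder.setBolt("compute", new ComputeBolt(), 10)
       .localOrShuffleGrouping("numbers");
// Route by key; the distribution depends on the key values:
builder.setBolt("aggregate", new AggregateBolt(), 1)
       .fieldsGrouping("compute", new Fields("requestId"));
// Direct grouping would instead use directGrouping(...) here plus
// emitDirect(...) with explicit task ids in the producer.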
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1) How exactly do you measure latency? I am doing the same thing
>>>>>>>>>>>>>>>>>>> and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>>>>>>>>> (mainly because of clock drift).
>>>>>>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speed among different
>>>>>>>>>>>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Nick
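
On question 1 above, a hedged sketch of one common approach - stamp the tuple
with the emit time and subtract in the consumer. The field names are made up,
and the delta is only trustworthy when producer and consumer share a clock
(same host) or clocks are tightly synchronized; otherwise Storm's
complete-latency metric, which is computed entirely at the spout, sidesteps
the drift problem:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LatencyProbeBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Upstream emitted: new Values(payload, System.currentTimeMillis())
        long emittedAt = input.getLongByField("emittedAt");
        long queueDelayMs = System.currentTimeMillis() - emittedAt;
        // log or histogram queueDelayMs here
        collector.emit(new Values(input.getValue(0), System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("payload", "emittedAt"));
    }
}
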
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <
>>>>>>>>>>>>>>>>>>> ncleung@gmail.com>:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Two things. Your math may be off depending on
>>>>>>>>>>>>>>>>>>>> parallelism. One emit from A becomes 100 emitted from C, and you are
>>>>>>>>>>>>>>>>>>>> joining all of them.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per
>>>>>>>>>>>>>>>>>>>> worker). All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible
>>>>>>>>>>>>>>>>>>>> to reduce network transfers.
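
The acker suggestion in Config terms - a small sketch; setNumAckers is the
typed form of TOPOLOGY_ACKER_EXECUTORS, and the values mirror the 20-worker
setup described in this thread:

Config config = new Config();
config.setNumWorkers(20);
// Let acker tasks scale with workers instead of funnelling every ack through
// a single task; leaving the setting unset also defaults to one per worker.
config.setNumAckers(20);
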
>>>>>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>> We are attempting a real-time distributed computing
>>>>>>>>>>>>>>>>>>>>> using storm and the solution has only one problem -
>>>>>>>>>>>>>>>>>>>>> inter bolt latency on same machine or across machines
>>>>>>>>>>>>>>>>>>>>> ranges between 2 - 250 ms. I am not able to figure out why. Network
>>>>>>>>>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time
>>>>>>>>>>>>>>>>>>>>> between an emit of one bolt/spout to getting the message in execute() of
>>>>>>>>>>>>>>>>>>>>> next bolt.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt)
>>>>>>>>>>>>>>>>>>>>> [Receives this number and divides this into 10 emits of 100 each) -> C
>>>>>>>>>>>>>>>>>>>>> (bolt) [Receives these emits and divides this to 10 emits of 10 numbers) ->
>>>>>>>>>>>>>>>>>>>>> D (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a
>>>>>>>>>>>>>>>>>>>>> result, I estimated that the end to end processing for 1000 takes not more
>>>>>>>>>>>>>>>>>>>>> than 50 msec including any latencies.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes
>>>>>>>>>>>>>>>>>>>>> 200 msec to 3 seconds. My estimate was under 50 msec given that each bolt
>>>>>>>>>>>>>>>>>>>>> and spout take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>>>>>>>> 2. I noticed that most of the time is spent
>>>>>>>>>>>>>>>>>>>>> between Emit from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>>>>>>> between a spout/bolt to next bolt. I understand that the spout/bolt buffers
>>>>>>>>>>>>>>>>>>>>> the data into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with
>>>>>>>>>>>>>>>>>>>>> 1024 MB and there are 20 workers overall.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout =>
>>>>>>>>>>>>>>>>>>>>> 25 messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,
>>>>>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar
>>>>>>>>>>>>>>>>>>>>> issues and any steps you have taken to mitigate the time taken between
>>>>>>>>>>>>>>>>>>>>> spout/bolt and another bolt.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>>>
>

>>>>>>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same
>>>>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>>>>>>> (mainly because of clock drifting).
>>>>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among
>>>>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <ncleung@gmail.com
>>>>>>>>>>>>>>>>> >:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Two things. Your math may be off depending on
>>>>>>>>>>>>>>>>>> parallelism. One emit from A becomes 100 emitted from C, and you are
>>>>>>>>>>>>>>>>>> joining all of them.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per
>>>>>>>>>>>>>>>>>> worker). All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>> We are attempting a real-time distributed computing
>>>>>>>>>>>>>>>>>>> using storm and the solution has only one problem -
>>>>>>>>>>>>>>>>>>> inter bolt latency on same machine or across machines
>>>>>>>>>>>>>>>>>>> ranges between 2 - 250 ms. I am not able to figure out why. Network
>>>>>>>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time
>>>>>>>>>>>>>>>>>>> between an emit of one bolt/spout to getting the message in execute() of
>>>>>>>>>>>>>>>>>>> next bolt.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt)
>>>>>>>>>>>>>>>>>>> [Receives this number and divides this into 10 emits of 100 each) -> C
>>>>>>>>>>>>>>>>>>> (bolt) [Recieves these emits and divides this to 10 emits of 10 numbers) ->
>>>>>>>>>>>>>>>>>>> D (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a
>>>>>>>>>>>>>>>>>>> result, I estimated that the end to end processing for 1000 takes not more
>>>>>>>>>>>>>>>>>>> than 50 msec including any latencies.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200
>>>>>>>>>>>>>>>>>>> msec to 3 seconds. My estimate was under 50 msec given that each bolt and
>>>>>>>>>>>>>>>>>>> spout take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between
>>>>>>>>>>>>>>>>>>> Emit from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>>>>> between a spout/bolt to next bolt. I understand that the spout/bolt buffers
>>>>>>>>>>>>>>>>>>> the data into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024
>>>>>>>>>>>>>>>>>>> MB and there are 20 workers overall.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS,
>>>>>>>>>>>>>>>>>>> Integer.parseInt(20));
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,
>>>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar
>>>>>>>>>>>>>>>>>>> issues and any steps you have taken to mitigate the time taken between
>>>>>>>>>>>>>>>>>>> spout/bolt and another bolt.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>
>>>

Re: Realtime computations using storm - questions on performance

Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Thanks Nathan.
The increased time between bolts could be due to -
1. Buffer overflow
2. GC activity.
3. Low parallelism
4. Latency between machines (around 0.3 ms)

Debug logs indicate that queue capacity and queue population are within
limits, so buffer overflow is probably not the cause.

For GC, I see GC/MarkSweepCompact and GC/Copy times hovering around 500 ms.
I am not sure if this number is good or bad. Still figuring it out...
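
As a next step, I am planning to turn on GC logging per worker so those
pauses can be attributed. A minimal sketch of what I have in mind, assuming
our Storm version supports the topology.worker.childopts override
(otherwise the same flags would go into worker.childopts in storm.yaml);
the flags themselves are illustrative, not a recommendation:

Config config = new Config();
// Keep the 1024 MB worker heap we already use, log GC pauses, and try CMS to
// see whether the ~500 ms MarkSweepCompact pauses shrink. Log path is a placeholder.
config.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
    "-Xmx1024m -XX:+UseConcMarkSweepGC"
    + " -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
    + " -Xloggc:/var/log/storm/worker-gc.log");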

Parallelism does not seem to be a problem, as the capacity metric stays
under 0.3-0.5 for all bolts.

Do you know of any other reasons based on experience?

Thanks for the time

Thanks
Kashyap
On Jul 19, 2015 02:29, "Nathan Leung" <nc...@gmail.com> wrote:

> Generally, binary search combined with observation of the system (whether
> it meets throughput/latency targets) is a good approach.
> On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:
>
>> Nathan,
>> The bolts are extending BaseBasicBolt and also, the in the spout am
>> explicitly emitting a msgId hence tuples should be tagged and anchored.
>> What I see is this -
>> 1. The logic exection in the bolt takes not more than 1 ms (start of
>> execute() and end of execute())
>> 2. The time is being spent *between* the bolts
>> 3. The thread dumps all show LMAX disruptor at -
>> com.lmax.disruptor.blockingwaitstrategy.*waitfor() *where the maximum
>> CPU time is being spent.
>>
>> Is there a pattern with which the buffer sizes need to be tuned? :(
>>
>> Thanks
>> Kashyap
>>
>> On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <an...@gmail.com>
>> wrote:
>>
>>> Thanks for the clarification regarding Task ID's Nathan, I was under
>>> that false impression as the site docs are a bit misleading. Thanks for
>>> pointing that out!
>>>
>>> Regards.
>>>
>>> Kindly yours,
>>>
>>> Andrew Grammenos
>>>
>>> -- PGP PKey --
>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>
>>> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <nc...@gmail.com> wrote:
>>>
>>>> If your tuples are reliable (spout emit with message id) and anchored
>>>> (emit from bolt anchored to input tuple), then you have to answer that
>>>> yourself. If not, then your output queue size is not constrained by the
>>>> framework and you may still have high latency.
>>>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Nathan,
>>>>> My max spout pending is set to 1. Now is my problem with latency or
>>>>> with throughput.
>>>>>
>>>>> Thank you!
>>>>> Kashyap
>>>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>>>
>>>>>> If your tuples are anchored max spout pending indirectly affects how
>>>>>> many tuples are generated ;).
>>>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Nathan. One question though - Are there any best practices
>>>>>>> when tuples are getting generated in the topology and not really
>>>>>>> controllable via Max Spout Pending?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Kashyap
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Also I would argue that this is not important unless your
>>>>>>>> application is especially latency sensitive or your queue is so long that
>>>>>>>> it is causing in flight tuples to timeout.
>>>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Sorry for a brief response.. The number of tuples in flight
>>>>>>>>> absolutely affects your max latency.  You need to tune your topology max
>>>>>>>>> spout pending.  Lower value will reduce your end to end latency, but if
>>>>>>>>> it's too low it may affect throughput.  I've posted to the group about this
>>>>>>>>> before; if you do a search you may find some posts where I've discussed
>>>>>>>>> this in more detail.
>>>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Nathan,
>>>>>>>>>> Thanks. Have been running on a bare bones topology as suggested.
>>>>>>>>>> I am inclined to believe that the no. of messages in the topology at that
>>>>>>>>>> point in time is affecting the "latency".
>>>>>>>>>>
>>>>>>>>>> Am trying to now figure out how should the topology be structured
>>>>>>>>>> when the no. of transient tupples in the topology is very high.
>>>>>>>>>>
>>>>>>>>>> Topology is structured as follows -
>>>>>>>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>>>>>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C
>>>>>>>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes along
>>>>>>>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and confirms
>>>>>>>>>> if all the 100 messages are processed)
>>>>>>>>>>
>>>>>>>>>> What I observed is as follows -
>>>>>>>>>> 1. The time taken for an end to end processing of the message
>>>>>>>>>> (Sending the message to Storm cluster and till the time the aggregation is
>>>>>>>>>> complete) is directly dependent on the volume of messages that is entering
>>>>>>>>>> into storm and also the no. of emits done by the spout A.
>>>>>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100
>>>>>>>>>> tuples per message (100X100=10000) there are 10000 tuples emitted and the
>>>>>>>>>> time taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000
>>>>>>>>>> tuples per message (100X1000=100000) **there are 100000 tuples
>>>>>>>>>> emitted and the time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>>>>>>> 2.Strange thing is - the more parallelism i add, the times are so
>>>>>>>>>> much more bad. Am trying to figure out if the memory per worker is a
>>>>>>>>>> constraint, but this is the firs time am seeing this.
>>>>>>>>>>
>>>>>>>>>> Question - How should the use case be handled where in the no. of
>>>>>>>>>> tuples in the topology could increase exponentially..,
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Kashyap
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>>>
>>>>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>>>
>>>>>>>>>>> Thank you again,
>>>>>>>>>>> Nick
>>>>>>>>>>>
>>>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Direct grouping as it is shown in storm docs, means that you
>>>>>>>>>>>>> have to have a specific task id and use "direct streams" which is error
>>>>>>>>>>>>> prone, probably increase latency and might introduce redundancy problems as
>>>>>>>>>>>>> the producer of tuple needs to know the id of the task the tuple will have
>>>>>>>>>>>>> to go; so imagine a scenario where the receiving task fails for some reason
>>>>>>>>>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>>>>>>>>>> task's id.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Nathan, I am using direct-grouping because the application I
>>>>>>>>>>>>>> am working on has to be able to send tuples directly to specific tasks. In
>>>>>>>>>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>>>>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>>>>>>>>> architecture of Storm?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>>>>>>> distribution which makes it easier to characterize its performance.  Local
>>>>>>>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>>>>>>>> However you have to be careful in certain cases to avoid task starvation
>>>>>>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout task,
>>>>>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same
>>>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>>>>>> (mainly because of clock drifting).
>>>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among
>>>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>
>>>>>>>>>>>>>>>> :
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism.
>>>>>>>>>>>>>>>>> One emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker).
>>>>>>>>>>>>>>>>> All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>>>>>>> latency on same machine or across machines ranges
>>>>>>>>>>>>>>>>>> between 2 - 250 ms. I am not able to figure out why. Network
>>>>>>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time
>>>>>>>>>>>>>>>>>> between an emit of one bolt/spout to getting the message in execute() of
>>>>>>>>>>>>>>>>>> next bolt.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt)
>>>>>>>>>>>>>>>>>> [Receives this number and divides this into 10 emits of 100 each) -> C
>>>>>>>>>>>>>>>>>> (bolt) [Recieves these emits and divides this to 10 emits of 10 numbers) ->
>>>>>>>>>>>>>>>>>> D (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a
>>>>>>>>>>>>>>>>>> result, I estimated that the end to end processing for 1000 takes not more
>>>>>>>>>>>>>>>>>> than 50 msec including any latencies.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200
>>>>>>>>>>>>>>>>>> msec to 3 seconds. My estimate was under 50 msec given that each bolt and
>>>>>>>>>>>>>>>>>> spout take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between
>>>>>>>>>>>>>>>>>> Emit from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>>>> between a spout/bolt to next bolt. I understand that the spout/bolt buffers
>>>>>>>>>>>>>>>>>> the data into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024
>>>>>>>>>>>>>>>>>> MB and there are 20 workers overall.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,
>>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues
>>>>>>>>>>>>>>>>>> and any steps you have taken to mitigate the time taken between spout/bolt
>>>>>>>>>>>>>>>>>> and another bolt.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>
>>

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
Generally, binary search combined with observation of the system (whether
it meets throughput/latency targets) is a good approach.
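
As a rough starting point (the numbers below are purely illustrative;
measure against your latency/throughput targets and adjust):

Config config = new Config();
config.setNumWorkers(20);
config.setNumAckers(20);         // back to one acker per worker instead of 1 total
config.setMaxSpoutPending(512);  // midpoint guess: halve it if end-to-end latency
                                 // is still too high, double it if throughput drops
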
On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:

> Nathan,
> The bolts are extending BaseBasicBolt and also, the in the spout am
> explicitly emitting a msgId hence tuples should be tagged and anchored.
> What I see is this -
> 1. The logic exection in the bolt takes not more than 1 ms (start of
> execute() and end of execute())
> 2. The time is being spent *between* the bolts
> 3. The thread dumps all show LMAX disruptor at -
> com.lmax.disruptor.blockingwaitstrategy.*waitfor() *where the maximum CPU
> time is being spent.
>
> Is there a pattern with which the buffer sizes need to be tuned? :(
>
> Thanks
> Kashyap
>
> On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <an...@gmail.com>
> wrote:
>
>> Thanks for the clarification regarding Task ID's Nathan, I was under that
>> false impression as the site docs are a bit misleading. Thanks for pointing
>> that out!
>>
>> Regards.
>>
>> Kindly yours,
>>
>> Andrew Grammenos
>>
>> -- PGP PKey --
>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>
>> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <nc...@gmail.com> wrote:
>>
>>> If your tuples are reliable (spout emit with message id) and anchored
>>> (emit from bolt anchored to input tuple), then you have to answer that
>>> yourself. If not, then your output queue size is not constrained by the
>>> framework and you may still have high latency.
>>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>> wrote:
>>>
>>>> Nathan,
>>>> My max spout pending is set to 1. Now is my problem with latency or
>>>> with throughput.
>>>>
>>>> Thank you!
>>>> Kashyap
>>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>>
>>>>> If your tuples are anchored max spout pending indirectly affects how
>>>>> many tuples are generated ;).
>>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Nathan. One question though - Are there any best practices
>>>>>> when tuples are getting generated in the topology and not really
>>>>>> controllable via Max Spout Pending?
>>>>>>
>>>>>> Thanks
>>>>>> Kashyap
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Also I would argue that this is not important unless your
>>>>>>> application is especially latency sensitive or your queue is so long that
>>>>>>> it is causing in flight tuples to timeout.
>>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry for a brief response.. The number of tuples in flight
>>>>>>>> absolutely affects your max latency.  You need to tune your topology max
>>>>>>>> spout pending.  Lower value will reduce your end to end latency, but if
>>>>>>>> it's too low it may affect throughput.  I've posted to the group about this
>>>>>>>> before; if you do a search you may find some posts where I've discussed
>>>>>>>> this in more detail.
>>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Nathan,
>>>>>>>>> Thanks. Have been running on a bare bones topology as suggested. I
>>>>>>>>> am inclined to believe that the no. of messages in the topology at that
>>>>>>>>> point in time is affecting the "latency".
>>>>>>>>>
>>>>>>>>> Am trying to now figure out how should the topology be structured
>>>>>>>>> when the no. of transient tupples in the topology is very high.
>>>>>>>>>
>>>>>>>>> Topology is structured as follows -
>>>>>>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>>>>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C
>>>>>>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes along
>>>>>>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and confirms
>>>>>>>>> if all the 100 messages are processed)
>>>>>>>>>
>>>>>>>>> What I observed is as follows -
>>>>>>>>> 1. The time taken for an end to end processing of the message
>>>>>>>>> (Sending the message to Storm cluster and till the time the aggregation is
>>>>>>>>> complete) is directly dependent on the volume of messages that is entering
>>>>>>>>> into storm and also the no. of emits done by the spout A.
>>>>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100
>>>>>>>>> tuples per message (100X100=10000) there are 10000 tuples emitted and the
>>>>>>>>> time taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000
>>>>>>>>> tuples per message (100X1000=100000) **there are 100000 tuples
>>>>>>>>> emitted and the time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>>>>>> 2.Strange thing is - the more parallelism i add, the times are so
>>>>>>>>> much more bad. Am trying to figure out if the memory per worker is a
>>>>>>>>> constraint, but this is the firs time am seeing this.
>>>>>>>>>
>>>>>>>>> Question - How should the use case be handled where in the no. of
>>>>>>>>> tuples in the topology could increase exponentially..,
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Kashyap
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>>
>>>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>>
>>>>>>>>>> Thank you again,
>>>>>>>>>> Nick
>>>>>>>>>>
>>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Direct grouping as it is shown in storm docs, means that you
>>>>>>>>>>>> have to have a specific task id and use "direct streams" which is error
>>>>>>>>>>>> prone, probably increase latency and might introduce redundancy problems as
>>>>>>>>>>>> the producer of tuple needs to know the id of the task the tuple will have
>>>>>>>>>>>> to go; so imagine a scenario where the receiving task fails for some reason
>>>>>>>>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>>>>>>>>> task's id.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>>
>>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>>
>>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>>
>>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nathan, I am using direct-grouping because the application I
>>>>>>>>>>>>> am working on has to be able to send tuples directly to specific tasks. In
>>>>>>>>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>>>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>>>>>>>> architecture of Storm?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>>>>>> distribution which makes it easier to characterize its performance.  Local
>>>>>>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>>>>>>> However you have to be careful in certain cases to avoid task starvation
>>>>>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout task,
>>>>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same
>>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>>>>> (mainly because of clock drifting).
>>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among
>>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism.
>>>>>>>>>>>>>>>> One emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker).
>>>>>>>>>>>>>>>> All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>>>>>> latency on same machine or across machines ranges between
>>>>>>>>>>>>>>>>> 2 - 250 ms. I am not able to figure out why. Network
>>>>>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time
>>>>>>>>>>>>>>>>> between an emit of one bolt/spout to getting the message in execute() of
>>>>>>>>>>>>>>>>> next bolt.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt)
>>>>>>>>>>>>>>>>> [Receives this number and divides this into 10 emits of 100 each) -> C
>>>>>>>>>>>>>>>>> (bolt) [Recieves these emits and divides this to 10 emits of 10 numbers) ->
>>>>>>>>>>>>>>>>> D (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result,
>>>>>>>>>>>>>>>>> I estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200
>>>>>>>>>>>>>>>>> msec to 3 seconds. My estimate was under 50 msec given that each bolt and
>>>>>>>>>>>>>>>>> spout take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between
>>>>>>>>>>>>>>>>> Emit from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>>> between a spout/bolt to next bolt. I understand that the spout/bolt buffers
>>>>>>>>>>>>>>>>> the data into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,
>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues
>>>>>>>>>>>>>>>>> and any steps you have taken to mitigate the time taken between spout/bolt
>>>>>>>>>>>>>>>>> and another bolt.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>
>

Re: Realtime computations using storm - questions on performance

Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Nathan,
The bolts extend BaseBasicBolt and, in the spout, I am explicitly emitting
with a msgId, hence tuples should be tagged and anchored.
What I see is this -
1. The logic execution in the bolt takes no more than 1 ms (from the start
of execute() to the end of execute())
2. The time is being spent *between* the bolts
3. The thread dumps all show the LMAX disruptor at
com.lmax.disruptor.BlockingWaitStrategy.waitFor(), which is where the
maximum CPU time is being spent.

Is there a pattern with which the buffer sizes need to be tuned? :(
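
For reference, the spout and bolt emit paths look roughly like this - a
heavily simplified sketch, where the class names, field names and the
UUID-based msgId are placeholders rather than the real topology code:

import java.util.Map;
import java.util.UUID;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Spout (own file): emitting with an explicit msgId makes the tuple tree reliable.
public class NumberSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        // The real spout reads from an input source; the constant is a stand-in.
        collector.emit(new Values(1000), UUID.randomUUID().toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
}

// Bolt (own file): BaseBasicBolt anchors emits to the input and acks it automatically.
public class SplitBolt extends BaseBasicBolt {
    public void execute(Tuple input, BasicOutputCollector collector) {
        int n = input.getIntegerByField("number");
        for (int i = 0; i < 10; i++) {
            collector.emit(new Values(n / 10));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
}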

Thanks
Kashyap

On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <an...@gmail.com>
wrote:

> Thanks for the clarification regarding Task ID's Nathan, I was under that
> false impression as the site docs are a bit misleading. Thanks for pointing
> that out!
>
> Regards.
>
> Kindly yours,
>
> Andrew Grammenos
>
> -- PGP PKey --
> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>
> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <nc...@gmail.com> wrote:
>
>> If your tuples are reliable (spout emit with message id) and anchored
>> (emit from bolt anchored to input tuple), then you have to answer that
>> yourself. If not, then your output queue size is not constrained by the
>> framework and you may still have high latency.
>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:
>>
>>> Nathan,
>>> My max spout pending is set to 1. Now is my problem with latency or with
>>> throughput.
>>>
>>> Thank you!
>>> Kashyap
>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>
>>>> If your tuples are anchored max spout pending indirectly affects how
>>>> many tuples are generated ;).
>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Nathan. One question though - Are there any best practices when
>>>>> tuples are getting generated in the topology and not really controllable
>>>>> via Max Spout Pending?
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Also I would argue that this is not important unless your application
>>>>>> is especially latency sensitive or your queue is so long that it is causing
>>>>>> in flight tuples to timeout.
>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>>>>
>>>>>>> Sorry for a brief response.. The number of tuples in flight
>>>>>>> absolutely affects your max latency.  You need to tune your topology max
>>>>>>> spout pending.  Lower value will reduce your end to end latency, but if
>>>>>>> it's too low it may affect throughput.  I've posted to the group about this
>>>>>>> before; if you do a search you may find some posts where I've discussed
>>>>>>> this in more detail.
>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nathan,
>>>>>>>> Thanks. Have been running on a bare bones topology as suggested. I
>>>>>>>> am inclined to believe that the no. of messages in the topology at that
>>>>>>>> point in time is affecting the "latency".
>>>>>>>>
>>>>>>>> Am trying to now figure out how should the topology be structured
>>>>>>>> when the no. of transient tupples in the topology is very high.
>>>>>>>>
>>>>>>>> Topology is structured as follows -
>>>>>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>>>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C
>>>>>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes along
>>>>>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and confirms
>>>>>>>> if all the 100 messages are processed)
>>>>>>>>
>>>>>>>> What I observed is as follows -
>>>>>>>> 1. The time taken for an end to end processing of the message
>>>>>>>> (Sending the message to Storm cluster and till the time the aggregation is
>>>>>>>> complete) is directly dependent on the volume of messages that is entering
>>>>>>>> into storm and also the no. of emits done by the spout A.
>>>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100
>>>>>>>> tuples per message (100X100=10000) there are 10000 tuples emitted and the
>>>>>>>> time taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000
>>>>>>>> tuples per message (100X1000=100000) **there are 100000 tuples
>>>>>>>> emitted and the time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>>>>> 2.Strange thing is - the more parallelism i add, the times are so
>>>>>>>> much more bad. Am trying to figure out if the memory per worker is a
>>>>>>>> constraint, but this is the firs time am seeing this.
>>>>>>>>
>>>>>>>> Question - How should the use case be handled where in the no. of
>>>>>>>> tuples in the topology could increase exponentially..,
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Kashyap
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>
>>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>
>>>>>>>>> Thank you again,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Direct grouping as it is shown in storm docs, means that you
>>>>>>>>>>> have to have a specific task id and use "direct streams" which is error
>>>>>>>>>>> prone, probably increase latency and might introduce redundancy problems as
>>>>>>>>>>> the producer of tuple needs to know the id of the task the tuple will have
>>>>>>>>>>> to go; so imagine a scenario where the receiving task fails for some reason
>>>>>>>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>>>>>>>> task's id.
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>
>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>
>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>
>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>
>>>>>>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>>>>>>> working on has to be able to send tuples directly to specific tasks. In
>>>>>>>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>>>>>>> architecture of Storm?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Nick
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>>>>> distribution which makes it easier to characterize its performance.  Local
>>>>>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>>>>>> However you have to be careful in certain cases to avoid task starvation
>>>>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout task,
>>>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same
>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>>>> (mainly because of clock drifting).
>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among
>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism.
>>>>>>>>>>>>>>> One emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker).
>>>>>>>>>>>>>>> All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>>>>> latency on same machine or across machines ranges between
>>>>>>>>>>>>>>>> 2 - 250 ms. I am not able to figure out why. Network
>>>>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time
>>>>>>>>>>>>>>>> between an emit of one bolt/spout to getting the message in execute() of
>>>>>>>>>>>>>>>> next bolt.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives
>>>>>>>>>>>>>>>> this number and divides this into 10 emits of 100 each) -> C (bolt)
>>>>>>>>>>>>>>>> [Recieves these emits and divides this to 10 emits of 10 numbers) -> D
>>>>>>>>>>>>>>>> (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result,
>>>>>>>>>>>>>>>> I estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200
>>>>>>>>>>>>>>>> msec to 3 seconds. My estimate was under 50 msec given that each bolt and
>>>>>>>>>>>>>>>> spout take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between
>>>>>>>>>>>>>>>> Emit from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>> between a spout/bolt to next bolt. I understand that the spout/bolt buffers
>>>>>>>>>>>>>>>> the data into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,
>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues
>>>>>>>>>>>>>>>> and any steps you have taken to mitigate the time taken between spout/bolt
>>>>>>>>>>>>>>>> and another bolt.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>
>

Re: Realtime computations using storm - questions on performance

Posted by Andrew Xor <an...@gmail.com>.
Thanks for the clarification regarding task IDs, Nathan; I was under that
false impression as the site docs are a bit misleading. Thanks for pointing
that out!
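
For completeness, my (corrected) mental model of direct grouping looks
roughly like the sketch below; the component and stream names are made up,
and it leans on the task ids being stable as you described:

import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Producer side: declare a direct stream and pick the target task on every emit.
public class ProducerBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> targetTasks;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Task ids of the downstream component; fixed for the topology's lifetime.
        this.targetTasks = context.getComponentTasks("consumer-bolt");
    }

    public void execute(Tuple input) {
        Object key = input.getValue(0);
        int task = targetTasks.get(Math.abs(key.hashCode() % targetTasks.size()));
        // Anchor to the input so the tuple tree stays tracked, then ack.
        collector.emitDirect(task, "direct-stream", input, new Values(key));
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("direct-stream", true, new Fields("value"));  // direct = true
    }
}

// Consumer side wiring:
// builder.setBolt("consumer-bolt", new ConsumerBolt(), 4)
//        .directGrouping("producer-bolt", "direct-stream");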

Regards.

Kindly yours,

Andrew Grammenos

-- PGP PKey --
​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt

On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <nc...@gmail.com> wrote:

> If your tuples are reliable (spout emit with message id) and anchored
> (emit from bolt anchored to input tuple), then you have to answer that
> yourself. If not, then your output queue size is not constrained by the
> framework and you may still have high latency.
> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:
>
>> Nathan,
>> My max spout pending is set to 1. Now is my problem with latency or with
>> throughput.
>>
>> Thank you!
>> Kashyap
>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>
>>> If your tuples are anchored max spout pending indirectly affects how
>>> many tuples are generated ;).
>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Nathan. One question though - Are there any best practices when
>>>> tuples are getting generated in the topology and not really controllable
>>>> via Max Spout Pending?
>>>>
>>>> Thanks
>>>> Kashyap
>>>>
>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Also I would argue that this is not important unless your application
>>>>> is especially latency sensitive or your queue is so long that it is causing
>>>>> in flight tuples to timeout.
>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>>>
>>>>>> Sorry for a brief response.. The number of tuples in flight
>>>>>> absolutely affects your max latency.  You need to tune your topology max
>>>>>> spout pending.  Lower value will reduce your end to end latency, but if
>>>>>> it's too low it may affect throughput.  I've posted to the group about this
>>>>>> before; if you do a search you may find some posts where I've discussed
>>>>>> this in more detail.
>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Nathan,
>>>>>>> Thanks. Have been running on a bare bones topology as suggested. I
>>>>>>> am inclined to believe that the no. of messages in the topology at that
>>>>>>> point in time is affecting the "latency".
>>>>>>>
>>>>>>> Am trying to now figure out how should the topology be structured
>>>>>>> when the no. of transient tupples in the topology is very high.
>>>>>>>
>>>>>>> Topology is structured as follows -
>>>>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C
>>>>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes along
>>>>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and confirms
>>>>>>> if all the 100 messages are processed)
>>>>>>>
>>>>>>> What I observed is as follows -
>>>>>>> 1. The time taken for an end to end processing of the message
>>>>>>> (Sending the message to Storm cluster and till the time the aggregation is
>>>>>>> complete) is directly dependent on the volume of messages that is entering
>>>>>>> into storm and also the no. of emits done by the spout A.
>>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100 tuples
>>>>>>> per message (100X100=10000) there are 10000 tuples emitted and the time
>>>>>>> taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000
>>>>>>> tuples per message (100X1000=100000) **there are 100000 tuples
>>>>>>> emitted and the time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>>>> 2.Strange thing is - the more parallelism i add, the times are so
>>>>>>> much more bad. Am trying to figure out if the memory per worker is a
>>>>>>> constraint, but this is the firs time am seeing this.
>>>>>>>
>>>>>>> Question - How should the use case be handled where in the no. of
>>>>>>> tuples in the topology could increase exponentially..,
>>>>>>>
>>>>>>> Thanks
>>>>>>> Kashyap
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you all for the valuable info.
>>>>>>>>
>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>> therefore I have to go along with it.
>>>>>>>>
>>>>>>>> Thank you again,
>>>>>>>> Nick
>>>>>>>>
>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>
>>>>>>>>> Storm task ids don't change:
>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Direct grouping as it is shown in storm docs, means that you have
>>>>>>>>>> to have a specific task id and use "direct streams" which is error prone,
>>>>>>>>>> probably increase latency and might introduce redundancy problems as the
>>>>>>>>>> producer of tuple needs to know the id of the task the tuple will have to
>>>>>>>>>> go; so imagine a scenario where the receiving task fails for some reason
>>>>>>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>>>>>>> task's id.
>>>>>>>>>>
>>>>>>>>>> Hope this helps.
>>>>>>>>>>
>>>>>>>>>> Kindly yours,
>>>>>>>>>>
>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>
>>>>>>>>>> -- PGP PKey --
>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello again,
>>>>>>>>>>>
>>>>>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>>>>>> working on has to be able to send tuples directly to specific tasks. In
>>>>>>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>>>>>> architecture of Storm?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Nick
>>>>>>>>>>>
>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>>>> distribution which makes it easier to characterize its performance.  Local
>>>>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>>>>> However you have to be careful in certain cases to avoid task starvation
>>>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout task,
>>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same
>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>>> (mainly because of clock drifting).
>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism.
>>>>>>>>>>>>>> One emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker).
>>>>>>>>>>>>>> All your ack traffic is going to a single task.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>>>> latency on same machine or across machines ranges between 2
>>>>>>>>>>>>>>> - 250 ms. I am not able to figure out why. Network latency is
>>>>>>>>>>>>>>> under 0.5 ms. By latency, I mean the time between an emit
>>>>>>>>>>>>>>> of one bolt/spout to getting the message in execute() of next bolt.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives
>>>>>>>>>>>>>>> this number and divides this into 10 emits of 100 each) -> C (bolt)
>>>>>>>>>>>>>>> [Recieves these emits and divides this to 10 emits of 10 numbers) -> D
>>>>>>>>>>>>>>> (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec
>>>>>>>>>>>>>>> to 3 seconds. My estimate was under 50 msec given that each bolt and spout
>>>>>>>>>>>>>>> take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between Emit
>>>>>>>>>>>>>>> from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>> between a spout/bolt to next bolt. I understand that the spout/bolt buffers
>>>>>>>>>>>>>>> the data into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues
>>>>>>>>>>>>>>> and any steps you have taken to mitigate the time taken between spout/bolt
>>>>>>>>>>>>>>> and another bolt.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
If your tuples are reliable (the spout emits with a message id) and anchored
(bolt emits are anchored to the input tuple), then you have to answer that
yourself. If not, then your output queue size is not constrained by the
framework and you may still have high latency.
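
For readers following along, "reliable and anchored" translates to code roughly
as below. This is only a minimal sketch against the backtype.storm API of the
Storm versions current at the time of this thread (the same API the Config
constants quoted below come from); the class and field names are made up for
illustration.

    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // Hypothetical fan-out bolt: each outgoing tuple is anchored to the input
    // tuple, so it joins the same tuple tree that the acker tracks, and the
    // whole tree counts against max spout pending.
    public class AnchoredFanOutBolt extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            int n = input.getInteger(0);
            for (int i = 0; i < n; i++) {
                collector.emit(input, new Values(i)); // anchored emit
            }
            collector.ack(input);                     // ack the parent tuple
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number"));
        }
    }

On the spout side, "reliable" just means emitting with a message id, e.g.
collector.emit(new Values(seed), msgId); without the message id the tuple
tree is not tracked and max spout pending does not apply to it.
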
On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:

> Nathan,
> My max spout pending is set to 1. Now is my problem with latency or with
> throughput.
>
> Thank you!
> Kashyap
> On Jul 16, 2015 5:46 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>
>> If your tuples are anchored max spout pending indirectly affects how many
>> tuples are generated ;).
>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:
>>
>>> Thanks Nathan. One question though - Are there any best practices when
>>> tuples are getting generated in the topology and not really controllable
>>> via Max Spout Pending?
>>>
>>> Thanks
>>> Kashyap
>>>
>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com> wrote:
>>>
>>>> Also I would argue that this is not important unless your application
>>>> is especially latency sensitive or your queue is so long that it is causing
>>>> in flight tuples to timeout.
>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>>
>>>>> Sorry for a brief response.. The number of tuples in flight absolutely
>>>>> affects your max latency.  You need to tune your topology max spout
>>>>> pending.  Lower value will reduce your end to end latency, but if it's too
>>>>> low it may affect throughput.  I've posted to the group about this before;
>>>>> if you do a search you may find some posts where I've discussed this in
>>>>> more detail.
>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Nathan,
>>>>>> Thanks. Have been running on a bare bones topology as suggested. I am
>>>>>> inclined to believe that the no. of messages in the topology at that point
>>>>>> in time is affecting the "latency".
>>>>>>
>>>>>> Am trying to now figure out how should the topology be structured
>>>>>> when the no. of transient tupples in the topology is very high.
>>>>>>
>>>>>> Topology is structured as follows -
>>>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C
>>>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes along
>>>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and confirms
>>>>>> if all the 100 messages are processed)
>>>>>>
>>>>>> What I observed is as follows -
>>>>>> 1. The time taken for an end to end processing of the message
>>>>>> (Sending the message to Storm cluster and till the time the aggregation is
>>>>>> complete) is directly dependent on the volume of messages that is entering
>>>>>> into storm and also the no. of emits done by the spout A.
>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100 tuples
>>>>>> per message (100X100=10000) there are 10000 tuples emitted and the time
>>>>>> taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000 tuples
>>>>>> per message (100X1000=100000) **there are 100000 tuples emitted and
>>>>>> the time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>>> 2.Strange thing is - the more parallelism i add, the times are so
>>>>>> much more bad. Am trying to figure out if the memory per worker is a
>>>>>> constraint, but this is the firs time am seeing this.
>>>>>>
>>>>>> Question - How should the use case be handled where in the no. of
>>>>>> tuples in the topology could increase exponentially..,
>>>>>>
>>>>>> Thanks
>>>>>> Kashyap
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you all for the valuable info.
>>>>>>>
>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>> therefore I have to go along with it.
>>>>>>>
>>>>>>> Thank you again,
>>>>>>> Nick
>>>>>>>
>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>
>>>>>>>> Storm task ids don't change:
>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Direct grouping as it is shown in storm docs, means that you have
>>>>>>>>> to have a specific task id and use "direct streams" which is error prone,
>>>>>>>>> probably increase latency and might introduce redundancy problems as the
>>>>>>>>> producer of tuple needs to know the id of the task the tuple will have to
>>>>>>>>> go; so imagine a scenario where the receiving task fails for some reason
>>>>>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>>>>>> task's id.
>>>>>>>>>
>>>>>>>>> Hope this helps.
>>>>>>>>>
>>>>>>>>> Kindly yours,
>>>>>>>>>
>>>>>>>>> Andrew Grammenos
>>>>>>>>>
>>>>>>>>> -- PGP PKey --
>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello again,
>>>>>>>>>>
>>>>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>>>>> working on has to be able to send tuples directly to specific tasks. In
>>>>>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>>>>> architecture of Storm?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Nick
>>>>>>>>>>
>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>>> distribution which makes it easier to characterize its performance.  Local
>>>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>>>> However you have to be careful in certain cases to avoid task starvation
>>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout task,
>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same
>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>> (mainly because of clock drifting).
>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Nick
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism. One
>>>>>>>>>>>>> emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>>> latency on same machine or across machines ranges between 2
>>>>>>>>>>>>>> - 250 ms. I am not able to figure out why. Network latency is
>>>>>>>>>>>>>> under 0.5 ms. By latency, I mean the time between an emit of
>>>>>>>>>>>>>> one bolt/spout to getting the message in execute() of next bolt.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives
>>>>>>>>>>>>>> this number and divides this into 10 emits of 100 each) -> C (bolt)
>>>>>>>>>>>>>> [Recieves these emits and divides this to 10 emits of 10 numbers) -> D
>>>>>>>>>>>>>> (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec
>>>>>>>>>>>>>> to 3 seconds. My estimate was under 50 msec given that each bolt and spout
>>>>>>>>>>>>>> take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between Emit
>>>>>>>>>>>>>> from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am not able to figure out why it takes so much time between
>>>>>>>>>>>>>> a spout/bolt to next bolt. I understand that the spout/bolt buffers the
>>>>>>>>>>>>>> data into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>>>>> any steps you have taken to mitigate the time taken between spout/bolt and
>>>>>>>>>>>>>> another bolt.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>
>>>>>>
>>>>>>
>>>

Re: Realtime computations using storm - questions on performance

Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Nathan,
My max spout pending is set to 1. Now, is my problem with latency or with
throughput?

Thank you!
Kashyap
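
One way to separate the two questions is to measure them independently. A
minimal sketch (fragments only, with made-up field and variable names, against
the same backtype.storm API used elsewhere in this thread):

    // Spout: stamp each root tuple with its emit time.
    spoutCollector.emit(new Values(payload, System.currentTimeMillis()), msgId);

    // Aggregating bolt (Bolt E): once the last child tuple for a root arrives,
    // end-to-end latency is simply "now" minus the stamp. This is only
    // trustworthy if both timestamps come from synchronized clocks (or the
    // same host), which is the clock-drift caveat raised earlier in the thread.
    long endToEndMs = System.currentTimeMillis() - input.getLongByField("emitTs");

    // Throughput, by contrast, is just completed roots per unit of wall time,
    // e.g. counted in the aggregator and logged every few seconds.

With max spout pending at 1, each spout task keeps only one tuple tree in
flight at a time, so throughput is roughly capped at one root per end-to-end
latency per spout task; raising the value trades latency for throughput.
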
On Jul 16, 2015 5:46 PM, "Nathan Leung" <nc...@gmail.com> wrote:

> If your tuples are anchored max spout pending indirectly affects how many
> tuples are generated ;).
> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:
>
>> Thanks Nathan. One question though - Are there any best practices when
>> tuples are getting generated in the topology and not really controllable
>> via Max Spout Pending?
>>
>> Thanks
>> Kashyap
>>
>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com> wrote:
>>
>>> Also I would argue that this is not important unless your application is
>>> especially latency sensitive or your queue is so long that it is causing in
>>> flight tuples to timeout.
>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>>
>>>> Sorry for a brief response.. The number of tuples in flight absolutely
>>>> affects your max latency.  You need to tune your topology max spout
>>>> pending.  Lower value will reduce your end to end latency, but if it's too
>>>> low it may affect throughput.  I've posted to the group about this before;
>>>> if you do a search you may find some posts where I've discussed this in
>>>> more detail.
>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Nathan,
>>>>> Thanks. Have been running on a bare bones topology as suggested. I am
>>>>> inclined to believe that the no. of messages in the topology at that point
>>>>> in time is affecting the "latency".
>>>>>
>>>>> Am trying to now figure out how should the topology be structured when
>>>>> the no. of transient tupples in the topology is very high.
>>>>>
>>>>> Topology is structured as follows -
>>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C
>>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes along
>>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and confirms
>>>>> if all the 100 messages are processed)
>>>>>
>>>>> What I observed is as follows -
>>>>> 1. The time taken for an end to end processing of the message (Sending
>>>>> the message to Storm cluster and till the time the aggregation is complete)
>>>>> is directly dependent on the volume of messages that is entering into storm
>>>>> and also the no. of emits done by the spout A.
>>>>> *Test 1: 100 sequential messages to storm with B emitting 100 tuples
>>>>> per message (100X100=10000) there are 10000 tuples emitted and the time
>>>>> taken to aggregate the 100 is 15 ms to 100 ms*
>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000 tuples
>>>>> per message (100X1000=100000) **there are 100000 tuples emitted and
>>>>> the time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>>> 2.Strange thing is - the more parallelism i add, the times are so much
>>>>> more bad. Am trying to figure out if the memory per worker is a constraint,
>>>>> but this is the firs time am seeing this.
>>>>>
>>>>> Question - How should the use case be handled where in the no. of
>>>>> tuples in the topology could increase exponentially..,
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>>
>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>> nick.katsip@gmail.com> wrote:
>>>>>
>>>>>> Thank you all for the valuable info.
>>>>>>
>>>>>> Unfortunately, I have to use it for my (research) prototype therefore
>>>>>> I have to go along with it.
>>>>>>
>>>>>> Thank you again,
>>>>>> Nick
>>>>>>
>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>
>>>>>>> Storm task ids don't change:
>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>
>>>>>>>> Direct grouping as it is shown in storm docs, means that you have
>>>>>>>> to have a specific task id and use "direct streams" which is error prone,
>>>>>>>> probably increase latency and might introduce redundancy problems as the
>>>>>>>> producer of tuple needs to know the id of the task the tuple will have to
>>>>>>>> go; so imagine a scenario where the receiving task fails for some reason
>>>>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>>>>> task's id.
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>>
>>>>>>>> Kindly yours,
>>>>>>>>
>>>>>>>> Andrew Grammenos
>>>>>>>>
>>>>>>>> -- PGP PKey --
>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello again,
>>>>>>>>>
>>>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>>>> working on has to be able to send tuples directly to specific tasks. In
>>>>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>>>> architecture of Storm?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>> distribution which makes it easier to characterize its performance.  Local
>>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>>> However you have to be careful in certain cases to avoid task starvation
>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout task,
>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all,
>>>>>>>>>>>
>>>>>>>>>>> I have two questions:
>>>>>>>>>>>
>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same thing
>>>>>>>>>>> and I have a problem getting the exact milliseconds of latency (mainly
>>>>>>>>>>> because of clock drifting).
>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Nick
>>>>>>>>>>>
>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Two things. Your math may be off depending on parallelism. One
>>>>>>>>>>>> emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>>
>>>>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>>>>
>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>> latency on same machine or across machines ranges between 2 -
>>>>>>>>>>>>> 250 ms. I am not able to figure out why. Network latency is
>>>>>>>>>>>>> under 0.5 ms. By latency, I mean the time between an emit of
>>>>>>>>>>>>> one bolt/spout to getting the message in execute() of next bolt.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives
>>>>>>>>>>>>> this number and divides this into 10 emits of 100 each) -> C (bolt)
>>>>>>>>>>>>> [Recieves these emits and divides this to 10 emits of 10 numbers) -> D
>>>>>>>>>>>>> (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec
>>>>>>>>>>>>> to 3 seconds. My estimate was under 50 msec given that each bolt and spout
>>>>>>>>>>>>> take under 3 msec to execute including any latencies.
>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between Emit
>>>>>>>>>>>>> from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not able to figure out why it takes so much time between
>>>>>>>>>>>>> a spout/bolt to next bolt. I understand that the spout/bolt buffers the
>>>>>>>>>>>>> data into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and
>>>>>>>>>>>>> there are 20 workers overall.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>>>> any steps you have taken to mitigate the time taken between spout/bolt and
>>>>>>>>>>>>> another bolt.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>> University of Pittsburgh, PhD candidate
>>>>>>
>>>>>
>>>>>
>>

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
If your tuples are anchored, max spout pending indirectly affects how many
tuples are generated ;).
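
For anyone following along, the knob being discussed is set like this (the
value 50 below is only a placeholder to illustrate the call, not a
recommendation):

    Config config = new Config();           // backtype.storm.Config
    config.setMaxSpoutPending(50);
    // equivalent: config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 50);

It caps how many root tuples (spout emits carrying a message id) may be
incomplete at once; because every anchored child belongs to one of those
trees, the total number of tuples in flight is indirectly bounded by
(pending roots) x (fan-out per root).
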
On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:

> Thanks Nathan. One question though - Are there any best practices when
> tuples are getting generated in the topology and not really controllable
> via Max Spout Pending?
>
> Thanks
> Kashyap
>
> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com> wrote:
>
>> Also I would argue that this is not important unless your application is
>> especially latency sensitive or your queue is so long that it is causing in
>> flight tuples to timeout.
>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>
>>> Sorry for a brief response.. The number of tuples in flight absolutely
>>> affects your max latency.  You need to tune your topology max spout
>>> pending.  Lower value will reduce your end to end latency, but if it's too
>>> low it may affect throughput.  I've posted to the group about this before;
>>> if you do a search you may find some posts where I've discussed this in
>>> more detail.
>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>> wrote:
>>>
>>>> Nathan,
>>>> Thanks. Have been running on a bare bones topology as suggested. I am
>>>> inclined to believe that the no. of messages in the topology at that point
>>>> in time is affecting the "latency".
>>>>
>>>> Am trying to now figure out how should the topology be structured when
>>>> the no. of transient tupples in the topology is very high.
>>>>
>>>> Topology is structured as follows -
>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C
>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes along
>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and confirms
>>>> if all the 100 messages are processed)
>>>>
>>>> What I observed is as follows -
>>>> 1. The time taken for an end to end processing of the message (Sending
>>>> the message to Storm cluster and till the time the aggregation is complete)
>>>> is directly dependent on the volume of messages that is entering into storm
>>>> and also the no. of emits done by the spout A.
>>>> *Test 1: 100 sequential messages to storm with B emitting 100 tuples
>>>> per message (100X100=10000) there are 10000 tuples emitted and the time
>>>> taken to aggregate the 100 is 15 ms to 100 ms*
>>>> *Test 2: 100 sequential messages to storm with B emitting 1000 tuples
>>>> per message (100X1000=100000) **there are 100000 tuples emitted and
>>>> the time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>>> 2.Strange thing is - the more parallelism i add, the times are so much
>>>> more bad. Am trying to figure out if the memory per worker is a constraint,
>>>> but this is the firs time am seeing this.
>>>>
>>>> Question - How should the use case be handled where in the no. of
>>>> tuples in the topology could increase exponentially..,
>>>>
>>>> Thanks
>>>> Kashyap
>>>>
>>>>
>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>> nick.katsip@gmail.com> wrote:
>>>>
>>>>> Thank you all for the valuable info.
>>>>>
>>>>> Unfortunately, I have to use it for my (research) prototype therefore
>>>>> I have to go along with it.
>>>>>
>>>>> Thank you again,
>>>>> Nick
>>>>>
>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> Storm task ids don't change:
>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>
>>>>>>> Direct grouping as it is shown in storm docs, means that you have to
>>>>>>> have a specific task id and use "direct streams" which is error prone,
>>>>>>> probably increase latency and might introduce redundancy problems as the
>>>>>>> producer of tuple needs to know the id of the task the tuple will have to
>>>>>>> go; so imagine a scenario where the receiving task fails for some reason
>>>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>>>> task's id.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Kindly yours,
>>>>>>>
>>>>>>> Andrew Grammenos
>>>>>>>
>>>>>>> -- PGP PKey --
>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello again,
>>>>>>>>
>>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>>> working on has to be able to send tuples directly to specific tasks. In
>>>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>>> architecture of Storm?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Nick
>>>>>>>>
>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>
>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>> distribution which makes it easier to characterize its performance.  Local
>>>>>>>>> or shuffle grouping stays in process so generally it will be faster.
>>>>>>>>> However you have to be careful in certain cases to avoid task starvation
>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 spout task,
>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends
>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>> your key distribution, etc.
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I have two questions:
>>>>>>>>>>
>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same thing
>>>>>>>>>> and I have a problem getting the exact milliseconds of latency (mainly
>>>>>>>>>> because of clock drifting).
>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Nick
>>>>>>>>>>
>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Two things. Your math may be off depending on parallelism. One
>>>>>>>>>>> emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>
>>>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>>>
>>>>>>>>>>> Also you can try local or shuffle grouping if possible to reduce
>>>>>>>>>>> network transfers.
>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> We are attempting a real-time distributed computing using storm and
>>>>>>>>>>>> the solution has only one problem - inter bolt latency on same
>>>>>>>>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>>>>>>>>> figure out why. Network latency is under 0.5 ms. By latency, I
>>>>>>>>>>>> mean the time between an emit of one bolt/spout to getting the message in
>>>>>>>>>>>> execute() of next bolt.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives
>>>>>>>>>>>> this number and divides this into 10 emits of 100 each) -> C (bolt)
>>>>>>>>>>>> [Recieves these emits and divides this to 10 emits of 10 numbers) -> D
>>>>>>>>>>>> (bolt) [Does some computation on the number and emits one message] -> E
>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>>> processed)
>>>>>>>>>>>>
>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>>
>>>>>>>>>>>> *Observations*
>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to
>>>>>>>>>>>> 3 seconds. My estimate was under 50 msec given that each bolt and spout
>>>>>>>>>>>> take under 3 msec to execute including any latencies.
>>>>>>>>>>>> 2. I noticed that the most of the time is spent between Emit
>>>>>>>>>>>> from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>
>>>>>>>>>>>> I am not able to figure out why it takes so much time between a
>>>>>>>>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>>>>>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>>
>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and
>>>>>>>>>>>> there are 20 workers overall.
>>>>>>>>>>>>
>>>>>>>>>>>> *Test*
>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>
>>>>>>>>>>>> *Config values*
>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>
>>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>>> any steps you have taken to mitigate the time taken between spout/bolt and
>>>>>>>>>>>> another bolt.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Nikolaos Romanos Katsipoulakis,
>>>>> University of Pittsburgh, PhD candidate
>>>>>
>>>>
>>>>
>

Re: Realtime computations using storm - questions on performance

Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Thanks Nathan. One question though - are there any best practices for when
tuples are generated inside the topology and are not really controllable
via Max Spout Pending?

Thanks
Kashyap
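
One pattern that is sometimes used when a bolt's fan-out, rather than the
spout, drives the tuple count is to batch the expansion: emit one tuple
carrying a chunk of work instead of one tuple per item, and unpack the chunk
downstream. A rough sketch (fragment from a bolt's execute(), with made-up
names and an arbitrary chunk size of 100):

    List<Integer> chunk = new ArrayList<Integer>();   // java.util.*
    for (int i = 0; i < total; i++) {
        chunk.add(i);
        if (chunk.size() == 100) {
            collector.emit(input, new Values(new ArrayList<Integer>(chunk)));
            chunk.clear();
        }
    }
    if (!chunk.isEmpty()) {
        collector.emit(input, new Values(new ArrayList<Integer>(chunk)));
    }
    collector.ack(input);

This keeps the number of in-flight tuples (and the acker traffic) proportional
to the number of chunks rather than the number of items, at the cost of larger
individual tuples.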

On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <nc...@gmail.com> wrote:

> Also I would argue that this is not important unless your application is
> especially latency sensitive or your queue is so long that it is causing in
> flight tuples to timeout.
> On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>
>> Sorry for a brief response.. The number of tuples in flight absolutely
>> affects your max latency.  You need to tune your topology max spout
>> pending.  Lower value will reduce your end to end latency, but if it's too
>> low it may affect throughput.  I've posted to the group about this before;
>> if you do a search you may find some posts where I've discussed this in
>> more detail.
>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:
>>
>>> Nathan,
>>> Thanks. Have been running on a bare bones topology as suggested. I am
>>> inclined to believe that the no. of messages in the topology at that point
>>> in time is affecting the "latency".
>>>
>>> Am trying to now figure out how should the topology be structured when
>>> the no. of transient tupples in the topology is very high.
>>>
>>> Topology is structured as follows -
>>> Consumer (A java program sends a message to storm cluster) ->  A (Spout)
>>> ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C (bolt)
>>> [Passes along the message to next bolt) -> D (bolt) [Passes along the
>>> message to next bolt] -> E (bolt) [Aggregates all the data and confirms if
>>> all the 100 messages are processed)
>>>
>>> What I observed is as follows -
>>> 1. The time taken for an end to end processing of the message (Sending
>>> the message to Storm cluster and till the time the aggregation is complete)
>>> is directly dependent on the volume of messages that is entering into storm
>>> and also the no. of emits done by the spout A.
>>> *Test 1: 100 sequential messages to storm with B emitting 100 tuples per
>>> message (100X100=10000) there are 10000 tuples emitted and the time taken
>>> to aggregate the 100 is 15 ms to 100 ms*
>>> *Test 2: 100 sequential messages to storm with B emitting 1000 tuples
>>> per message (100X1000=100000) **there are 100000 tuples emitted and the
>>> time taken to aggregate the 100 is 4 seconds to 10 seconds*
>>> 2.Strange thing is - the more parallelism i add, the times are so much
>>> more bad. Am trying to figure out if the memory per worker is a constraint,
>>> but this is the firs time am seeing this.
>>>
>>> Question - How should the use case be handled where in the no. of tuples
>>> in the topology could increase exponentially..,
>>>
>>> Thanks
>>> Kashyap
>>>
>>>
>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>> nick.katsip@gmail.com> wrote:
>>>
>>>> Thank you all for the valuable info.
>>>>
>>>> Unfortunately, I have to use it for my (research) prototype therefore I
>>>> have to go along with it.
>>>>
>>>> Thank you again,
>>>> Nick
>>>>
>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>
>>>>> Storm task ids don't change:
>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>
>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>
>>>>>> Direct grouping as it is shown in storm docs, means that you have to
>>>>>> have a specific task id and use "direct streams" which is error prone,
>>>>>> probably increase latency and might introduce redundancy problems as the
>>>>>> producer of tuple needs to know the id of the task the tuple will have to
>>>>>> go; so imagine a scenario where the receiving task fails for some reason
>>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>>> task's id.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Kindly yours,
>>>>>>
>>>>>> Andrew Grammenos
>>>>>>
>>>>>> -- PGP PKey --
>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>
>>>>>>> Hello again,
>>>>>>>
>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>> working on has to be able to send tuples directly to specific tasks. In
>>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>>> architecture of Storm?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Nick
>>>>>>>
>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>
>>>>>>>> I would not recommend direct grouping unless you have a good reason
>>>>>>>> for it.  Shuffle grouping is essentially random with even distribution
>>>>>>>> which makes it easier to characterize its performance.  Local or shuffle
>>>>>>>> grouping stays in process so generally it will be faster.  However you have
>>>>>>>> to be careful in certain cases to avoid task starvation (e.g. you have
>>>>>>>> kafka spout with 1 partition on the topic and 1 spout task, feeding 10 bolt
>>>>>>>> "A" tasks in 10 worker processes). Direct grouping depends on your code
>>>>>>>> (i.e. you can create hotspots), fields grouping depends on your key
>>>>>>>> distribution, etc.
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> I have two questions:
>>>>>>>>>
>>>>>>>>> 1) How do you exactly measure latency? I am doing the same thing
>>>>>>>>> and I have a problem getting the exact milliseconds of latency (mainly
>>>>>>>>> because of clock drifting).
>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Two things. Your math may be off depending on parallelism. One
>>>>>>>>>> emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>
>>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>>
>>>>>>>>>> Also you can try local or shuffle grouping if possible to reduce
>>>>>>>>>> network transfers.
>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>> kashyap.m@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> We are attempting a real-time distributed computing using storm and
>>>>>>>>>>> the solution has only one problem - inter bolt latency on same
>>>>>>>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>>>>>>>> figure out why. Network latency is under 0.5 ms. By latency, I
>>>>>>>>>>> mean the time between an emit of one bolt/spout to getting the message in
>>>>>>>>>>> execute() of next bolt.
>>>>>>>>>>>
>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this
>>>>>>>>>>> number and divides this into 10 emits of 100 each) -> C (bolt) [Recieves
>>>>>>>>>>> these emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does
>>>>>>>>>>> some computation on the number and emits one message] -> E (bolt)
>>>>>>>>>>> [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>>> processed)
>>>>>>>>>>>
>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>
>>>>>>>>>>> *Observations*
>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to
>>>>>>>>>>> 3 seconds. My estimate was under 50 msec given that each bolt and spout
>>>>>>>>>>> take under 3 msec to execute including any latencies.
>>>>>>>>>>> 2. I noticed that the most of the time is spent between Emit
>>>>>>>>>>> from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>
>>>>>>>>>>> I am not able to figure out why it takes so much time between a
>>>>>>>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>>>>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>>
>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and
>>>>>>>>>>> there are 20 workers overall.
>>>>>>>>>>>
>>>>>>>>>>> *Test*
>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>
>>>>>>>>>>> *Config values*
>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>
>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>> any steps you have taken to mitigate the time taken between spout/bolt and
>>>>>>>>>>> another bolt.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Kashyap
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Nikolaos Romanos Katsipoulakis,
>>>> University of Pittsburgh, PhD candidate
>>>>
>>>
>>>

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
Also I would argue that this is not important unless your application is
especially latency-sensitive or your queue is so long that it is causing
in-flight tuples to time out.
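
The timeout referred to here is the topology message timeout: a tuple tree
that is not fully acked within that window is failed and, with a typical
reliable spout, replayed. It is configurable, e.g. (example value only):

    Config config = new Config();            // backtype.storm.Config
    config.setMessageTimeoutSecs(60);
    // equivalent: config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);

so if queues back up badly, either shorten the queues (lower max spout
pending) or make sure this window is comfortably larger than the worst-case
end-to-end latency.
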
On Jul 16, 2015 6:05 PM, "Nathan Leung" <nc...@gmail.com> wrote:

> Sorry for a brief response.. The number of tuples in flight absolutely
> affects your max latency.  You need to tune your topology max spout
> pending.  Lower value will reduce your end to end latency, but if it's too
> low it may affect throughput.  I've posted to the group about this before;
> if you do a search you may find some posts where I've discussed this in
> more detail.
> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:
>
>> Nathan,
>> Thanks. Have been running on a bare bones topology as suggested. I am
>> inclined to believe that the no. of messages in the topology at that point
>> in time is affecting the "latency".
>>
>> Am trying to now figure out how should the topology be structured when
>> the no. of transient tupples in the topology is very high.
>>
>> Topology is structured as follows -
>> Consumer (A java program sends a message to storm cluster) ->  A (Spout)
>> ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C (bolt)
>> [Passes along the message to next bolt) -> D (bolt) [Passes along the
>> message to next bolt] -> E (bolt) [Aggregates all the data and confirms if
>> all the 100 messages are processed)
>>
>> What I observed is as follows -
>> 1. The time taken for an end to end processing of the message (Sending
>> the message to Storm cluster and till the time the aggregation is complete)
>> is directly dependent on the volume of messages that is entering into storm
>> and also the no. of emits done by the spout A.
>> *Test 1: 100 sequential messages to storm with B emitting 100 tuples per
>> message (100X100=10000) there are 10000 tuples emitted and the time taken
>> to aggregate the 100 is 15 ms to 100 ms*
>> *Test 2: 100 sequential messages to storm with B emitting 1000 tuples per
>> message (100X1000=100000) **there are 100000 tuples emitted and the time
>> taken to aggregate the 100 is 4 seconds to 10 seconds*
>> 2.Strange thing is - the more parallelism i add, the times are so much
>> more bad. Am trying to figure out if the memory per worker is a constraint,
>> but this is the firs time am seeing this.
>>
>> Question - How should the use case be handled where in the no. of tuples
>> in the topology could increase exponentially..,
>>
>> Thanks
>> Kashyap
>>
>>
>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>> nick.katsip@gmail.com> wrote:
>>
>>> Thank you all for the valuable info.
>>>
>>> Unfortunately, I have to use it for my (research) prototype therefore I
>>> have to go along with it.
>>>
>>> Thank you again,
>>> Nick
>>>
>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>
>>>> Storm task ids don't change:
>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>
>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>> andreas.grammenos@gmail.com> wrote:
>>>>
>>>>> Direct grouping as it is shown in storm docs, means that you have to
>>>>> have a specific task id and use "direct streams" which is error prone,
>>>>> probably increase latency and might introduce redundancy problems as the
>>>>> producer of tuple needs to know the id of the task the tuple will have to
>>>>> go; so imagine a scenario where the receiving task fails for some reason
>>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>>> task's id.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Kindly yours,
>>>>>
>>>>> Andrew Grammenos
>>>>>
>>>>> -- PGP PKey --
>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>
>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>> nick.katsip@gmail.com> wrote:
>>>>>
>>>>>> Hello again,
>>>>>>
>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>> working on has to be able to send tuples directly to specific tasks. In
>>>>>> general control the data flow. Can you please explain to me why you would
>>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>>> architecture of Storm?
>>>>>>
>>>>>> Thanks,
>>>>>> Nick
>>>>>>
>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>
>>>>>>> I would not recommend direct grouping unless you have a good reason
>>>>>>> for it.  Shuffle grouping is essentially random with even distribution
>>>>>>> which makes it easier to characterize its performance.  Local or shuffle
>>>>>>> grouping stays in process so generally it will be faster.  However you have
>>>>>>> to be careful in certain cases to avoid task starvation (e.g. you have
>>>>>>> kafka spout with 1 partition on the topic and 1 spout task, feeding 10 bolt
>>>>>>> "A" tasks in 10 worker processes). Direct grouping depends on your code
>>>>>>> (i.e. you can create hotspots), fields grouping depends on your key
>>>>>>> distribution, etc.
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> I have two questions:
>>>>>>>>
>>>>>>>> 1) How do you exactly measure latency? I am doing the same thing
>>>>>>>> and I have a problem getting the exact milliseconds of latency (mainly
>>>>>>>> because of clock drifting).
>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Nick
>>>>>>>>
>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>
>>>>>>>>> Two things. Your math may be off depending on parallelism. One
>>>>>>>>> emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>
>>>>>>>>> Second, try the default number of ackers (one per worker). All
>>>>>>>>> your ack traffic is going to a single task.
>>>>>>>>>
>>>>>>>>> Also you can try local or shuffle grouping if possible to reduce
>>>>>>>>> network transfers.
>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> We are attempting a real-time distributed computing using storm and
>>>>>>>>>> the solution has only one problem - inter bolt latency on same
>>>>>>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>>>>>>> figure out why. Network latency is under 0.5 ms. By latency, I
>>>>>>>>>> mean the time between an emit of one bolt/spout to getting the message in
>>>>>>>>>> execute() of next bolt.
>>>>>>>>>>
>>>>>>>>>> I have a topology like the below -
>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this
>>>>>>>>>> number and divides this into 10 emits of 100 each) -> C (bolt) [Recieves
>>>>>>>>>> these emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does
>>>>>>>>>> some computation on the number and emits one message] -> E (bolt)
>>>>>>>>>> [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>>> processed)
>>>>>>>>>>
>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>>> msec including any latencies.
>>>>>>>>>>
>>>>>>>>>> *Observations*
>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>>>>>>>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>>>>>>>>> under 3 msec to execute including any latencies.
>>>>>>>>>> 2. I noticed that the most of the time is spent between Emit from
>>>>>>>>>> a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>
>>>>>>>>>> I am not able to figure out why it takes so much time between a
>>>>>>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>>>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>>
>>>>>>>>>> *Infrastructure*
>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and
>>>>>>>>>> there are 20 workers overall.
>>>>>>>>>>
>>>>>>>>>> *Test*
>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25 messages
>>>>>>>>>> are sent to spout in a span of 5 seconds.
>>>>>>>>>>
>>>>>>>>>> *Config values*
>>>>>>>>>> Config config = new Config();
>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>
>>>>>>>>>> Please let me know if you have encountered similar issues and any
>>>>>>>>>> steps you have taken to mitigate the time taken between spout/bolt and
>>>>>>>>>> another bolt.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Kashyap
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>> University of Pittsburgh, PhD candidate
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Nikolaos Romanos Katsipoulakis,
>>> University of Pittsburgh, PhD candidate
>>>
>>
>>

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
Sorry for the brief response. The number of tuples in flight absolutely
affects your max latency.  You need to tune your topology max spout
pending.  A lower value will reduce your end to end latency, but if it's too
low it may affect throughput.  I've posted to the group about this before;
if you do a search you may find some posts where I've discussed this in
more detail.
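
As a rough sketch of what that tuning looks like in code (the 500 below is an
arbitrary starting point to experiment with, not a recommendation):

import backtype.storm.Config;

Config config = new Config();
// Cap the number of un-acked tuples allowed in flight per spout task.
// Tune this value against your own latency/throughput measurements.
config.setMaxSpoutPending(500);
// equivalent to: config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 500);
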
On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <ka...@gmail.com> wrote:

> Nathan,
> Thanks. Have been running on a bare bones topology as suggested. I am
> inclined to believe that the no. of messages in the topology at that point
> in time is affecting the "latency".
>
> Am trying to now figure out how should the topology be structured when the
> no. of transient tupples in the topology is very high.
>
> Topology is structured as follows -
> Consumer (A java program sends a message to storm cluster) ->  A (Spout)
> ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) -> C (bolt)
> [Passes along the message to next bolt) -> D (bolt) [Passes along the
> message to next bolt] -> E (bolt) [Aggregates all the data and confirms if
> all the 100 messages are processed)
>
> What I observed is as follows -
> 1. The time taken for an end to end processing of the message (Sending the
> message to Storm cluster and till the time the aggregation is complete) is
> directly dependent on the volume of messages that is entering into storm
> and also the no. of emits done by the spout A.
> *Test 1: 100 sequential messages to storm with B emitting 100 tuples per
> message (100X100=10000) there are 10000 tuples emitted and the time taken
> to aggregate the 100 is 15 ms to 100 ms*
> *Test 2: 100 sequential messages to storm with B emitting 1000 tuples per
> message (100X1000=100000) **there are 100000 tuples emitted and the time
> taken to aggregate the 100 is 4 seconds to 10 seconds*
> 2.Strange thing is - the more parallelism i add, the times are so much
> more bad. Am trying to figure out if the memory per worker is a constraint,
> but this is the firs time am seeing this.
>
> Question - How should the use case be handled where in the no. of tuples
> in the topology could increase exponentially..,
>
> Thanks
> Kashyap
>
>
> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
> nick.katsip@gmail.com> wrote:
>
>> Thank you all for the valuable info.
>>
>> Unfortunately, I have to use it for my (research) prototype therefore I
>> have to go along with it.
>>
>> Thank you again,
>> Nick
>>
>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>
>>> Storm task ids don't change:
>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>
>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <andreas.grammenos@gmail.com
>>> > wrote:
>>>
>>>> Direct grouping as it is shown in storm docs, means that you have to
>>>> have a specific task id and use "direct streams" which is error prone,
>>>> probably increase latency and might introduce redundancy problems as the
>>>> producer of tuple needs to know the id of the task the tuple will have to
>>>> go; so imagine a scenario where the receiving task fails for some reason
>>>> and the producer can't relay the tuples unless it received the re-spawned
>>>> task's id.
>>>>
>>>> Hope this helps.
>>>>
>>>> Kindly yours,
>>>>
>>>> Andrew Grammenos
>>>>
>>>> -- PGP PKey --
>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>
>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>> nick.katsip@gmail.com> wrote:
>>>>
>>>>> Hello again,
>>>>>
>>>>> Nathan, I am using direct-grouping because the application I am
>>>>> working on has to be able to send tuples directly to specific tasks. In
>>>>> general control the data flow. Can you please explain to me why you would
>>>>> not recommend direct grouping? Is there any particular reason in the
>>>>> architecture of Storm?
>>>>>
>>>>> Thanks,
>>>>> Nick
>>>>>
>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> I would not recommend direct grouping unless you have a good reason
>>>>>> for it.  Shuffle grouping is essentially random with even distribution
>>>>>> which makes it easier to characterize its performance.  Local or shuffle
>>>>>> grouping stays in process so generally it will be faster.  However you have
>>>>>> to be careful in certain cases to avoid task starvation (e.g. you have
>>>>>> kafka spout with 1 partition on the topic and 1 spout task, feeding 10 bolt
>>>>>> "A" tasks in 10 worker processes). Direct grouping depends on your code
>>>>>> (i.e. you can create hotspots), fields grouping depends on your key
>>>>>> distribution, etc.
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>> nick.katsip@gmail.com> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I have two questions:
>>>>>>>
>>>>>>> 1) How do you exactly measure latency? I am doing the same thing and
>>>>>>> I have a problem getting the exact milliseconds of latency (mainly because
>>>>>>> of clock drifting).
>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Nick
>>>>>>>
>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>
>>>>>>>> Two things. Your math may be off depending on parallelism. One emit
>>>>>>>> from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>
>>>>>>>> Second, try the default number of ackers (one per worker). All your
>>>>>>>> ack traffic is going to a single task.
>>>>>>>>
>>>>>>>> Also you can try local or shuffle grouping if possible to reduce
>>>>>>>> network transfers.
>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> We are attempting a real-time distributed computing using storm and
>>>>>>>>> the solution has only one problem - inter bolt latency on same
>>>>>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>>>>>> figure out why. Network latency is under 0.5 ms. By latency, I
>>>>>>>>> mean the time between an emit of one bolt/spout to getting the message in
>>>>>>>>> execute() of next bolt.
>>>>>>>>>
>>>>>>>>> I have a topology like the below -
>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this
>>>>>>>>> number and divides this into 10 emits of 100 each) -> C (bolt) [Recieves
>>>>>>>>> these emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does
>>>>>>>>> some computation on the number and emits one message] -> E (bolt)
>>>>>>>>> [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>>> processed)
>>>>>>>>>
>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>>> msec including any latencies.
>>>>>>>>>
>>>>>>>>> *Observations*
>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>>>>>>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>>>>>>>> under 3 msec to execute including any latencies.
>>>>>>>>> 2. I noticed that the most of the time is spent between Emit from
>>>>>>>>> a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>
>>>>>>>>> I am not able to figure out why it takes so much time between a
>>>>>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>>>>>
>>>>>>>>> *Infrastructure*
>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and
>>>>>>>>> there are 20 workers overall.
>>>>>>>>>
>>>>>>>>> *Test*
>>>>>>>>> 1. The test was done with 25 messages to the spout => 25 messages
>>>>>>>>> are sent to spout in a span of 5 seconds.
>>>>>>>>>
>>>>>>>>> *Config values*
>>>>>>>>> Config config = new Config();
>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>
>>>>>>>>> Please let me know if you have encountered similar issues and any
>>>>>>>>> steps you have taken to mitigate the time taken between spout/bolt and
>>>>>>>>> another bolt.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Kashyap
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Nikolaos Romanos Katsipoulakis,
>>>>> University of Pittsburgh, PhD candidate
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Nikolaos Romanos Katsipoulakis,
>> University of Pittsburgh, PhD candidate
>>
>
>

Re: Realtime computations using storm - questions on performance

Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Nathan,
Thanks. I have been running a bare-bones topology as suggested. I am
inclined to believe that the no. of messages in the topology at that point
in time is affecting the "latency".

I am now trying to figure out how the topology should be structured when the
no. of transient tuples in the topology is very high.

Topology is structured as follows -
Consumer (a java program sends a message to the storm cluster) -> A (Spout)
[Emits a number, say 100] -> B (bolt) [Emits 100 messages] -> C (bolt)
[Passes the message along to the next bolt] -> D (bolt) [Passes the message
along to the next bolt] -> E (bolt) [Aggregates all the data and confirms
that all 100 messages are processed]

What I observed is as follows -
1. The time taken for end-to-end processing of a message (from sending the
message to the Storm cluster until the aggregation is complete) is directly
dependent on the volume of messages entering storm and also on the no. of
emits done by spout A.
*Test 1: 100 sequential messages to storm with B emitting 100 tuples per
message (100x100=10000); 10000 tuples are emitted and the time taken
to aggregate the 100 is 15 ms to 100 ms*
*Test 2: 100 sequential messages to storm with B emitting 1000 tuples per
message (100x1000=100000); 100000 tuples are emitted and the time
taken to aggregate the 100 is 4 seconds to 10 seconds*
2. The strange thing is - the more parallelism I add, the worse the times
get. I am trying to figure out if the memory per worker is a constraint, but
this is the first time I am seeing this.

Question - How should a use case be handled where the no. of tuples in
the topology can increase exponentially?

Thanks
Kashyap
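
(A side note on measuring that end-to-end number: one simple approach,
assuming the hosts' clocks are reasonably in sync, is to carry the send
timestamp inside the tuple and diff it at the aggregator. The field and
class names below are made up for illustration; this is only a sketch, not
how the numbers above were produced.)

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.Map;

// Aggregator-side sketch: expects upstream to have emitted a "startMs" field
// set to System.currentTimeMillis() when the message entered the topology.
public class TimingAggregatorBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        long startMs = tuple.getLongByField("startMs");
        long elapsedMs = System.currentTimeMillis() - startMs;
        System.out.println("end-to-end ms: " + elapsedMs); // beware clock drift across hosts
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, nothing emitted
    }
}
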


On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
nick.katsip@gmail.com> wrote:

> Thank you all for the valuable info.
>
> Unfortunately, I have to use it for my (research) prototype therefore I
> have to go along with it.
>
> Thank you again,
> Nick
>
> 2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>
>> Storm task ids don't change:
>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>
>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <an...@gmail.com>
>> wrote:
>>
>>> Direct grouping as it is shown in storm docs, means that you have to
>>> have a specific task id and use "direct streams" which is error prone,
>>> probably increase latency and might introduce redundancy problems as the
>>> producer of tuple needs to know the id of the task the tuple will have to
>>> go; so imagine a scenario where the receiving task fails for some reason
>>> and the producer can't relay the tuples unless it received the re-spawned
>>> task's id.
>>>
>>> Hope this helps.
>>>
>>> Kindly yours,
>>>
>>> Andrew Grammenos
>>>
>>> -- PGP PKey --
>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>
>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>> nick.katsip@gmail.com> wrote:
>>>
>>>> Hello again,
>>>>
>>>> Nathan, I am using direct-grouping because the application I am working
>>>> on has to be able to send tuples directly to specific tasks. In general
>>>> control the data flow. Can you please explain to me why you would not
>>>> recommend direct grouping? Is there any particular reason in the
>>>> architecture of Storm?
>>>>
>>>> Thanks,
>>>> Nick
>>>>
>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>
>>>>> I would not recommend direct grouping unless you have a good reason
>>>>> for it.  Shuffle grouping is essentially random with even distribution
>>>>> which makes it easier to characterize its performance.  Local or shuffle
>>>>> grouping stays in process so generally it will be faster.  However you have
>>>>> to be careful in certain cases to avoid task starvation (e.g. you have
>>>>> kafka spout with 1 partition on the topic and 1 spout task, feeding 10 bolt
>>>>> "A" tasks in 10 worker processes). Direct grouping depends on your code
>>>>> (i.e. you can create hotspots), fields grouping depends on your key
>>>>> distribution, etc.
>>>>>
>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>> nick.katsip@gmail.com> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I have two questions:
>>>>>>
>>>>>> 1) How do you exactly measure latency? I am doing the same thing and
>>>>>> I have a problem getting the exact milliseconds of latency (mainly because
>>>>>> of clock drifting).
>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>
>>>>>> Thanks,
>>>>>> Nick
>>>>>>
>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>>
>>>>>>> Two things. Your math may be off depending on parallelism. One emit
>>>>>>> from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>
>>>>>>> Second, try the default number of ackers (one per worker). All your
>>>>>>> ack traffic is going to a single task.
>>>>>>>
>>>>>>> Also you can try local or shuffle grouping if possible to reduce
>>>>>>> network transfers.
>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> We are attempting a real-time distributed computing using storm and
>>>>>>>> the solution has only one problem - inter bolt latency on same
>>>>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>>>>> figure out why. Network latency is under 0.5 ms. By latency, I
>>>>>>>> mean the time between an emit of one bolt/spout to getting the message in
>>>>>>>> execute() of next bolt.
>>>>>>>>
>>>>>>>> I have a topology like the below -
>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this
>>>>>>>> number and divides this into 10 emits of 100 each) -> C (bolt) [Recieves
>>>>>>>> these emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does
>>>>>>>> some computation on the number and emits one message] -> E (bolt)
>>>>>>>> [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>>> processed)
>>>>>>>>
>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>>> msec including any latencies.
>>>>>>>>
>>>>>>>> *Observations*
>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>>>>>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>>>>>>> under 3 msec to execute including any latencies.
>>>>>>>> 2. I noticed that the most of the time is spent between Emit from a
>>>>>>>> Spout/Bolt and execute() of the consuming bolt.
>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>
>>>>>>>> I am not able to figure out why it takes so much time between a
>>>>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>>>>
>>>>>>>> *Infrastructure*
>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and
>>>>>>>> there are 20 workers overall.
>>>>>>>>
>>>>>>>> *Test*
>>>>>>>> 1. The test was done with 25 messages to the spout => 25 messages
>>>>>>>> are sent to spout in a span of 5 seconds.
>>>>>>>>
>>>>>>>> *Config values*
>>>>>>>> Config config = new Config();
>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>
>>>>>>>> Please let me know if you have encountered similar issues and any
>>>>>>>> steps you have taken to mitigate the time taken between spout/bolt and
>>>>>>>> another bolt.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Kashyap
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>> University of Pittsburgh, PhD candidate
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Nikolaos Romanos Katsipoulakis,
>>>> University of Pittsburgh, PhD candidate
>>>>
>>>
>>>
>>
>
>
> --
> Nikolaos Romanos Katsipoulakis,
> University of Pittsburgh, PhD candidate
>

Re: Realtime computations using storm - questions on performance

Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Thank you all for the valuable info.

Unfortunately, I have to use it for my (research) prototype, therefore I
have to go along with it.

Thank you again,
Nick

2015-07-16 16:33 GMT-04:00 Nathan Leung <nc...@gmail.com>:

> Storm task ids don't change:
> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>
> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <an...@gmail.com>
> wrote:
>
>> Direct grouping as it is shown in storm docs, means that you have to have
>> a specific task id and use "direct streams" which is error prone, probably
>> increase latency and might introduce redundancy problems as the producer of
>> tuple needs to know the id of the task the tuple will have to go; so
>> imagine a scenario where the receiving task fails for some reason and the
>> producer can't relay the tuples unless it received the re-spawned task's id.
>>
>> Hope this helps.
>>
>> Kindly yours,
>>
>> Andrew Grammenos
>>
>> -- PGP PKey --
>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>
>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>> nick.katsip@gmail.com> wrote:
>>
>>> Hello again,
>>>
>>> Nathan, I am using direct-grouping because the application I am working
>>> on has to be able to send tuples directly to specific tasks. In general
>>> control the data flow. Can you please explain to me why you would not
>>> recommend direct grouping? Is there any particular reason in the
>>> architecture of Storm?
>>>
>>> Thanks,
>>> Nick
>>>
>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>
>>>> I would not recommend direct grouping unless you have a good reason for
>>>> it.  Shuffle grouping is essentially random with even distribution which
>>>> makes it easier to characterize its performance.  Local or shuffle grouping
>>>> stays in process so generally it will be faster.  However you have to be
>>>> careful in certain cases to avoid task starvation (e.g. you have kafka
>>>> spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A"
>>>> tasks in 10 worker processes). Direct grouping depends on your code (i.e.
>>>> you can create hotspots), fields grouping depends on your key distribution,
>>>> etc.
>>>>
>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>> nick.katsip@gmail.com> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I have two questions:
>>>>>
>>>>> 1) How do you exactly measure latency? I am doing the same thing and I
>>>>> have a problem getting the exact milliseconds of latency (mainly because of
>>>>> clock drifting).
>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>
>>>>> Thanks,
>>>>> Nick
>>>>>
>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> Two things. Your math may be off depending on parallelism. One emit
>>>>>> from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>
>>>>>> Second, try the default number of ackers (one per worker). All your
>>>>>> ack traffic is going to a single task.
>>>>>>
>>>>>> Also you can try local or shuffle grouping if possible to reduce
>>>>>> network transfers.
>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> We are attempting a real-time distributed computing using storm and
>>>>>>> the solution has only one problem - inter bolt latency on same
>>>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>>>> figure out why. Network latency is under 0.5 ms. By latency, I mean
>>>>>>> the time between an emit of one bolt/spout to getting the message in
>>>>>>> execute() of next bolt.
>>>>>>>
>>>>>>> I have a topology like the below -
>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this
>>>>>>> number and divides this into 10 emits of 100 each) -> C (bolt) [Recieves
>>>>>>> these emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does
>>>>>>> some computation on the number and emits one message] -> E (bolt)
>>>>>>> [Aggregates all the data and confirms if all the 1000 messages are
>>>>>>> processed)
>>>>>>>
>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>>> msec including any latencies.
>>>>>>>
>>>>>>> *Observations*
>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>>>>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>>>>>> under 3 msec to execute including any latencies.
>>>>>>> 2. I noticed that the most of the time is spent between Emit from a
>>>>>>> Spout/Bolt and execute() of the consuming bolt.
>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>
>>>>>>> I am not able to figure out why it takes so much time between a
>>>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>>>
>>>>>>> *Infrastructure*
>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and there
>>>>>>> are 20 workers overall.
>>>>>>>
>>>>>>> *Test*
>>>>>>> 1. The test was done with 25 messages to the spout => 25 messages
>>>>>>> are sent to spout in a span of 5 seconds.
>>>>>>>
>>>>>>> *Config values*
>>>>>>> Config config = new Config();
>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>
>>>>>>> Please let me know if you have encountered similar issues and any
>>>>>>> steps you have taken to mitigate the time taken between spout/bolt and
>>>>>>> another bolt.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Kashyap
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Nikolaos Romanos Katsipoulakis,
>>>>> University of Pittsburgh, PhD candidate
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Nikolaos Romanos Katsipoulakis,
>>> University of Pittsburgh, PhD candidate
>>>
>>
>>
>


-- 
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
Storm task ids don't change:
https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c

On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <an...@gmail.com>
wrote:

> Direct grouping as it is shown in storm docs, means that you have to have
> a specific task id and use "direct streams" which is error prone, probably
> increase latency and might introduce redundancy problems as the producer of
> tuple needs to know the id of the task the tuple will have to go; so
> imagine a scenario where the receiving task fails for some reason and the
> producer can't relay the tuples unless it received the re-spawned task's id.
>
> Hope this helps.
>
> Kindly yours,
>
> Andrew Grammenos
>
> -- PGP PKey --
> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>
> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
> nick.katsip@gmail.com> wrote:
>
>> Hello again,
>>
>> Nathan, I am using direct-grouping because the application I am working
>> on has to be able to send tuples directly to specific tasks. In general
>> control the data flow. Can you please explain to me why you would not
>> recommend direct grouping? Is there any particular reason in the
>> architecture of Storm?
>>
>> Thanks,
>> Nick
>>
>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>
>>> I would not recommend direct grouping unless you have a good reason for
>>> it.  Shuffle grouping is essentially random with even distribution which
>>> makes it easier to characterize its performance.  Local or shuffle grouping
>>> stays in process so generally it will be faster.  However you have to be
>>> careful in certain cases to avoid task starvation (e.g. you have kafka
>>> spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A"
>>> tasks in 10 worker processes). Direct grouping depends on your code (i.e.
>>> you can create hotspots), fields grouping depends on your key distribution,
>>> etc.
>>>
>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>> nick.katsip@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I have two questions:
>>>>
>>>> 1) How do you exactly measure latency? I am doing the same thing and I
>>>> have a problem getting the exact milliseconds of latency (mainly because of
>>>> clock drifting).
>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>
>>>> Thanks,
>>>> Nick
>>>>
>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>>
>>>>> Two things. Your math may be off depending on parallelism. One emit
>>>>> from A becomes 100 emitted from C, and you are joining all of them.
>>>>>
>>>>> Second, try the default number of ackers (one per worker). All your
>>>>> ack traffic is going to a single task.
>>>>>
>>>>> Also you can try local or shuffle grouping if possible to reduce
>>>>> network transfers.
>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> We are attempting a real-time distributed computing using storm and
>>>>>> the solution has only one problem - inter bolt latency on same
>>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>>> figure out why. Network latency is under 0.5 ms. By latency, I mean
>>>>>> the time between an emit of one bolt/spout to getting the message in
>>>>>> execute() of next bolt.
>>>>>>
>>>>>> I have a topology like the below -
>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this
>>>>>> number and divides this into 10 emits of 100 each) -> C (bolt) [Recieves
>>>>>> these emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does
>>>>>> some computation on the number and emits one message] -> E (bolt)
>>>>>> [Aggregates all the data and confirms if all the 1000 messages are
>>>>>> processed)
>>>>>>
>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>> estimated that the end to end processing for 1000 takes not more than 50
>>>>>> msec including any latencies.
>>>>>>
>>>>>> *Observations*
>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>>>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>>>>> under 3 msec to execute including any latencies.
>>>>>> 2. I noticed that the most of the time is spent between Emit from a
>>>>>> Spout/Bolt and execute() of the consuming bolt.
>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>
>>>>>> I am not able to figure out why it takes so much time between a
>>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>>
>>>>>> *Infrastructure*
>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and there
>>>>>> are 20 workers overall.
>>>>>>
>>>>>> *Test*
>>>>>> 1. The test was done with 25 messages to the spout => 25 messages are
>>>>>> sent to spout in a span of 5 seconds.
>>>>>>
>>>>>> *Config values*
>>>>>> Config config = new Config();
>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>
>>>>>> Please let me know if you have encountered similar issues and any
>>>>>> steps you have taken to mitigate the time taken between spout/bolt and
>>>>>> another bolt.
>>>>>>
>>>>>> Thanks
>>>>>> Kashyap
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Nikolaos Romanos Katsipoulakis,
>>>> University of Pittsburgh, PhD candidate
>>>>
>>>
>>>
>>
>>
>> --
>> Nikolaos Romanos Katsipoulakis,
>> University of Pittsburgh, PhD candidate
>>
>
>

Re: Realtime computations using storm - questions on performance

Posted by Andrew Xor <an...@gmail.com>.
Direct grouping, as shown in the storm docs, means that you have to have a
specific task id and use "direct streams", which is error prone, probably
increases latency, and might introduce redundancy problems, because the
producer of a tuple needs to know the id of the task the tuple has to go to;
imagine a scenario where the receiving task fails for some reason and the
producer can't relay tuples until it learns the re-spawned task's id.

Hope this helps.

Kindly yours,

Andrew Grammenos

-- PGP PKey --
​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
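
(To make the mechanics above concrete, here is a minimal sketch of a
direct-stream producer. Component ids, stream name and field names are all
made up for illustration; this is not code from the thread.)

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;

public class DirectProducerBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> targetTasks;   // task ids of the consuming bolt

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // "consumerBolt" is a hypothetical component id; it must match the wiring below
        this.targetTasks = context.getComponentTasks("consumerBolt");
    }

    public void execute(Tuple input) {
        // The producer itself picks which task receives the tuple.
        int task = targetTasks.get(0);
        collector.emitDirect(task, "directStream", input, new Values(input.getValue(0)));
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("directStream", true, new Fields("value")); // true => direct stream
    }
}

// The consumer side has to be wired with directGrouping on that stream, e.g.:
// builder.setBolt("consumerBolt", new ConsumerBolt(), 4)
//        .directGrouping("directProducer", "directStream");
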

On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
nick.katsip@gmail.com> wrote:

> Hello again,
>
> Nathan, I am using direct-grouping because the application I am working on
> has to be able to send tuples directly to specific tasks. In general
> control the data flow. Can you please explain to me why you would not
> recommend direct grouping? Is there any particular reason in the
> architecture of Storm?
>
> Thanks,
> Nick
>
> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>
>> I would not recommend direct grouping unless you have a good reason for
>> it.  Shuffle grouping is essentially random with even distribution which
>> makes it easier to characterize its performance.  Local or shuffle grouping
>> stays in process so generally it will be faster.  However you have to be
>> careful in certain cases to avoid task starvation (e.g. you have kafka
>> spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A"
>> tasks in 10 worker processes). Direct grouping depends on your code (i.e.
>> you can create hotspots), fields grouping depends on your key distribution,
>> etc.
>>
>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>> nick.katsip@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> I have two questions:
>>>
>>> 1) How do you exactly measure latency? I am doing the same thing and I
>>> have a problem getting the exact milliseconds of latency (mainly because of
>>> clock drifting).
>>> 2) (to Nathan) Is there a difference in speeds among different
>>> groupings? For instance, is shuffle faster than direct grouping?
>>>
>>> Thanks,
>>> Nick
>>>
>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>
>>>> Two things. Your math may be off depending on parallelism. One emit
>>>> from A becomes 100 emitted from C, and you are joining all of them.
>>>>
>>>> Second, try the default number of ackers (one per worker). All your ack
>>>> traffic is going to a single task.
>>>>
>>>> Also you can try local or shuffle grouping if possible to reduce
>>>> network transfers.
>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> We are attempting a real-time distributed computing using storm and
>>>>> the solution has only one problem - inter bolt latency on same
>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>> figure out why. Network latency is under 0.5 ms. By latency, I mean
>>>>> the time between an emit of one bolt/spout to getting the message in
>>>>> execute() of next bolt.
>>>>>
>>>>> I have a topology like the below -
>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this
>>>>> number and divides this into 10 emits of 100 each) -> C (bolt) [Recieves
>>>>> these emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does
>>>>> some computation on the number and emits one message] -> E (bolt)
>>>>> [Aggregates all the data and confirms if all the 1000 messages are
>>>>> processed)
>>>>>
>>>>> Every bolt takes under 3 msec to complete and as a result, I estimated
>>>>> that the end to end processing for 1000 takes not more than 50 msec
>>>>> including any latencies.
>>>>>
>>>>> *Observations*
>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>>>> under 3 msec to execute including any latencies.
>>>>> 2. I noticed that the most of the time is spent between Emit from a
>>>>> Spout/Bolt and execute() of the consuming bolt.
>>>>> 3. Network latency is under 0.5 msec.
>>>>>
>>>>> I am not able to figure out why it takes so much time between a
>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>
>>>>> *Infrastructure*
>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and there
>>>>> are 20 workers overall.
>>>>>
>>>>> *Test*
>>>>> 1. The test was done with 25 messages to the spout => 25 messages are
>>>>> sent to spout in a span of 5 seconds.
>>>>>
>>>>> *Config values*
>>>>> Config config = new Config();
>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>
>>>>> Please let me know if you have encountered similar issues and any
>>>>> steps you have taken to mitigate the time taken between spout/bolt and
>>>>> another bolt.
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>
>>>
>>>
>>> --
>>> Nikolaos Romanos Katsipoulakis,
>>> University of Pittsburgh, PhD candidate
>>>
>>
>>
>
>
> --
> Nikolaos Romanos Katsipoulakis,
> University of Pittsburgh, PhD candidate
>

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
If you know you need it, that's one thing.  If you're not sure, then you
probably don't need it.

Specifically, you need to be able to reason about the tasks in the system.
Consider the case where you change the parallelism of the bolt you are
emitting to.  Either you will have to be able to handle this dynamically in
your code, or you will have to change the code where you are doing the
emits.  Either way it's an extra change that you have to remember and
implement.  So it's not so much "because of storm" as it is general good
practice.
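
(One hedged illustration of "handle this dynamically": instead of hard-coding
task ids, the producer can look up whatever tasks exist at prepare() time and
spread its direct emits over them, so a parallelism change does not require a
code change. This fragment reuses the made-up "consumerBolt"/"directStream"
names from the direct-stream sketch earlier in the thread and assumes the
same class skeleton; it is only a sketch.)

private OutputCollector collector;
private List<Integer> targetTasks;
private int next = 0;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    // Query the current task list instead of assuming a fixed parallelism.
    this.targetTasks = context.getComponentTasks("consumerBolt");
}

public void execute(Tuple input) {
    int task = targetTasks.get(next++ % targetTasks.size()); // round-robin over whatever tasks exist
    collector.emitDirect(task, "directStream", input, new Values(input.getValue(0)));
    collector.ack(input);
}
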

On Thu, Jul 16, 2015 at 4:24 PM, Nick R. Katsipoulakis <
nick.katsip@gmail.com> wrote:

> Hello again,
>
> Nathan, I am using direct-grouping because the application I am working on
> has to be able to send tuples directly to specific tasks. In general
> control the data flow. Can you please explain to me why you would not
> recommend direct grouping? Is there any particular reason in the
> architecture of Storm?
>
> Thanks,
> Nick
>
> 2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>
>> I would not recommend direct grouping unless you have a good reason for
>> it.  Shuffle grouping is essentially random with even distribution which
>> makes it easier to characterize its performance.  Local or shuffle grouping
>> stays in process so generally it will be faster.  However you have to be
>> careful in certain cases to avoid task starvation (e.g. you have kafka
>> spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A"
>> tasks in 10 worker processes). Direct grouping depends on your code (i.e.
>> you can create hotspots), fields grouping depends on your key distribution,
>> etc.
>>
>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>> nick.katsip@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> I have two questions:
>>>
>>> 1) How do you exactly measure latency? I am doing the same thing and I
>>> have a problem getting the exact milliseconds of latency (mainly because of
>>> clock drifting).
>>> 2) (to Nathan) Is there a difference in speeds among different
>>> groupings? For instance, is shuffle faster than direct grouping?
>>>
>>> Thanks,
>>> Nick
>>>
>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>>
>>>> Two things. Your math may be off depending on parallelism. One emit
>>>> from A becomes 100 emitted from C, and you are joining all of them.
>>>>
>>>> Second, try the default number of ackers (one per worker). All your ack
>>>> traffic is going to a single task.
>>>>
>>>> Also you can try local or shuffle grouping if possible to reduce
>>>> network transfers.
>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> We are attempting a real-time distributed computing using storm and
>>>>> the solution has only one problem - inter bolt latency on same
>>>>> machine or across machines ranges between 2 - 250 ms. I am not able to
>>>>> figure out why. Network latency is under 0.5 ms. By latency, I mean
>>>>> the time between an emit of one bolt/spout to getting the message in
>>>>> execute() of next bolt.
>>>>>
>>>>> I have a topology like the below -
>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this
>>>>> number and divides this into 10 emits of 100 each) -> C (bolt) [Recieves
>>>>> these emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does
>>>>> some computation on the number and emits one message] -> E (bolt)
>>>>> [Aggregates all the data and confirms if all the 1000 messages are
>>>>> processed)
>>>>>
>>>>> Every bolt takes under 3 msec to complete and as a result, I estimated
>>>>> that the end to end processing for 1000 takes not more than 50 msec
>>>>> including any latencies.
>>>>>
>>>>> *Observations*
>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>>>> under 3 msec to execute including any latencies.
>>>>> 2. I noticed that the most of the time is spent between Emit from a
>>>>> Spout/Bolt and execute() of the consuming bolt.
>>>>> 3. Network latency is under 0.5 msec.
>>>>>
>>>>> I am not able to figure out why it takes so much time between a
>>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>>> into a queue and then the subsequent bolt consumes from there.
>>>>>
>>>>> *Infrastructure*
>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and there
>>>>> are 20 workers overall.
>>>>>
>>>>> *Test*
>>>>> 1. The test was done with 25 messages to the spout => 25 messages are
>>>>> sent to spout in a span of 5 seconds.
>>>>>
>>>>> *Config values*
>>>>> Config config = new Config();
>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>
>>>>> Please let me know if you have encountered similar issues and any
>>>>> steps you have taken to mitigate the time taken between spout/bolt and
>>>>> another bolt.
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>
>>>
>>>
>>> --
>>> Nikolaos Romanos Katsipoulakis,
>>> University of Pittsburgh, PhD candidate
>>>
>>
>>
>
>
> --
> Nikolaos Romanos Katsipoulakis,
> University of Pittsburgh, PhD candidate
>

Re: Realtime computations using storm - questions on performance

Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Hello again,

Nathan, I am using direct-grouping because the application I am working on
has to be able to send tuples directly to specific tasks and, in general,
control the data flow. Can you please explain to me why you would not
recommend direct grouping? Is there any particular reason in the
architecture of Storm?

Thanks,
Nick

2015-07-16 16:20 GMT-04:00 Nathan Leung <nc...@gmail.com>:

> I would not recommend direct grouping unless you have a good reason for
> it.  Shuffle grouping is essentially random with even distribution which
> makes it easier to characterize its performance.  Local or shuffle grouping
> stays in process so generally it will be faster.  However you have to be
> careful in certain cases to avoid task starvation (e.g. you have kafka
> spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A"
> tasks in 10 worker processes). Direct grouping depends on your code (i.e.
> you can create hotspots), fields grouping depends on your key distribution,
> etc.
>
> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
> nick.katsip@gmail.com> wrote:
>
>> Hello all,
>>
>> I have two questions:
>>
>> 1) How do you exactly measure latency? I am doing the same thing and I
>> have a problem getting the exact milliseconds of latency (mainly because of
>> clock drifting).
>> 2) (to Nathan) Is there a difference in speeds among different groupings?
>> For instance, is shuffle faster than direct grouping?
>>
>> Thanks,
>> Nick
>>
>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>>
>>> Two things. Your math may be off depending on parallelism. One emit from
>>> A becomes 100 emitted from C, and you are joining all of them.
>>>
>>> Second, try the default number of ackers (one per worker). All your ack
>>> traffic is going to a single task.
>>>
>>> Also you can try local or shuffle grouping if possible to reduce network
>>> transfers.
>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> We are attempting a real-time distributed computing using storm and
>>>> the solution has only one problem - inter bolt latency on same machine
>>>> or across machines ranges between 2 - 250 ms. I am not able to figure out
>>>> why. Network latency is under 0.5 ms. By latency, I mean the time
>>>> between an emit of one bolt/spout to getting the message in execute() of
>>>> next bolt.
>>>>
>>>> I have a topology like the below -
>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this number
>>>> and divides this into 10 emits of 100 each) -> C (bolt) [Recieves these
>>>> emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does some
>>>> computation on the number and emits one message] -> E (bolt) [Aggregates
>>>> all the data and confirms if all the 1000 messages are processed)
>>>>
>>>> Every bolt takes under 3 msec to complete and as a result, I estimated
>>>> that the end to end processing for 1000 takes not more than 50 msec
>>>> including any latencies.
>>>>
>>>> *Observations*
>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>>> under 3 msec to execute including any latencies.
>>>> 2. I noticed that the most of the time is spent between Emit from a
>>>> Spout/Bolt and execute() of the consuming bolt.
>>>> 3. Network latency is under 0.5 msec.
>>>>
>>>> I am not able to figure out why it takes so much time between a
>>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>>> into a queue and then the subsequent bolt consumes from there.
>>>>
>>>> *Infrastructure*
>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and there
>>>> are 20 workers overall.
>>>>
>>>> *Test*
>>>> 1. The test was done with 25 messages to the spout => 25 messages are
>>>> sent to spout in a span of 5 seconds.
>>>>
>>>> *Config values*
>>>> Config config = new Config();
>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>
>>>> Please let me know if you have encountered similar issues and any steps
>>>> you have taken to mitigate the time taken between spout/bolt and another
>>>> bolt.
>>>>
>>>> Thanks
>>>> Kashyap
>>>>
>>>
>>
>>
>> --
>> Nikolaos Romanos Katsipoulakis,
>> University of Pittsburgh, PhD candidate
>>
>
>


-- 
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
I would not recommend direct grouping unless you have a good reason for
it.  Shuffle grouping is essentially random with even distribution which
makes it easier to characterize its performance.  Local or shuffle grouping
stays in process so generally it will be faster.  However you have to be
careful in certain cases to avoid task starvation (e.g. you have kafka
spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A"
tasks in 10 worker processes). Direct grouping depends on your code (i.e.
you can create hotspots), fields grouping depends on your key distribution,
etc.
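
(For reference, a sketch of how those groupings are declared when building a
topology - component names, classes and parallelism numbers are placeholders,
not a recommendation:)

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new MyKafkaSpout(), 1);   // hypothetical spout class, 1 task

// shuffle: random, evenly distributed across all bolt "A" tasks
builder.setBolt("boltA", new BoltA(), 10).shuffleGrouping("kafkaSpout");

// localOrShuffle: prefers tasks in the same worker (fewer network hops), but with
// 1 spout task and 10 workers most "A" tasks would starve, as described above
// builder.setBolt("boltA", new BoltA(), 10).localOrShuffleGrouping("kafkaSpout");

// fields: routing quality depends on the key distribution
builder.setBolt("boltB", new BoltB(), 5).fieldsGrouping("boltA", new Fields("key"));
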

On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
nick.katsip@gmail.com> wrote:

> Hello all,
>
> I have two questions:
>
> 1) How do you exactly measure latency? I am doing the same thing and I
> have a problem getting the exact milliseconds of latency (mainly because of
> clock drifting).
> 2) (to Nathan) Is there a difference in speeds among different groupings?
> For instance, is shuffle faster than direct grouping?
>
> Thanks,
> Nick
>
> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>
>> Two things. Your math may be off depending on parallelism. One emit from
>> A becomes 100 emitted from C, and you are joining all of them.
>>
>> Second, try the default number of ackers (one per worker). All your ack
>> traffic is going to a single task.
>>
>> Also you can try local or shuffle grouping if possible to reduce network
>> transfers.
>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> We are attempting a real-time distributed computing using storm and the
>>> solution has only one problem - inter bolt latency on same machine or
>>> across machines ranges between 2 - 250 ms. I am not able to figure out why.
>>> Network latency is under 0.5 ms. By latency, I mean the time between an
>>> emit of one bolt/spout to getting the message in execute() of next bolt.
>>>
>>> I have a topology like the below -
>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this number
>>> and divides this into 10 emits of 100 each) -> C (bolt) [Recieves these
>>> emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does some
>>> computation on the number and emits one message] -> E (bolt) [Aggregates
>>> all the data and confirms if all the 1000 messages are processed)
>>>
>>> Every bolt takes under 3 msec to complete and as a result, I estimated
>>> that the end to end processing for 1000 takes not more than 50 msec
>>> including any latencies.
>>>
>>> *Observations*
>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>> under 3 msec to execute including any latencies.
>>> 2. I noticed that the most of the time is spent between Emit from a
>>> Spout/Bolt and execute() of the consuming bolt.
>>> 3. Network latency is under 0.5 msec.
>>>
>>> I am not able to figure out why it takes so much time between a
>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>> into a queue and then the subsequent bolt consumes from there.
>>>
>>> *Infrastructure*
>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and there are
>>> 20 workers overall.
>>>
>>> *Test*
>>> 1. The test was done with 25 messages to the spout => 25 messages are
>>> sent to spout in a span of 5 seconds.
>>>
>>> *Config values*
>>> Config config = new Config();
>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>
>>> Please let me know if you have encountered similar issues and any steps
>>> you have taken to mitigate the time taken between spout/bolt and another
>>> bolt.
>>>
>>> Thanks
>>> Kashyap
>>>
>>
>
>
> --
> Nikolaos Romanos Katsipoulakis,
> University of Pittsburgh, PhD candidate
>

Re: Realtime computations using storm - questions on performance

Posted by Andrew Xor <an...@gmail.com>.
Hi,

 Latency depends mainly on two things: 1) the network transfers involved and
2) the actual processing performed on each tuple. One acker per worker
certainly helps, but you have to *reduce* network transfers as much as
possible. Also, there is a huge difference between different machines and
configurations; for instance, virtualized setups (at least on my end)
usually perform considerably better (network-wise) than separate physical
nodes. For example, in my soft-cluster (virtualized) I could process batches
of around 1 MB in under 200 ms across two nodes (from Spout -> Bolt A).

You will also have to take into account latency, protocol overhead and any
other network transfers that might hinder performance. Truth be told,
I've noticed some deviation in processing times, but nothing much (it might
be due to GC phases as well).

Hope this helps.

Kindly yours,

Andrew Grammenos

-- PGP PKey --
​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
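
(If GC phases do turn out to matter, one knob worth knowing about is the
per-topology worker childopts; the heap size below is an arbitrary example,
not a sizing recommendation:)

import backtype.storm.Config;

Config config = new Config();
// Raise the worker JVM heap so large volumes of in-flight tuples don't thrash GC.
config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2048m");
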

On Thu, Jul 16, 2015 at 10:50 PM, Nick R. Katsipoulakis <
nick.katsip@gmail.com> wrote:

> Hello all,
>
> I have two questions:
>
> 1) How do you exactly measure latency? I am doing the same thing and I
> have a problem getting the exact milliseconds of latency (mainly because of
> clock drifting).
> 2) (to Nathan) Is there a difference in speeds among different groupings?
> For instance, is shuffle faster than direct grouping?
>
> Thanks,
> Nick
>
> 2015-07-15 17:37 GMT-04:00 Nathan Leung <nc...@gmail.com>:
>
>> Two things. Your math may be off depending on parallelism. One emit from
>> A becomes 100 emitted from C, and you are joining all of them.
>>
>> Second, try the default number of ackers (one per worker). All your ack
>> traffic is going to a single task.
>>
>> Also you can try local or shuffle grouping if possible to reduce network
>> transfers.
>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <ka...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> We are attempting a real-time distributed computing using storm and the
>>> solution has only one problem - inter bolt latency on same machine or
>>> across machines ranges between 2 - 250 ms. I am not able to figure out why.
>>> Network latency is under 0.5 ms. By latency, I mean the time between an
>>> emit of one bolt/spout to getting the message in execute() of next bolt.
>>>
>>> I have a topology like the below -
>>> A (Spout) ->(Emits a number say 1000) -> B (bolt) [Receives this number
>>> and divides this into 10 emits of 100 each) -> C (bolt) [Recieves these
>>> emits and divides this to 10 emits of 10 numbers) -> D (bolt) [Does some
>>> computation on the number and emits one message] -> E (bolt) [Aggregates
>>> all the data and confirms if all the 1000 messages are processed)
>>>
>>> Every bolt takes under 3 msec to complete and as a result, I estimated
>>> that the end to end processing for 1000 takes not more than 50 msec
>>> including any latencies.
>>>
>>> *Observations*
>>> 1. The end to end time from Spout A to Bolt E takes 200 msec to 3
>>> seconds. My estimate was under 50 msec given that each bolt and spout take
>>> under 3 msec to execute including any latencies.
>>> 2. I noticed that the most of the time is spent between Emit from a
>>> Spout/Bolt and execute() of the consuming bolt.
>>> 3. Network latency is under 0.5 msec.
>>>
>>> I am not able to figure out why it takes so much time between a
>>> spout/bolt to next bolt. I understand that the spout/bolt buffers the data
>>> into a queue and then the subsequent bolt consumes from there.
>>>
>>> *Infrastructure*
>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB and there are
>>> 20 workers overall.
>>>
>>> *Test*
>>> 1. The test was done with 25 messages to the spout => 25 messages are
>>> sent to spout in a span of 5 seconds.
>>>
>>> *Config values*
>>> Config config = new Config();
>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>
>>> Please let me know if you have encountered similar issues and any steps
>>> you have taken to mitigate the time taken between spout/bolt and another
>>> bolt.
>>>
>>> Thanks
>>> Kashyap
>>>
>>
>
>
> --
> Nikolaos Romanos Katsipoulakis,
> University of Pittsburgh, PhD candidate
>

Re: Realtime computations using storm - questions on performance

Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Hello all,

I have two questions:

1) How exactly do you measure latency? I am doing the same thing and I have
a problem getting the exact milliseconds of latency (mainly because of
clock drift).
2) (to Nathan) Is there a difference in speeds among different groupings?
For instance, is shuffle faster than direct grouping?
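
For reference, a minimal sketch of what I mean by measuring latency: the
class, field, and component names below are made up for illustration, and
the difference is only trustworthy when the emitting and receiving tasks
run on the same host or the hosts' clocks are synchronized (e.g. via NTP);
otherwise it includes the drift I mentioned.

// Upstream spout/bolt stamps each tuple with its emit time, e.g.
//   collector.emit(new Values(payload, System.currentTimeMillis()));
// and declares the fields ("payload", "emittedAt").

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class LatencyProbeBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        long emittedAt = tuple.getLongByField("emittedAt");
        long latencyMs = System.currentTimeMillis() - emittedAt;
        // Covers serialization, queueing and transfer time between emit()
        // and execute(), plus any clock drift across hosts.
        System.out.println("emit -> execute latency ~ " + latencyMs + " ms");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // probe only; nothing emitted downstream
    }
}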

Thanks,
Nick

-- 
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate

Re: Realtime computations using storm - questions on performance

Posted by Nathan Leung <nc...@gmail.com>.
Two things. First, your math may be off depending on parallelism: one emit
from A becomes 100 tuples emitted from C (1 into B, 10 out of B, and 10 x 10
= 100 out of C), and you are joining all of them at E.

Second, try the default number of ackers (one per worker). All your ack
traffic is going to a single task.
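
For example, a rough sketch against a 20-worker topology like yours
(setNumWorkers/setNumAckers are the convenience setters on
backtype.storm.Config; adjust the counts to your own setup):

Config config = new Config();
config.setNumWorkers(20);
// One acker executor per worker instead of funnelling every ack through a
// single task. Leaving TOPOLOGY_ACKER_EXECUTORS unset has the same effect,
// since Storm defaults it to the number of workers.
config.setNumAckers(20);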

Also, you can try local-or-shuffle grouping where possible to reduce network
transfers.
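
A rough sketch of what that looks like when wiring the topology (the
component names and parallelism hints below are placeholders):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers", new NumberSpout(), 1);
// localOrShuffleGrouping prefers a consumer task in the same worker
// process when one exists and falls back to shuffle otherwise, so many
// hops never cross the network at all.
builder.setBolt("splitter", new SplitterBolt(), 10)
       .localOrShuffleGrouping("numbers");
builder.setBolt("compute", new ComputeBolt(), 20)
       .localOrShuffleGrouping("splitter");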