You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by John Yost <so...@gmail.com> on 2015/08/14 18:31:39 UTC

200 bolts to 5 bolts--decreases throughput

Hi Everyone,

I have a topology where a highly CPU-intensive bolt (Bolt A) requires a
much higher degree of parallelism than the bolt it emits tuples to (Bolt B)
(200 Bolt A executors vs <= 100 Bolt B executors).

I find that the throughput, as measured in number of tuples acked, goes
from 7 million/minute to ~ 1 million/minute when I wire in Bolt B--even if
all of the logic within the Bolt B execute method is disabled and the Bolt
B is therefore simply acking the input tuples from Bolt A. In addition, I
find that, going from 50 to 100 Bolt B executors causes the throughput to
go from 900K/minute to ~ 1.1 million/minute.

Is the fact that I am going from 200 bolt instances to 100 or less the
problem?   I've already experimented with executor.send.buffer.size and
executor.receive.buffer.size, which helped drive throughput from 800K to
900K. I will try topology.transfer.buffer.size, perhaps set that higher to
2048. Any other ideas?

Thanks

--John

Re: 200 bolts to 5 bolts--decreases throughput

Posted by so...@gmail.com.
Great thoughts and info--thanks Kishore!

--John

Sent from my iPhone

> On Aug 15, 2015, at 2:42 PM, Kishore Senji <ks...@gmail.com> wrote:
> 
> As long as the number of instances of B is a multiple of the number of instances of A and the number of instances of A being a multiple of number of Workers, we will get a nice even distribution with Storm.
> 
> Yes please choose local or shuffle grouping on the path where there is more data. For example, if Bolt A emits two tuples for every tuple it receives to Bolt B, it makes sense in that case to have that path as the local or shuffle grouping. But in there is a case where Bolt A only emits 10% of the time to Bolt B, then it makes sense to have the upstream to Bolt A in local or shuffling mode.
> 
> 
>> On Sat, Aug 15, 2015 at 4:54 AM, John Yost <so...@gmail.com> wrote:
>> Actually, I only need to match the number of Bolt B executors to the number of workers to ensure local shuffling in the Bolt A to Bolt B step, correct?  I am hoping that Storm would put one Bolt B executor in each Java process.  Is there something special I need to configure to make that happen?
>> 
>> --John
>> 
>>> On Sat, Aug 15, 2015 at 7:27 AM, John Yost <so...@gmail.com> wrote:
>>> Hi Kishore,
>>> 
>>> This is an excellent response--thanks for taking time to write back!  This is great analysis of the problem space. I/O overhead--disk plus network--is a big problem for us.
>>> 
>>> The suggestion to use Local grouping is an excellent one, but I need to use the Fields grouping at some point--either from the KafkaSpout to Bolt A, or Bolt A to Bolt B. The reason I need to do this is we're caching Key/Value pairs and then persisting each collection to a SequenceFile once each cache collection reaches a certain size. 
>>> 
>>> I currently have fields grouping for Bolt A to Bolt B, and would like to maintain this approach as the KafkaSpout has 10 partitions as input, any one of which can develop hotspots. Consequently, I am using the Shuffle grouping for the KafkaSpout to Bolt A step to evenly distribute the tuples coming from Kafka to Bolt A. I then use Fields grouping to ensure the same Key/Value pairs go tot the same Bolt B instance. However, as you point out, the network overhead is really throttling throughput, so it's worth a shot to do the fields grouping at the KafkaSpout-Bolt A step and then do a Local grouping at the Bolt A-Bolt B step.
>>> 
>>> Since I have one worker per node as I mentioned in a response to Javier's earlier post, I will need to up the number of Bolt B instances to match Bolt A to ensure Local shuffling for all tuples. Not totally sure how much adding 300 additional executors for 120 workers will negatively impact performance, but I am guessing that it would be more than offset by the greatly decreased network and disk I/O, since Bolt A-Bolt B will be all intra-Java process messaging via IMAX.
>>> 
>>> Thanks again for your excellent thoughts!
>>> 
>>> --John
>>> 
>>>> On Fri, Aug 14, 2015 at 9:48 PM, Kishore Senji <ks...@gmail.com> wrote:
>>>> 7 million/minute with 200 instances, implies a latency of 200*1000/(7*1000^2/60) ~ 1.71 ms for bolt A. For throughput calculations, I would normalize it and visualize a single instance of the bolt running. So for 1 both instance, the latency would be 1.71/200. 
>>>> 
>>>> By adding bolt B with 50 instances, the throughput came to 900000/min, which means
>>>> 
>>>> 900000= 1000*60 / ((1.71/200)+(B/50)), solving this B would be around 2.9 ms. Even though you mentioned there is nothing in the Bolt B impl, the overall latency is 2.9ms, this means there is a lot of overhead in the network. Are you using fields grouping or because of the number of instances the tuples are sent over the network. Try using local grouping.
>>>> 
>>>> Bumping bolt B to 100 instances, you got 1.1 million/minute. But this equation should give us: 1000*60 / ((1.71/200)+(2.9/100)) around 1.6 million/min. This means adding more instances added more overhead, could be that more number of tuples are going over the network (because they went in to different worker processes). Calculating the bolt b latency for 1.1 million gives = 4.6 ms of overhead for Bolt B.
>>>> 
>>>> Adding more instances can be costly because of the IPC. The key would be to get more local shuffling. Check if you have any failures in the overall system and also check the Storm UI which will show the bottleneck in your topology (the capacity of the bolts and the latencies etc)
>>>> 
>>>> 
>>>>> On Fri, Aug 14, 2015 at 2:43 PM Javier Gonzalez <ja...@gmail.com> wrote:
>>>>> Do  you actually have 170 machines? Try sticking to one worker per machine (tweak memory parameters in storm.yaml), makes inter bolt traffic much faster.
>>>>> 
>>>>>> On Aug 14, 2015 5:28 PM, "John Yost" <so...@gmail.com> wrote:
>>>>>> Hey Javier,
>>>>>> 
>>>>>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5 Bolt B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is optimal, but cluster resources make it tricky to actually launch this).
>>>>>> 
>>>>>> I will up the number of Ackers and see if that helps. If not, then I will try to vary the number of B bolts beyond 100.
>>>>>> 
>>>>>> Thanks Again!
>>>>>> 
>>>>>> --John
>>>>>> 
>>>>>>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <ja...@gmail.com> wrote:
>>>>>>> You will have a detrimental effect to wiring in boltB, even if it does nothing but ack. Every tuple you have processed from A has to travel to a B bolt, and the ack has to travel back.
>>>>>>> 
>>>>>>> You could try modifying the number of ackers, and playing with the number of A and B bolts. How many workers do you have for the topology?
>>>>>>> 
>>>>>>> Regards,
>>>>>>> JG
>>>>>>> 
>>>>>>>> On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com> wrote:
>>>>>>>> Hi Everyone,
>>>>>>>> 
>>>>>>>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires a much higher degree of parallelism than the bolt it emits tuples to (Bolt B) (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>>>>> 
>>>>>>>> I find that the throughput, as measured in number of tuples acked, goes from 7 million/minute to ~ 1 million/minute when I wire in Bolt B--even if all of the logic within the Bolt B execute method is disabled and the Bolt B is therefore simply acking the input tuples from Bolt A. In addition, I find that, going from 50 to 100 Bolt B executors causes the throughput to go from 900K/minute to ~ 1.1 million/minute.  
>>>>>>>> 
>>>>>>>> Is the fact that I am going from 200 bolt instances to 100 or less the problem?   I've already experimented with executor.send.buffer.size and executor.receive.buffer.size, which helped drive throughput from 800K to 900K. I will try topology.transfer.buffer.size, perhaps set that higher to 2048. Any other ideas?  
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> 
>>>>>>>> --John
> 

Re: 200 bolts to 5 bolts--decreases throughput

Posted by Kishore Senji <ks...@gmail.com>.
As long as the number of instances of B is a multiple of the number of
instances of A and the number of instances of A being a multiple of number
of Workers, we will get a nice even distribution with Storm.

Yes please choose local or shuffle grouping on the path where there is more
data. For example, if Bolt A emits two tuples for every tuple it receives
to Bolt B, it makes sense in that case to have that path as the local or
shuffle grouping. But in there is a case where Bolt A only emits 10% of the
time to Bolt B, then it makes sense to have the upstream to Bolt A in local
or shuffling mode.


On Sat, Aug 15, 2015 at 4:54 AM, John Yost <so...@gmail.com>
wrote:

> Actually, I only need to match the number of Bolt B executors to the
> number of workers to ensure local shuffling in the Bolt A to Bolt B step,
> correct?  I am hoping that Storm would put one Bolt B executor in each Java
> process.  Is there something special I need to configure to make that
> happen?
>
> --John
>
> On Sat, Aug 15, 2015 at 7:27 AM, John Yost <so...@gmail.com>
> wrote:
>
>> Hi Kishore,
>>
>> This is an excellent response--thanks for taking time to write back!
>> This is great analysis of the problem space. I/O overhead--disk plus
>> network--is a big problem for us.
>>
>> The suggestion to use Local grouping is an excellent one, but I need to
>> use the Fields grouping at some point--either from the KafkaSpout to Bolt
>> A, or Bolt A to Bolt B. The reason I need to do this is we're caching
>> Key/Value pairs and then persisting each collection to a SequenceFile once
>> each cache collection reaches a certain size.
>>
>> I currently have fields grouping for Bolt A to Bolt B, and would like to
>> maintain this approach as the KafkaSpout has 10 partitions as input, any
>> one of which can develop hotspots. Consequently, I am using the Shuffle
>> grouping for the KafkaSpout to Bolt A step to evenly distribute the tuples
>> coming from Kafka to Bolt A. I then use Fields grouping to ensure the same
>> Key/Value pairs go tot the same Bolt B instance. However, as you point out,
>> the network overhead is really throttling throughput, so it's worth a shot
>> to do the fields grouping at the KafkaSpout-Bolt A step and then do a Local
>> grouping at the Bolt A-Bolt B step.
>>
>> Since I have one worker per node as I mentioned in a response to Javier's
>> earlier post, I will need to up the number of Bolt B instances to match
>> Bolt A to ensure Local shuffling for all tuples. Not totally sure how much
>> adding 300 additional executors for 120 workers will negatively impact
>> performance, but I am guessing that it would be more than offset by the
>> greatly decreased network and disk I/O, since Bolt A-Bolt B will be all
>> intra-Java process messaging via IMAX.
>>
>> Thanks again for your excellent thoughts!
>>
>> --John
>>
>> On Fri, Aug 14, 2015 at 9:48 PM, Kishore Senji <ks...@gmail.com> wrote:
>>
>>> 7 million/minute with 200 instances, implies a latency
>>> of 200*1000/(7*1000^2/60) ~ 1.71 ms for bolt A. For throughput
>>> calculations, I would normalize it and visualize a single instance of the
>>> bolt running. So for 1 both instance, the latency would be 1.71/200.
>>>
>>> By adding bolt B with 50 instances, the throughput came to 900000/min,
>>> which means
>>>
>>> 900000= 1000*60 / ((1.71/200)+(B/50)), solving this B would be around
>>> 2.9 ms. Even though you mentioned there is nothing in the Bolt B impl, the
>>> overall latency is 2.9ms, this means there is a lot of overhead in the
>>> network. Are you using fields grouping or because of the number of
>>> instances the tuples are sent over the network. Try using local grouping.
>>>
>>> Bumping bolt B to 100 instances, you got 1.1 million/minute. But this
>>> equation should give us: 1000*60 / ((1.71/200)+(2.9/100)) around 1.6
>>> million/min. This means adding more instances added more overhead, could be
>>> that more number of tuples are going over the network (because they went in
>>> to different worker processes). Calculating the bolt b latency for 1.1
>>> million gives = 4.6 ms of overhead for Bolt B.
>>>
>>> Adding more instances can be costly because of the IPC. The key would be
>>> to get more local shuffling. Check if you have any failures in the overall
>>> system and also check the Storm UI which will show the bottleneck in your
>>> topology (the capacity of the bolts and the latencies etc)
>>>
>>>
>>> On Fri, Aug 14, 2015 at 2:43 PM Javier Gonzalez <ja...@gmail.com>
>>> wrote:
>>>
>>>> Do  you actually have 170 machines? Try sticking to one worker per
>>>> machine (tweak memory parameters in storm.yaml), makes inter bolt traffic
>>>> much faster.
>>>> On Aug 14, 2015 5:28 PM, "John Yost" <so...@gmail.com> wrote:
>>>>
>>>>> Hey Javier,
>>>>>
>>>>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5
>>>>> Bolt B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is
>>>>> optimal, but cluster resources make it tricky to actually launch this).
>>>>>
>>>>> I will up the number of Ackers and see if that helps. If not, then I
>>>>> will try to vary the number of B bolts beyond 100.
>>>>>
>>>>> Thanks Again!
>>>>>
>>>>> --John
>>>>>
>>>>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <ja...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> You will have a detrimental effect to wiring in boltB, even if it
>>>>>> does nothing but ack. Every tuple you have processed from A has to travel
>>>>>> to a B bolt, and the ack has to travel back.
>>>>>>
>>>>>> You could try modifying the number of ackers, and playing with the
>>>>>> number of A and B bolts. How many workers do you have for the topology?
>>>>>>
>>>>>> Regards,
>>>>>> JG
>>>>>> On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Everyone,
>>>>>>>
>>>>>>> I have a topology where a highly CPU-intensive bolt (Bolt A)
>>>>>>> requires a much higher degree of parallelism than the bolt it emits tuples
>>>>>>> to (Bolt B) (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>>>>
>>>>>>> I find that the throughput, as measured in number of tuples acked,
>>>>>>> goes from 7 million/minute to ~ 1 million/minute when I wire in Bolt
>>>>>>> B--even if all of the logic within the Bolt B execute method is disabled
>>>>>>> and the Bolt B is therefore simply acking the input tuples from Bolt A. In
>>>>>>> addition, I find that, going from 50 to 100 Bolt B executors causes the
>>>>>>> throughput to go from 900K/minute to ~ 1.1 million/minute.
>>>>>>>
>>>>>>> Is the fact that I am going from 200 bolt instances to 100 or less
>>>>>>> the problem?   I've already experimented with executor.send.buffer.size and
>>>>>>> executor.receive.buffer.size, which helped drive throughput from 800K to
>>>>>>> 900K. I will try topology.transfer.buffer.size, perhaps set that higher to
>>>>>>> 2048. Any other ideas?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> --John
>>>>>>>
>>>>>>>
>>>>>
>>
>

Re: 200 bolts to 5 bolts--decreases throughput

Posted by John Yost <so...@gmail.com>.
Actually, I only need to match the number of Bolt B executors to the number
of workers to ensure local shuffling in the Bolt A to Bolt B step,
correct?  I am hoping that Storm would put one Bolt B executor in each Java
process.  Is there something special I need to configure to make that
happen?

--John

On Sat, Aug 15, 2015 at 7:27 AM, John Yost <so...@gmail.com>
wrote:

> Hi Kishore,
>
> This is an excellent response--thanks for taking time to write back!  This
> is great analysis of the problem space. I/O overhead--disk plus network--is
> a big problem for us.
>
> The suggestion to use Local grouping is an excellent one, but I need to
> use the Fields grouping at some point--either from the KafkaSpout to Bolt
> A, or Bolt A to Bolt B. The reason I need to do this is we're caching
> Key/Value pairs and then persisting each collection to a SequenceFile once
> each cache collection reaches a certain size.
>
> I currently have fields grouping for Bolt A to Bolt B, and would like to
> maintain this approach as the KafkaSpout has 10 partitions as input, any
> one of which can develop hotspots. Consequently, I am using the Shuffle
> grouping for the KafkaSpout to Bolt A step to evenly distribute the tuples
> coming from Kafka to Bolt A. I then use Fields grouping to ensure the same
> Key/Value pairs go tot the same Bolt B instance. However, as you point out,
> the network overhead is really throttling throughput, so it's worth a shot
> to do the fields grouping at the KafkaSpout-Bolt A step and then do a Local
> grouping at the Bolt A-Bolt B step.
>
> Since I have one worker per node as I mentioned in a response to Javier's
> earlier post, I will need to up the number of Bolt B instances to match
> Bolt A to ensure Local shuffling for all tuples. Not totally sure how much
> adding 300 additional executors for 120 workers will negatively impact
> performance, but I am guessing that it would be more than offset by the
> greatly decreased network and disk I/O, since Bolt A-Bolt B will be all
> intra-Java process messaging via IMAX.
>
> Thanks again for your excellent thoughts!
>
> --John
>
> On Fri, Aug 14, 2015 at 9:48 PM, Kishore Senji <ks...@gmail.com> wrote:
>
>> 7 million/minute with 200 instances, implies a latency
>> of 200*1000/(7*1000^2/60) ~ 1.71 ms for bolt A. For throughput
>> calculations, I would normalize it and visualize a single instance of the
>> bolt running. So for 1 both instance, the latency would be 1.71/200.
>>
>> By adding bolt B with 50 instances, the throughput came to 900000/min,
>> which means
>>
>> 900000= 1000*60 / ((1.71/200)+(B/50)), solving this B would be around 2.9
>> ms. Even though you mentioned there is nothing in the Bolt B impl, the
>> overall latency is 2.9ms, this means there is a lot of overhead in the
>> network. Are you using fields grouping or because of the number of
>> instances the tuples are sent over the network. Try using local grouping.
>>
>> Bumping bolt B to 100 instances, you got 1.1 million/minute. But this
>> equation should give us: 1000*60 / ((1.71/200)+(2.9/100)) around 1.6
>> million/min. This means adding more instances added more overhead, could be
>> that more number of tuples are going over the network (because they went in
>> to different worker processes). Calculating the bolt b latency for 1.1
>> million gives = 4.6 ms of overhead for Bolt B.
>>
>> Adding more instances can be costly because of the IPC. The key would be
>> to get more local shuffling. Check if you have any failures in the overall
>> system and also check the Storm UI which will show the bottleneck in your
>> topology (the capacity of the bolts and the latencies etc)
>>
>>
>> On Fri, Aug 14, 2015 at 2:43 PM Javier Gonzalez <ja...@gmail.com>
>> wrote:
>>
>>> Do  you actually have 170 machines? Try sticking to one worker per
>>> machine (tweak memory parameters in storm.yaml), makes inter bolt traffic
>>> much faster.
>>> On Aug 14, 2015 5:28 PM, "John Yost" <so...@gmail.com> wrote:
>>>
>>>> Hey Javier,
>>>>
>>>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5
>>>> Bolt B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is
>>>> optimal, but cluster resources make it tricky to actually launch this).
>>>>
>>>> I will up the number of Ackers and see if that helps. If not, then I
>>>> will try to vary the number of B bolts beyond 100.
>>>>
>>>> Thanks Again!
>>>>
>>>> --John
>>>>
>>>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <ja...@gmail.com>
>>>> wrote:
>>>>
>>>>> You will have a detrimental effect to wiring in boltB, even if it does
>>>>> nothing but ack. Every tuple you have processed from A has to travel to a B
>>>>> bolt, and the ack has to travel back.
>>>>>
>>>>> You could try modifying the number of ackers, and playing with the
>>>>> number of A and B bolts. How many workers do you have for the topology?
>>>>>
>>>>> Regards,
>>>>> JG
>>>>> On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Everyone,
>>>>>>
>>>>>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires
>>>>>> a much higher degree of parallelism than the bolt it emits tuples to (Bolt
>>>>>> B) (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>>>
>>>>>> I find that the throughput, as measured in number of tuples acked,
>>>>>> goes from 7 million/minute to ~ 1 million/minute when I wire in Bolt
>>>>>> B--even if all of the logic within the Bolt B execute method is disabled
>>>>>> and the Bolt B is therefore simply acking the input tuples from Bolt A. In
>>>>>> addition, I find that, going from 50 to 100 Bolt B executors causes the
>>>>>> throughput to go from 900K/minute to ~ 1.1 million/minute.
>>>>>>
>>>>>> Is the fact that I am going from 200 bolt instances to 100 or less
>>>>>> the problem?   I've already experimented with executor.send.buffer.size and
>>>>>> executor.receive.buffer.size, which helped drive throughput from 800K to
>>>>>> 900K. I will try topology.transfer.buffer.size, perhaps set that higher to
>>>>>> 2048. Any other ideas?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> --John
>>>>>>
>>>>>>
>>>>
>

Re: 200 bolts to 5 bolts--decreases throughput

Posted by John Yost <so...@gmail.com>.
Hi Kishore,

This is an excellent response--thanks for taking time to write back!  This
is great analysis of the problem space. I/O overhead--disk plus network--is
a big problem for us.

The suggestion to use Local grouping is an excellent one, but I need to use
the Fields grouping at some point--either from the KafkaSpout to Bolt A, or
Bolt A to Bolt B. The reason I need to do this is we're caching Key/Value
pairs and then persisting each collection to a SequenceFile once each cache
collection reaches a certain size.

I currently have fields grouping for Bolt A to Bolt B, and would like to
maintain this approach as the KafkaSpout has 10 partitions as input, any
one of which can develop hotspots. Consequently, I am using the Shuffle
grouping for the KafkaSpout to Bolt A step to evenly distribute the tuples
coming from Kafka to Bolt A. I then use Fields grouping to ensure the same
Key/Value pairs go tot the same Bolt B instance. However, as you point out,
the network overhead is really throttling throughput, so it's worth a shot
to do the fields grouping at the KafkaSpout-Bolt A step and then do a Local
grouping at the Bolt A-Bolt B step.

Since I have one worker per node as I mentioned in a response to Javier's
earlier post, I will need to up the number of Bolt B instances to match
Bolt A to ensure Local shuffling for all tuples. Not totally sure how much
adding 300 additional executors for 120 workers will negatively impact
performance, but I am guessing that it would be more than offset by the
greatly decreased network and disk I/O, since Bolt A-Bolt B will be all
intra-Java process messaging via IMAX.

Thanks again for your excellent thoughts!

--John

On Fri, Aug 14, 2015 at 9:48 PM, Kishore Senji <ks...@gmail.com> wrote:

> 7 million/minute with 200 instances, implies a latency
> of 200*1000/(7*1000^2/60) ~ 1.71 ms for bolt A. For throughput
> calculations, I would normalize it and visualize a single instance of the
> bolt running. So for 1 both instance, the latency would be 1.71/200.
>
> By adding bolt B with 50 instances, the throughput came to 900000/min,
> which means
>
> 900000= 1000*60 / ((1.71/200)+(B/50)), solving this B would be around 2.9
> ms. Even though you mentioned there is nothing in the Bolt B impl, the
> overall latency is 2.9ms, this means there is a lot of overhead in the
> network. Are you using fields grouping or because of the number of
> instances the tuples are sent over the network. Try using local grouping.
>
> Bumping bolt B to 100 instances, you got 1.1 million/minute. But this
> equation should give us: 1000*60 / ((1.71/200)+(2.9/100)) around 1.6
> million/min. This means adding more instances added more overhead, could be
> that more number of tuples are going over the network (because they went in
> to different worker processes). Calculating the bolt b latency for 1.1
> million gives = 4.6 ms of overhead for Bolt B.
>
> Adding more instances can be costly because of the IPC. The key would be
> to get more local shuffling. Check if you have any failures in the overall
> system and also check the Storm UI which will show the bottleneck in your
> topology (the capacity of the bolts and the latencies etc)
>
>
> On Fri, Aug 14, 2015 at 2:43 PM Javier Gonzalez <ja...@gmail.com>
> wrote:
>
>> Do  you actually have 170 machines? Try sticking to one worker per
>> machine (tweak memory parameters in storm.yaml), makes inter bolt traffic
>> much faster.
>> On Aug 14, 2015 5:28 PM, "John Yost" <so...@gmail.com> wrote:
>>
>>> Hey Javier,
>>>
>>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5 Bolt
>>> B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is optimal,
>>> but cluster resources make it tricky to actually launch this).
>>>
>>> I will up the number of Ackers and see if that helps. If not, then I
>>> will try to vary the number of B bolts beyond 100.
>>>
>>> Thanks Again!
>>>
>>> --John
>>>
>>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <ja...@gmail.com>
>>> wrote:
>>>
>>>> You will have a detrimental effect to wiring in boltB, even if it does
>>>> nothing but ack. Every tuple you have processed from A has to travel to a B
>>>> bolt, and the ack has to travel back.
>>>>
>>>> You could try modifying the number of ackers, and playing with the
>>>> number of A and B bolts. How many workers do you have for the topology?
>>>>
>>>> Regards,
>>>> JG
>>>> On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Everyone,
>>>>>
>>>>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires
>>>>> a much higher degree of parallelism than the bolt it emits tuples to (Bolt
>>>>> B) (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>>
>>>>> I find that the throughput, as measured in number of tuples acked,
>>>>> goes from 7 million/minute to ~ 1 million/minute when I wire in Bolt
>>>>> B--even if all of the logic within the Bolt B execute method is disabled
>>>>> and the Bolt B is therefore simply acking the input tuples from Bolt A. In
>>>>> addition, I find that, going from 50 to 100 Bolt B executors causes the
>>>>> throughput to go from 900K/minute to ~ 1.1 million/minute.
>>>>>
>>>>> Is the fact that I am going from 200 bolt instances to 100 or less the
>>>>> problem?   I've already experimented with executor.send.buffer.size and
>>>>> executor.receive.buffer.size, which helped drive throughput from 800K to
>>>>> 900K. I will try topology.transfer.buffer.size, perhaps set that higher to
>>>>> 2048. Any other ideas?
>>>>>
>>>>> Thanks
>>>>>
>>>>> --John
>>>>>
>>>>>
>>>

Re: 200 bolts to 5 bolts--decreases throughput

Posted by Kishore Senji <ks...@gmail.com>.
7 million/minute with 200 instances, implies a latency
of 200*1000/(7*1000^2/60) ~ 1.71 ms for bolt A. For throughput
calculations, I would normalize it and visualize a single instance of the
bolt running. So for 1 both instance, the latency would be 1.71/200.

By adding bolt B with 50 instances, the throughput came to 900000/min,
which means

900000= 1000*60 / ((1.71/200)+(B/50)), solving this B would be around 2.9
ms. Even though you mentioned there is nothing in the Bolt B impl, the
overall latency is 2.9ms, this means there is a lot of overhead in the
network. Are you using fields grouping or because of the number of
instances the tuples are sent over the network. Try using local grouping.

Bumping bolt B to 100 instances, you got 1.1 million/minute. But this
equation should give us: 1000*60 / ((1.71/200)+(2.9/100)) around 1.6
million/min. This means adding more instances added more overhead, could be
that more number of tuples are going over the network (because they went in
to different worker processes). Calculating the bolt b latency for 1.1
million gives = 4.6 ms of overhead for Bolt B.

Adding more instances can be costly because of the IPC. The key would be to
get more local shuffling. Check if you have any failures in the overall
system and also check the Storm UI which will show the bottleneck in your
topology (the capacity of the bolts and the latencies etc)


On Fri, Aug 14, 2015 at 2:43 PM Javier Gonzalez <ja...@gmail.com> wrote:

> Do  you actually have 170 machines? Try sticking to one worker per machine
> (tweak memory parameters in storm.yaml), makes inter bolt traffic much
> faster.
> On Aug 14, 2015 5:28 PM, "John Yost" <so...@gmail.com> wrote:
>
>> Hey Javier,
>>
>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5 Bolt
>> B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is optimal,
>> but cluster resources make it tricky to actually launch this).
>>
>> I will up the number of Ackers and see if that helps. If not, then I will
>> try to vary the number of B bolts beyond 100.
>>
>> Thanks Again!
>>
>> --John
>>
>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <ja...@gmail.com>
>> wrote:
>>
>>> You will have a detrimental effect to wiring in boltB, even if it does
>>> nothing but ack. Every tuple you have processed from A has to travel to a B
>>> bolt, and the ack has to travel back.
>>>
>>> You could try modifying the number of ackers, and playing with the
>>> number of A and B bolts. How many workers do you have for the topology?
>>>
>>> Regards,
>>> JG
>>> On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com> wrote:
>>>
>>>> Hi Everyone,
>>>>
>>>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires a
>>>> much higher degree of parallelism than the bolt it emits tuples to (Bolt B)
>>>> (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>
>>>> I find that the throughput, as measured in number of tuples acked, goes
>>>> from 7 million/minute to ~ 1 million/minute when I wire in Bolt B--even if
>>>> all of the logic within the Bolt B execute method is disabled and the Bolt
>>>> B is therefore simply acking the input tuples from Bolt A. In addition, I
>>>> find that, going from 50 to 100 Bolt B executors causes the throughput to
>>>> go from 900K/minute to ~ 1.1 million/minute.
>>>>
>>>> Is the fact that I am going from 200 bolt instances to 100 or less the
>>>> problem?   I've already experimented with executor.send.buffer.size and
>>>> executor.receive.buffer.size, which helped drive throughput from 800K to
>>>> 900K. I will try topology.transfer.buffer.size, perhaps set that higher to
>>>> 2048. Any other ideas?
>>>>
>>>> Thanks
>>>>
>>>> --John
>>>>
>>>>
>>

Re: 200 bolts to 5 bolts--decreases throughput

Posted by John Yost <so...@gmail.com>.
Hey Javier,

Sorry, just to clarify, when I have 50 workers I configure the topology to
have 100 Bolt A executors and 5 Bolt B executors, and when I have 120
workers (the max I've gotten on our cluster), the topology is configured to
have 200 Bolt A executors and 100 Bolt B executors for the best
configuration(s).  Yes, we actually have 300 nodes in our Mesos cluster,
but it is a multi-tenant environment so, again, I can get 120 workers
somewhat reliably, but no more.

Definitely a great suggestion regarding getting one worker per node. We are
using a Storm on Mesos setup and I configure each worker to have 7 GB of
RAM and 7 CPU cores. As a consequence, I get one worker per machine since
each machine has 2 quad core processors.

--John

On Fri, Aug 14, 2015 at 5:43 PM, Javier Gonzalez <ja...@gmail.com> wrote:

> Do  you actually have 170 machines? Try sticking to one worker per machine
> (tweak memory parameters in storm.yaml), makes inter bolt traffic much
> faster.
> On Aug 14, 2015 5:28 PM, "John Yost" <so...@gmail.com> wrote:
>
>> Hey Javier,
>>
>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5 Bolt
>> B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is optimal,
>> but cluster resources make it tricky to actually launch this).
>>
>> I will up the number of Ackers and see if that helps. If not, then I will
>> try to vary the number of B bolts beyond 100.
>>
>> Thanks Again!
>>
>> --John
>>
>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <ja...@gmail.com>
>> wrote:
>>
>>> You will have a detrimental effect to wiring in boltB, even if it does
>>> nothing but ack. Every tuple you have processed from A has to travel to a B
>>> bolt, and the ack has to travel back.
>>>
>>> You could try modifying the number of ackers, and playing with the
>>> number of A and B bolts. How many workers do you have for the topology?
>>>
>>> Regards,
>>> JG
>>> On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com> wrote:
>>>
>>>> Hi Everyone,
>>>>
>>>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires a
>>>> much higher degree of parallelism than the bolt it emits tuples to (Bolt B)
>>>> (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>
>>>> I find that the throughput, as measured in number of tuples acked, goes
>>>> from 7 million/minute to ~ 1 million/minute when I wire in Bolt B--even if
>>>> all of the logic within the Bolt B execute method is disabled and the Bolt
>>>> B is therefore simply acking the input tuples from Bolt A. In addition, I
>>>> find that, going from 50 to 100 Bolt B executors causes the throughput to
>>>> go from 900K/minute to ~ 1.1 million/minute.
>>>>
>>>> Is the fact that I am going from 200 bolt instances to 100 or less the
>>>> problem?   I've already experimented with executor.send.buffer.size and
>>>> executor.receive.buffer.size, which helped drive throughput from 800K to
>>>> 900K. I will try topology.transfer.buffer.size, perhaps set that higher to
>>>> 2048. Any other ideas?
>>>>
>>>> Thanks
>>>>
>>>> --John
>>>>
>>>>
>>

Re: 200 bolts to 5 bolts--decreases throughput

Posted by Javier Gonzalez <ja...@gmail.com>.
Do  you actually have 170 machines? Try sticking to one worker per machine
(tweak memory parameters in storm.yaml), makes inter bolt traffic much
faster.
On Aug 14, 2015 5:28 PM, "John Yost" <so...@gmail.com> wrote:

> Hey Javier,
>
> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5 Bolt B
> and 120 workers for 400 Bolt A/100 Bolt B (this latter config is optimal,
> but cluster resources make it tricky to actually launch this).
>
> I will up the number of Ackers and see if that helps. If not, then I will
> try to vary the number of B bolts beyond 100.
>
> Thanks Again!
>
> --John
>
> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <ja...@gmail.com>
> wrote:
>
>> You will have a detrimental effect to wiring in boltB, even if it does
>> nothing but ack. Every tuple you have processed from A has to travel to a B
>> bolt, and the ack has to travel back.
>>
>> You could try modifying the number of ackers, and playing with the number
>> of A and B bolts. How many workers do you have for the topology?
>>
>> Regards,
>> JG
>> On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com> wrote:
>>
>>> Hi Everyone,
>>>
>>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires a
>>> much higher degree of parallelism than the bolt it emits tuples to (Bolt B)
>>> (200 Bolt A executors vs <= 100 Bolt B executors).
>>>
>>> I find that the throughput, as measured in number of tuples acked, goes
>>> from 7 million/minute to ~ 1 million/minute when I wire in Bolt B--even if
>>> all of the logic within the Bolt B execute method is disabled and the Bolt
>>> B is therefore simply acking the input tuples from Bolt A. In addition, I
>>> find that, going from 50 to 100 Bolt B executors causes the throughput to
>>> go from 900K/minute to ~ 1.1 million/minute.
>>>
>>> Is the fact that I am going from 200 bolt instances to 100 or less the
>>> problem?   I've already experimented with executor.send.buffer.size and
>>> executor.receive.buffer.size, which helped drive throughput from 800K to
>>> 900K. I will try topology.transfer.buffer.size, perhaps set that higher to
>>> 2048. Any other ideas?
>>>
>>> Thanks
>>>
>>> --John
>>>
>>>
>

Re: 200 bolts to 5 bolts--decreases throughput

Posted by John Yost <so...@gmail.com>.
Hey Javier,

Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5 Bolt B
and 120 workers for 400 Bolt A/100 Bolt B (this latter config is optimal,
but cluster resources make it tricky to actually launch this).

I will up the number of Ackers and see if that helps. If not, then I will
try to vary the number of B bolts beyond 100.

Thanks Again!

--John

On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <ja...@gmail.com> wrote:

> You will have a detrimental effect to wiring in boltB, even if it does
> nothing but ack. Every tuple you have processed from A has to travel to a B
> bolt, and the ack has to travel back.
>
> You could try modifying the number of ackers, and playing with the number
> of A and B bolts. How many workers do you have for the topology?
>
> Regards,
> JG
> On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com> wrote:
>
>> Hi Everyone,
>>
>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires a
>> much higher degree of parallelism than the bolt it emits tuples to (Bolt B)
>> (200 Bolt A executors vs <= 100 Bolt B executors).
>>
>> I find that the throughput, as measured in number of tuples acked, goes
>> from 7 million/minute to ~ 1 million/minute when I wire in Bolt B--even if
>> all of the logic within the Bolt B execute method is disabled and the Bolt
>> B is therefore simply acking the input tuples from Bolt A. In addition, I
>> find that, going from 50 to 100 Bolt B executors causes the throughput to
>> go from 900K/minute to ~ 1.1 million/minute.
>>
>> Is the fact that I am going from 200 bolt instances to 100 or less the
>> problem?   I've already experimented with executor.send.buffer.size and
>> executor.receive.buffer.size, which helped drive throughput from 800K to
>> 900K. I will try topology.transfer.buffer.size, perhaps set that higher to
>> 2048. Any other ideas?
>>
>> Thanks
>>
>> --John
>>
>>

Re: 200 bolts to 5 bolts--decreases throughput

Posted by Javier Gonzalez <ja...@gmail.com>.
You will have a detrimental effect to wiring in boltB, even if it does
nothing but ack. Every tuple you have processed from A has to travel to a B
bolt, and the ack has to travel back.

You could try modifying the number of ackers, and playing with the number
of A and B bolts. How many workers do you have for the topology?

Regards,
JG
On Aug 14, 2015 12:31 PM, "John Yost" <so...@gmail.com> wrote:

> Hi Everyone,
>
> I have a topology where a highly CPU-intensive bolt (Bolt A) requires a
> much higher degree of parallelism than the bolt it emits tuples to (Bolt B)
> (200 Bolt A executors vs <= 100 Bolt B executors).
>
> I find that the throughput, as measured in number of tuples acked, goes
> from 7 million/minute to ~ 1 million/minute when I wire in Bolt B--even if
> all of the logic within the Bolt B execute method is disabled and the Bolt
> B is therefore simply acking the input tuples from Bolt A. In addition, I
> find that, going from 50 to 100 Bolt B executors causes the throughput to
> go from 900K/minute to ~ 1.1 million/minute.
>
> Is the fact that I am going from 200 bolt instances to 100 or less the
> problem?   I've already experimented with executor.send.buffer.size and
> executor.receive.buffer.size, which helped drive throughput from 800K to
> 900K. I will try topology.transfer.buffer.size, perhaps set that higher to
> 2048. Any other ideas?
>
> Thanks
>
> --John
>
>