Posted to user@storm.apache.org by Navin Ipe <na...@searchlighthealth.com> on 2016/06/01 08:21:32 UTC

Re: Understanding parallelism in Storm

Thanks Matthias. I just verified this and found why there's this confusion
about tasks.

In this case:
int BoltParallelism = 3;      // parallelism hint: number of executors (threads)
int BoltTaskParallelism = 2;  // number of tasks (bolt instances)
builder.setBolt("bolt1", new BoltA(), BoltParallelism)
       .setNumTasks(BoltTaskParallelism);

BoltParallelism is indeed the number of executors and BoltTaskParallelism
is indeed the number of tasks.

BUT

int BoltParallelism = 3;
builder.setBolt("bolt1", new BoltA(), BoltParallelism);

When you don't call setNumTasks, Storm creates BoltParallelism executors
and the same number of tasks, i.e. one task per executor.
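
To make the counting concrete, here is a small toy model in plain Java. It
is only a sketch of the rules described in this thread, not Storm's
scheduler: ParallelismModel, effectiveTasks and assign are hypothetical
names, the round-robin spread is an assumption, and so is capping the
executor count at the task count.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: it mimics the counting rules discussed in this thread.
// It is NOT Storm code, and every name in it is hypothetical.
final class ParallelismModel {

    // If setNumTasks was never called (numTasks == null), assume one task per executor.
    static int effectiveTasks(int parallelismHint, Integer numTasks) {
        return numTasks == null ? parallelismHint : numTasks;
    }

    // Spread task ids over executors round-robin (an assumed policy).
    // Each inner list holds the task ids one executor thread runs, one after the other.
    static List<List<Integer>> assign(int executors, int tasks) {
        if (executors < 1 || tasks < 1) {
            throw new IllegalArgumentException("need at least one executor and one task");
        }
        // Assumption: an executor with no task has nothing to run, so cap the thread count.
        int threads = Math.min(executors, tasks);
        List<List<Integer>> plan = new ArrayList<>();
        for (int e = 0; e < threads; e++) {
            plan.add(new ArrayList<>());
        }
        for (int t = 0; t < tasks; t++) {
            plan.get(t % threads).add(t);
        }
        return plan;
    }

    public static void main(String[] args) {
        // No setNumTasks: 3 executors, 3 tasks, one task each.
        System.out.println(assign(3, effectiveTasks(3, null))); // [[0], [1], [2]]
        // 1 executor, 3 tasks: a single thread owns all three tasks.
        System.out.println(assign(1, effectiveTasks(1, 3)));    // [[0, 1, 2]]
    }
}
```

Each inner list corresponds to one thread, which is why a task is never
touched by two executors in this model.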

*To your reply of "No. All executors run in parallel":*
When I have 3 tasks and 3 executors, I won't have to worry about
concurrency inside the Bolt, right? Because every Bolt instance is being
run in a separate thread, so all their member variables and functions are
specific to the executor.
Also, even if I have 3 tasks and 1 executor, every task is going to be run
one after the other by the executor, so there's no worry about concurrency
here either.

So in what situation would I have to worry about concurrency? AFAIK, even
in a single bolt, the execute() function has to complete before the same
execute() is invoked again.
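
The "3 tasks and 1 executor" case can be sketched in plain Java as follows.
This is an illustration under assumptions, not Storm's implementation:
ToyTask and ToyExecutor are hypothetical names, and the per-task inbox
stands in for the task input queue mentioned in the quoted reply below.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model, not Storm code. One ToyTask plays the role of one bolt instance.
final class ToyTask {
    final Queue<String> inbox = new ArrayDeque<>(); // per-task input buffer
    int processed = 0;                              // plain instance state, no locking

    void execute(String tuple) {
        processed++;
    }
}

// One executor thread that owns several tasks and runs them in turn.
final class ToyExecutor implements Runnable {
    private final List<ToyTask> tasks;

    ToyExecutor(List<ToyTask> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        // A single thread visits its tasks one after the other; every
        // execute() call returns before the next one starts.
        for (ToyTask task : tasks) {
            String tuple;
            while ((tuple = task.inbox.poll()) != null) {
                task.execute(tuple);
            }
        }
    }

    // Builds three tasks with two buffered tuples each and drains them on one thread.
    static int[] demo() {
        List<ToyTask> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            ToyTask t = new ToyTask();
            t.inbox.add("a" + i);
            t.inbox.add("b" + i);
            tasks.add(t);
        }
        Thread executor = new Thread(new ToyExecutor(tasks));
        executor.start();
        try {
            executor.join(); // join() also makes the counters visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {
            tasks.get(0).processed, tasks.get(1).processed, tasks.get(2).processed
        };
    }
}
```

Because only one thread ever touches a given ToyTask, the unsynchronized
counter is safe in this sketch.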


On Tue, May 17, 2016 at 12:54 AM, Matthias J. Sax <mj...@apache.org> wrote:

> Answers inline.
>
> I guess you are not aware that a worker runs other threads next to the
> executors, too. For example, there are two threads (one for input, one
> for output) that work as "dispatchers" for messages. There is a
> global input queue, and the input dispatcher forwards incoming messages
> to the individual tasks' queues so that the executors can all work in
> parallel. The same holds for output: executors write into their own
> output queues, and a single "output thread" reads the data from there
> and takes care of network transfer to downstream bolts.
>
> -Matthias
>
> On 05/16/2016 06:24 PM, Navin Ipe wrote:
> > Err...guys....I appreciate the ongoing discussion, but the original
> > question remains unanswered. The one I've asked at the very beginning of
> > this conversation. Some help would be appreciated.
> > Referring to the code I posted and as per Nathan's answer, you say that
> > int *BoltParallelism* actually represents the tasks
>
> No. *BoltParallelism* is the number of executor threads.
>
> > which are the number
> > of instances of Bolts/Spouts? And BoltTaskParallelism is the number of
> > executors (OS threads)?
>
> No. This is the number of tasks.
>
> > If that's the case, then execute() will get called only after the
> > previous execute() call of a Bolt has completed. And nextTuple() will
> > get called only after the previous nextTuple() of a Spout has completed.
>
> For a single executor, yes.
>
> > That's a bit reassuring, since now one does not have to cater to
> > multithreading within a Spout/Bolt.
>
> No. All executors run in parallel.
>
> >
> >
> > On Mon, May 16, 2016 at 7:07 PM, Matthias J. Sax <mjsax@apache.org> wrote:
> >
> >     Hi,
> >
> >
> >     So this is not correct:
> >     > and
> >     > the Bolt creates a task for processing each incoming Tuple.
> >
> >     Storm creates exactly *BoltTaskParallelism* tasks and assigns incoming
> >     messages to tasks (according to the connection pattern used: shuffle,
> >     fieldsGrouping, etc.).
> >
> >     Furthermore:
> >
> >     > If there
> >     > are not enough tasks, then the excess Tuples are made to wait in a
> >     > queue of the executor.
> >
> >     No. There is no such thing as "not enough tasks". Each task has its own
> >     input queue/buffer, and tuples are stored there.
> >
> >     The executor threads process one or multiple tasks. Thus, if a task is
> >     currently "on hold", new tuples are just added to the task's input
> >     queue. When an executor picks up one of its tasks for processing, the
> >     buffered tuples of the task are processed.
> >
> >
> >     -Matthias
> >
> >     On 05/16/2016 09:07 AM, Adrien Carreira wrote:
> >     > +1
> >     >
> >     > 2016-05-16 6:40 GMT+02:00 Navin Ipe <navin.ipe@searchlighthealth.com>:
> >     >
> >     >     Hi,
> >     >
> >     >     I've seen the explanations
> >     >     <http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/>,
> >     >     but none of them explain it in terms of what I see in the code. This
> >     >     is what I understood:
> >     >
> >     >     int BoltParallelism = 3;
> >     >     int BoltTaskParallelism = 2;
> >     >     builder.setBolt("bolt1", new BoltA(), *BoltParallelism*)
> >     >                     .setNumTasks(*BoltTaskParallelism*)
> >     >
> >     >     BoltParallelism creates 3 instances of BoltA. These are the executors.
> >     >     BoltTaskParallelism allows Tuples to come into BoltA very fast, and
> >     >     the Bolt creates a task for processing each incoming Tuple. If there
> >     >     are not enough tasks, then the excess Tuples are made to wait in a
> >     >     queue of the executor.
> >     >
> >     >     Strange thing is that the explanation says the tasks are run in a
> >     >     single thread, so obviously I misunderstood something. Could you
> >     >     help me understand it?
> >     >
> >     >     --
> >     >     Regards,
> >     >     Navin
> >     >
> >     >
> >
> >
> >
> >
> > --
> > Regards,
> > Navin
>
>


-- 
Regards,
Navin

Re: Understanding parallelism in Storm

Posted by Navin Ipe <na...@searchlighthealth.com>.
Absolutely. Thank you very much :-)

On Wed, Jun 1, 2016 at 10:35 PM, Matthias J. Sax <mj...@apache.org> wrote:

> Hi Navin,
>
> You do not need to worry about concurrency, because each task is
> basically an individual instance of your Spout/Bolt class.
>
> Thus, execute() is never called on the same spout/bolt object by
> different threads at the same time. The only thing that can happen is
> that execute() is called on different objects at the same time by
> different threads, and that is not a concurrency issue.
>
> Of course, you must not have static member variables in your code! But
> this is a general requirement and not directly related to the
> executor/task model.
>
> Hope this answers your question.
>
> -Matthias


-- 
Regards,
Navin

Re: Understanding parallelism in Storm

Posted by "Matthias J. Sax" <mj...@apache.org>.
Hi Navin,

You do not need to worry about concurrency, because each task is
basically an individual instance of your Spout/Bolt class.

Thus, execute() is never called on the same spout/bolt object by
different threads at the same time. The only thing that can happen is
that execute() is called on different objects at the same time by
different threads, and that is not a concurrency issue.

Of course, you must not have static member variables in your code! But
this is a general requirement and not directly related to the
executor/task model.
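
As a rough illustration of the difference between instance and static
state, consider the plain-Java sketch below. CountingBolt is a
hypothetical stand-in for a user bolt and is not a Storm class. The
static counter uses AtomicLong only so that the demo stays deterministic;
a plain static long updated from several executor threads could lose
increments, which is the problem the warning above is about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: CountingBolt is a hypothetical stand-in, not a Storm class.
final class CountingBolt {
    // Static state is shared by every instance in the JVM, so several
    // threads reach it at once. It is made thread-safe here on purpose.
    static final AtomicLong sharedTotal = new AtomicLong();

    // Instance state belongs to one task and is touched by one thread only.
    long seen = 0;

    void execute(String tuple) {
        seen++;
        sharedTotal.incrementAndGet();
    }

    // Runs one thread per instance and returns {sum of per-instance counts, shared total}.
    static long[] demo(int instances, int tuplesEach) {
        sharedTotal.set(0);
        List<CountingBolt> bolts = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            CountingBolt bolt = new CountingBolt();
            bolts.add(bolt);
            Thread t = new Thread(() -> {
                for (int n = 0; n < tuplesEach; n++) {
                    bolt.execute("tuple");
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // join() makes each bolt's counter visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        long perInstanceSum = 0;
        for (CountingBolt b : bolts) {
            perInstanceSum += b.seen;
        }
        return new long[] {perInstanceSum, sharedTotal.get()};
    }
}
```

With three instances and 10000 tuples each, both numbers come out as
30000: the instance fields need no locking, while the shared counter is
only correct because it is atomic.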

Hope this answers your question.

-Matthias
