Posted to user@storm.apache.org by Navin Ipe <na...@searchlighthealth.com> on 2016/05/08 11:12:12 UTC

How do you run a task in a separate thread in a Bolt?

Hi,

I've wanted to do this and this post confirms the idea:
http://stackoverflow.com/a/36106683/453673
But when Storm constantly calls nextTuple() on my spout, and calls execute()
on my bolt whenever it receives a tuple, how do I program the spout to have
a separate thread that reads from MongoDB, or the bolt to have a separate
thread that writes to the DB?

If Storm is in complete charge of calling nextTuple() and execute(), then
how do I start my own thread which does something? This is important,
because I don't want my bolt to spend time writing to DB, when it should
actually be busy receiving and processing hundreds of tuples.

-- 
Regards,
Navin

Re: How do you run a task in a separate thread in a Bolt?

Posted by Enno Shioji <es...@gmail.com>.
Oh, and finally, another route you can take is to simply bump up the
parallelism using Storm's own settings (assuming that you need more
concurrency) and just tolerate blocking Storm's thread while doing IO. While
it may not be the most elegant or efficient solution, it's easy and will
probably work in most cases...


Re: How do you run a task in a separate thread in a Bolt?

Posted by Enno Shioji <es...@gmail.com>.
Just remembered: I'm not 100% sure if it's still the case, but the output
collector (where you emit the tuples generated) is not thread-safe, so you
must synchronize on it when you use your own threads, like this:

synchronized (collector) {
    collector.emit(newTuple);
}
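
A minimal sketch of that pattern with several pool threads emitting at once. It assumes the collector really is not thread-safe in your Storm version. UnsafeCollector is a hypothetical stand-in, not Storm's OutputCollector, so that the example runs with the JDK alone; in a real bolt you would lock the collector handed to you by Storm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Storm's output collector; deliberately not thread-safe.
class UnsafeCollector {
    private final List<String> emitted = new ArrayList<>();

    void emit(String tuple) {
        emitted.add(tuple); // ArrayList is not safe for concurrent writers
    }

    int emittedCount() {
        return emitted.size();
    }
}

class SynchronizedEmitDemo {
    // Emits `tuples` values from `threads` pool threads, taking the collector's lock per emit.
    static int emitFromWorkers(int threads, int tuples) {
        UnsafeCollector collector = new UnsafeCollector();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tuples; i++) {
            String newTuple = "tuple-" + i;
            pool.execute(() -> {
                synchronized (collector) {
                    collector.emit(newTuple);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (collector) {
            return collector.emittedCount();
        }
    }

    public static void main(String[] args) {
        System.out.println(emitFromWorkers(4, 1000)); // prints 1000
    }
}
```

Every emit goes through the same lock, so no emit is lost even though the underlying list is not safe for concurrent use.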



Re: How do you run a task in a separate thread in a Bolt?

Posted by Enno Shioji <es...@gmail.com>.
> Yes, I considered a threadpool, but the confusion was about where to
> declare them, initiate a thread run and when to join the threads. Any code
> samples or pseudocode that could help?

You would mark the thread pool field transient, and create it and shut it
down in the lifecycle callback methods provided by Storm (prepare() and
cleanup() for a bolt, open() and close() for a spout). If you mean
Thread.join, you shouldn't be using raw threads. There is really nothing
special about using threads in Storm, so you should be able to refer to
generic threading documentation.
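
A sketch of what that lifecycle could look like. Bolt instances are serialized when the topology is submitted, which is why the pool is transient and only created once the bolt is running. To stay runnable with the JDK alone, DbWriterBolt below is a hypothetical stand-in that does not implement Storm's interfaces: its prepare(), execute() and cleanup() only mirror the names of the bolt callbacks (the real ones take Storm-specific arguments), and writeToDb is a placeholder. As far as I know, Storm does not guarantee that cleanup() is called on a cluster, so treat the shutdown step as best effort.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a bolt: it does not implement Storm's interfaces.
class DbWriterBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int poolSize;              // plain configuration, serialized with the bolt
    private transient ExecutorService pool;  // skipped by serialization, created in prepare()
    private transient AtomicInteger written; // stands in for the DB in this sketch

    DbWriterBolt(int poolSize) {
        this.poolSize = poolSize;
    }

    // Mirrors the start-up callback: allocate the pool here, not in the constructor.
    void prepare() {
        pool = Executors.newFixedThreadPool(poolSize);
        written = new AtomicInteger();
    }

    // Mirrors execute(): hand the slow write to the pool and return immediately.
    void execute(String tuple) {
        pool.execute(() -> writeToDb(tuple));
    }

    // Mirrors the shutdown callback: stop accepting work and wait for queued writes.
    void cleanup() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int writtenCount() {
        return written.get();
    }

    private void writeToDb(String tuple) {
        written.incrementAndGet(); // placeholder for the real DB call
    }
}

class LifecycleDemo {
    // Serializes and deserializes the bolt, roughly as shipping it to a worker would.
    static DbWriterBolt roundTrip(DbWriterBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DbWriterBolt) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    static int run(int tuples) {
        DbWriterBolt bolt = roundTrip(new DbWriterBolt(2));
        bolt.prepare();
        for (int i = 0; i < tuples; i++) {
            bolt.execute("tuple-" + i);
        }
        bolt.cleanup();
        return bolt.writtenCount();
    }

    public static void main(String[] args) {
        System.out.println(run(50)); // prints 50
    }
}
```

Note that Executors.newFixedThreadPool uses an unbounded queue, so this sketch on its own provides no back pressure.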


> Besides, there's this thread where a person advises
> <https://mail-archives.apache.org/mod_mbox/storm-user/201311.mbox/%3CCAAYLz+pUZ44GNsNNJ9O5hjTr2rZLW=CKM=FGvcfwBnw613r1qQ@mail.gmail.com%3E>
> not using a thread pool.

Storm comes with its own concurrency scheme, so before using a raw thread
pool one should ask if it's really justified. Lots of computation
frameworks discourage use of your own thread pool because frequently the
problems are better solved by the concurrency mechanism provided by the
framework. In this particular case, not wanting to block Storm's thread
while you perform network IO is, IMO, a justifiable reason.

> What exactly is the backpressure
> <https://issues.apache.org/jira/browse/STORM-431> concept? Is it something
> about having enough of bolts to process the tuples the spout emits so that
> acks would be received by the spout on time?

It's a mechanism to avoid killing the topology by overloading it. A very
common example is an OutOfMemoryError (OOME) caused by too many pending tasks.
In this particular case, all it means is that you should let Storm know
that the processing is falling behind by blocking Storm's thread. E.g.
you'd submit tasks to your thread pool, and if you see too many tasks being
queued up, you'd start blocking Storm's thread so that it knows not to (or
rather, is unable to) send more tuples until you have finished a few tasks
and have capacity again. The easiest way to do this is to use a thread pool
with a bounded queue plus the caller-runs policy
(ThreadPoolExecutor.CallerRunsPolicy), to which you let Storm's thread
submit the task. That way, when the queue is full, Storm's thread runs the
rejected task itself and so is blocked until that task has finished.
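
The bounded queue plus caller-runs setup can be sketched with the JDK's ThreadPoolExecutor alone. The pool size of 2, the queue capacity of 4 and the 5 ms slowWrite placeholder are arbitrary values chosen for illustration; the thread calling run() plays the role of Storm's thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackpressureDemo {
    // Returns {tasks run by pool threads, tasks run by the submitting thread}.
    static int[] run(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2,                          // fixed pool of two workers
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(4),   // bounded: at most four tasks waiting
                new ThreadPoolExecutor.CallerRunsPolicy());

        Thread submitter = Thread.currentThread(); // plays the role of Storm's thread
        AtomicInteger onPool = new AtomicInteger();
        AtomicInteger onCaller = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                slowWrite();
                if (Thread.currentThread() == submitter) {
                    onCaller.incrementAndGet(); // queue was full: the submitter ran it
                } else {
                    onPool.incrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { onPool.get(), onCaller.get() };
    }

    // Placeholder for a blocking DB write.
    private static void slowWrite() {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] counts = run(50);
        System.out.println("pool=" + counts[0] + " caller=" + counts[1]);
    }
}
```

How the 50 tasks split between the pool and the caller depends on timing, but no task is dropped and the number of queued tasks never exceeds the queue capacity.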




Re: How do you run a task in a separate thread in a Bolt?

Posted by Navin Ipe <na...@searchlighthealth.com>.
Yes, I considered a thread pool, but the confusion was about where to
declare it, how to start a thread run and when to join the threads. Any code
samples or pseudocode that could help?
Besides, there's this thread where a person advises against using a thread
pool:
<https://mail-archives.apache.org/mod_mbox/storm-user/201311.mbox/%3CCAAYLz+pUZ44GNsNNJ9O5hjTr2rZLW=CKM=FGvcfwBnw613r1qQ@mail.gmail.com%3E>

What exactly is the backpressure concept
(<https://issues.apache.org/jira/browse/STORM-431>)? Is it something about
having enough bolts to process the tuples the spout emits, so that the spout
receives the acks in time?



-- 
Regards,
Navin

Re: How do you run a task in a separate thread in a Bolt?

Posted by Enno Shioji <es...@gmail.com>.
There's nothing that keeps you from simply having a thread pool in your
bolts. Or you could go for an async DB client.

You will have to be careful about providing back pressure (e.g. by using a
bounded queue).
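
For the async-client route, one way to provide back pressure is to cap the number of writes in flight. A minimal sketch, assuming a client whose write call returns a CompletableFuture: FakeAsyncDb is hypothetical and stands in for whatever async driver you use. It uses a Semaphore rather than a bounded queue, but the effect on the calling thread is similar.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical async client; a real driver would perform network IO instead.
class FakeAsyncDb {
    private final ExecutorService io = Executors.newFixedThreadPool(2);
    private final AtomicInteger stored = new AtomicInteger();

    CompletableFuture<Void> writeAsync(String doc) {
        return CompletableFuture.runAsync(() -> { stored.incrementAndGet(); }, io);
    }

    int storedCount() {
        return stored.get();
    }

    void close() {
        io.shutdown();
    }
}

class BoundedAsyncWriter {
    private final FakeAsyncDb db;
    private final Semaphore inFlight;
    private final int maxInFlight;

    BoundedAsyncWriter(FakeAsyncDb db, int maxInFlight) {
        this.db = db;
        this.maxInFlight = maxInFlight;
        this.inFlight = new Semaphore(maxInFlight);
    }

    // Blocks the caller (Storm's thread, in a bolt) while maxInFlight writes are pending.
    void write(String doc) {
        inFlight.acquireUninterruptibly();
        try {
            db.writeAsync(doc).whenComplete((ok, err) -> inFlight.release());
        } catch (RuntimeException e) {
            inFlight.release(); // the write never started, so give the permit back
            throw e;
        }
    }

    // Waits until every outstanding write has completed.
    void awaitAll() {
        inFlight.acquireUninterruptibly(maxInFlight);
        inFlight.release(maxInFlight);
    }

    static int run(int docs) {
        FakeAsyncDb db = new FakeAsyncDb();
        BoundedAsyncWriter writer = new BoundedAsyncWriter(db, 8);
        for (int i = 0; i < docs; i++) {
            writer.write("doc-" + i);
        }
        writer.awaitAll();
        db.close();
        return db.storedCount();
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // prints 100
    }
}
```

The limit of 8 in-flight writes is arbitrary; the point is only that write() stops returning immediately once the client falls behind.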
