Posted to user@storm.apache.org by Spico Florin <sp...@gmail.com> on 2014/08/01 06:35:28 UTC

Implementing a barrier mechanism in storm

Hello!
  I have a case study where the same message (identified by an id) is
spread over a couple of processing bolts and a final bolt should act as a
barrier. This final bolt should do its job on the message ID only when all
the upfront bolts have finished their process on the message.

A sketch is below:


Spout -> (msgId1, payload) -> process bolt 1 -> (msgId1, payloadb1) --+
                           -> process bolt 2 -> (msgId1, payloadb2) --+--> final bolt (msgId1, finalp)
                           -> process bolt 3 -> (msgId1, payloadb3) --+

So the final bolt should not start work on a message ID until the message
has been processed by all three processing bolts.
  My questions are:
1. Is this use case viable for Storm?
2. If the answer to the first question is yes, how can I achieve this?
3. Is it possible to achieve this without using Trident?
I look forward to your answers.
Thanks,
  Florin

Re: Implementing a barrier mechanism in storm

Posted by Anuj Kumar <an...@gmail.com>.
That's true. Group by the ID field and hold the partially processed tuples in
a RotatingMap in the final bolt until all are received. Rotate the map on tick
tuples to expire entries that never complete.
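A minimal sketch of that idea, written as plain Java rather than Storm code so it stands alone: partial results are keyed by message ID, the barrier releases once all expected upstream bolts have reported, and rotate() (which a real bolt would call on each tick tuple, mimicking Storm's RotatingMap) drops entries that sat idle for a full rotation period. All names here are illustrative, not Storm API.

```java
import java.util.*;

// Barrier join state with RotatingMap-style expiry. Not Storm API; a
// real final bolt would call accept() from execute() and rotate() on
// each tick tuple.
class RotatingBarrier {
    private final Set<String> expectedSources;
    // Two generations of pending joins; rotate() discards the older one.
    private Map<String, Map<String, Object>> current = new HashMap<>();
    private Map<String, Map<String, Object>> old = new HashMap<>();

    RotatingBarrier(Set<String> expectedSources) {
        this.expectedSources = expectedSources;
    }

    /** Record one partial result; returns all merged payloads once complete, else null. */
    Map<String, Object> accept(String msgId, String source, Object payload) {
        Map<String, Object> parts = current.get(msgId);
        if (parts == null) {
            parts = old.remove(msgId);          // promote from the older generation
            if (parts == null) parts = new HashMap<>();
            current.put(msgId, parts);
        }
        parts.put(source, payload);
        if (parts.keySet().containsAll(expectedSources)) {
            current.remove(msgId);
            return parts;                       // barrier released for this msgId
        }
        return null;
    }

    /** Called on each tick tuple: entries untouched for a full period expire. */
    void rotate() {
        old = current;
        current = new HashMap<>();
    }
}
```

An entry that survives one rotation can still be completed (it is promoted back from the older generation), but after two rotations without new parts it is gone, which bounds memory for messages that were failed or lost upstream.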



Re: Implementing a barrier mechanism in storm

Posted by Michael Rose <mi...@fullcontact.com>.
In the prepare method you receive a copy of the TopologyContext, which can
tell you all of the streams and components your bolt is subscribed to.

You could make the cache a static field, or simply use a fields grouping on messageId.
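In a real bolt, `prepare()` would read the subscriptions from `TopologyContext.getThisSources()`; the sketch below only shows the derivation step, taking the subscriptions as plain `"component:stream"` strings so it runs standalone. The format and names are illustrative assumptions, not Storm's types.

```java
import java.util.*;

// Derive the set of upstream components the final bolt must hear from,
// given its subscribed stream IDs. In Storm this input would come from
// TopologyContext.getThisSources() inside prepare().
class ExpectedSources {
    /** Distinct upstream component IDs from "component:stream" subscription IDs. */
    static Set<String> fromSubscriptions(Collection<String> streamIds) {
        Set<String> components = new TreeSet<>();
        for (String id : streamIds) {
            components.add(id.split(":")[0]);   // keep only the component part
        }
        return components;
    }
}
```

Deriving the expected set from the context, rather than hard-coding "3 bolts", means the barrier keeps working if you later add or remove a processing bolt in the topology.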

Re: Implementing a barrier mechanism in storm

Posted by Spico Florin <sp...@gmail.com>.
Hello!
  Thanks for your reply. I was thinking of using such a cache mechanism
(a Guava cache as a static field in the final bolt). The question I have
now is: how do you keep track of the upstream bolts? Suppose you have 3
bolts; do you count the number of bolts that have processed a given
message, e.g. Map<msgId, count_of_bolts>?
Or could you send the names of the bolts that have processed the message
to the final bolt, and have the final bolt check whether the list of
processing bolts is complete?

What is the best approach here?

Thanks.
Regards,
  Florin


Re: Implementing a barrier mechanism in storm

Posted by Michael Rose <mi...@fullcontact.com>.
It's another case of a streaming join. I've done this before; there aren't
too many gotchas, other than that you need a data structure that purges
stale, unresolved joins after the tuple timeout time (I used a Guava cache
for this).
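In the real bolt a Guava cache built with `CacheBuilder.newBuilder().expireAfterWrite(...)` would handle the purging. A dependency-free sketch of the same idea, with the clock passed in explicitly so the expiry is easy to follow (all names here are illustrative):

```java
import java.util.*;

// Streaming-join state that purges entries idle beyond the tuple timeout,
// so joins that will never resolve (e.g. a failed upstream tuple) cannot
// accumulate. Timestamp-based stand-in for a Guava expiring cache.
class ExpiringJoinState {
    private static final class Entry {
        final Map<String, Object> parts = new HashMap<>();
        long touchedAtMillis;
    }

    private final Map<String, Entry> pending = new HashMap<>();
    private final long timeoutMillis;

    ExpiringJoinState(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    /** Add a partial result; returns all parts once every expected source arrived. */
    Map<String, Object> put(String msgId, String source, Object payload,
                            Set<String> expected, long nowMillis) {
        Entry e = pending.computeIfAbsent(msgId, k -> new Entry());
        e.parts.put(source, payload);
        e.touchedAtMillis = nowMillis;
        if (e.parts.keySet().containsAll(expected)) {
            pending.remove(msgId);
            return e.parts;                     // join resolved
        }
        return null;
    }

    /** Drop entries idle beyond the timeout; call periodically (e.g. on tick tuples). */
    int purge(long nowMillis) {
        int before = pending.size();
        pending.values().removeIf(e -> nowMillis - e.touchedAtMillis > timeoutMillis);
        return before - pending.size();
    }
}
```

Setting the expiry a bit longer than Storm's tuple timeout is the safe choice: once the timeout has passed, the spout will replay the message anyway, so the half-built join is dead weight.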

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com



Re: Implementing a barrier mechanism in storm

Posted by Varun Vijayaraghavan <va...@gmail.com>.
That's interesting. Note that I have not used this exact pattern before,
but I have done something similar. I have not used Trident, so this
probably will not answer your last question completely :)

If you set up the topology such that the links between bolts {1, 2, 3} and
the final bolt are stream-grouped by "msgId", you could keep the partially
processed results in memory (or in persisted state somewhere) until you see
the processed result from all of the bolts.

I would also expire msgIds that have not seen further results beyond a
certain threshold time.

What do you think?






-- 
- varun :)