Posted to user@storm.apache.org by Jason Kusar <ja...@kusar.net> on 2015/04/29 02:18:09 UTC

How to merge related tuples from multiple streams

Hi,

I'm currently working on building an ETL system using Storm. Approximately
30% of the incoming records have binary attachments which need to be virus
scanned. A single record can have one or more attachments. My initial
thought was to build a topology with two outputs from the spout both of
which eventually feed a downstream bolt. I've attached a simple diagram.
Hopefully it comes through on the list.


The spout would output tuples to the metadata transform on the default
stream. If it came across a record that had attachments, it would output
one or more additional tuples with the same ID to the Virus scan stream.
Obviously the diagram is simplified as the Metadata transform might involve
many steps, but regardless it's safe to assume that the time required for
the virus scanner is likely significantly higher than that of the transform stream.
I would like for records not having attachments to be able to keep flowing
through the system without being slowed down by those records that do
happen to have attachments.
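
To make the idea concrete, here's roughly how I'm picturing the spout
side. The stream name ("virus-scan"), the field names, and the
SourceRecord/fetchNextRecord() bits are just placeholders for whatever
the real ingest code ends up looking like:

import java.util.Collections;
import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Sketch only -- stream name, field names and SourceRecord are placeholders.
public class RecordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // default stream -> metadata transform
        declarer.declare(new Fields("recordId", "metadata"));
        // extra stream -> virus scanner, one tuple per attachment
        declarer.declareStream("virus-scan", new Fields("recordId", "attachment"));
    }

    @Override
    public void nextTuple() {
        SourceRecord record = fetchNextRecord();
        if (record == null) {
            return;
        }
        String id = record.getId();
        // distinct message ids so each tuple is tracked (and replayed) individually
        collector.emit(new Values(id, record.getMetadata()), id + "#meta");
        int i = 0;
        for (byte[] attachment : record.getAttachments()) {
            collector.emit("virus-scan", new Values(id, attachment), id + "#att" + i++);
        }
    }

    private SourceRecord fetchNextRecord() {
        return null;   // placeholder: pull the next record from the real source here
    }

    // Placeholder record type, just so the sketch compiles.
    static class SourceRecord {
        String getId() { return null; }
        Object getMetadata() { return null; }
        List<byte[]> getAttachments() { return Collections.emptyList(); }
    }
}

The point is just that every tuple, on either stream, carries the same
record ID so they can be matched up again downstream.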

From looking at the CoordinatedBolt, it looks like it probably does exactly
what I'm looking for, but I'm not sure. It would join the tuples from the
two streams back together and deliver them to the dissem bolt as a batch to
be processed from there. Am I viewing this right or am I completely off
base? I can't find a lot of examples of CoordinatedBolts and there aren't
any real comments in the code explaining what it's doing.

I feel like Direct Groupings might come into play here as well, but the
link from the Documentation Manual page gets a 404, so I was unable to find
more details on that.

 If I'm completely off, is there an example implementation that does
something similar to what I'm trying to do? Specifically, is there an
example of something outputting a variable number of tuples that all get
grouped back together somewhere down the line?

Thanks!
--Jason

Re: How to merge related tuples from multiple streams

Posted by Nathan Leung <nc...@gmail.com>.
What you have described is correct.

Re: How to merge related tuples from multiple streams

Posted by Jason Kusar <ja...@kusar.net>.
Hmmm, ok, I think that makes sense. So how would the buffering work? Can I
just stash the tuple in memory and return without acking when I still need
more? Then once I have all of them, process and ack them all at once? For
some reason I thought it would be more complicated than that, but if it's
that simple, great! :-)
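
In other words, something roughly like this inside the dissem bolt
(the field name and the isComplete()/processGroup() helpers are
placeholders, and collector is the OutputCollector saved in prepare()):

// Fragment of the dissem bolt; not a complete class.
private final Map<String, List<Tuple>> pending = new HashMap<String, List<Tuple>>();

@Override
public void execute(Tuple tuple) {
    String recordId = tuple.getStringByField("recordId");
    List<Tuple> group = pending.get(recordId);
    if (group == null) {
        group = new ArrayList<Tuple>();
        pending.put(recordId, group);
    }
    group.add(tuple);
    if (!isComplete(recordId, group)) {
        return;                  // hold on to the tuples, don't ack yet
    }
    pending.remove(recordId);
    processGroup(group);         // do the real work once everything has arrived
    for (Tuple t : group) {
        collector.ack(t);        // ack the whole group in one go
    }
}

(I realize I'd need to finish each group before topology.message.timeout.secs
expires, since tuples held un-acked past that are failed and replayed by the
spout.)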

Thanks!

Re: How to merge related tuples from multiple streams

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
Hi,

I am not familiar with CoordinatedBolt, but it does not seem to do what you
want (it seems to do DRPC-related work):

http://www.pixelmachine.org/storm/2012/01/03/How-CoordinatedBolt-Works.html

To me, it seems as if you would like to perform a simple join. For this,
you need to buffer all incoming meta-data tuples (those related to
messages with attachments) in Dissem until the join is complete. To do
that, you need to know, for each tuple coming from the meta-data
transform, how many attachment tuples to expect from the virus scanner.
The Spout can simply add this information. If the attachment-count
attribute is zero, the message can be processed immediately.
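
As a rough sketch (all component and field names are invented here --
adapt them to your topology; it also assumes the virus-scanner results
carry the same recordId field), Dissem could look like this:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Sketch only: buffers metadata and virus-scan tuples per recordId and
// releases a record once all expected attachment results have arrived.
public class DissemBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, Tuple> metadataByRecord;
    private Map<String, List<Tuple>> scansByRecord;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.metadataByRecord = new HashMap<String, Tuple>();
        this.scansByRecord = new HashMap<String, List<Tuple>>();
    }

    @Override
    public void execute(Tuple tuple) {
        String recordId = tuple.getStringByField("recordId");
        if ("metadata-transform".equals(tuple.getSourceComponent())) {
            metadataByRecord.put(recordId, tuple);
        } else {
            List<Tuple> scans = scansByRecord.get(recordId);
            if (scans == null) {
                scans = new ArrayList<Tuple>();
                scansByRecord.put(recordId, scans);
            }
            scans.add(tuple);
        }
        tryToComplete(recordId);
    }

    private void tryToComplete(String recordId) {
        Tuple metadata = metadataByRecord.get(recordId);
        if (metadata == null) {
            return;   // metadata tuple not here yet; keep buffering scan results
        }
        int expected = metadata.getIntegerByField("attachmentCount");
        List<Tuple> scans = scansByRecord.get(recordId);
        int received = (scans == null) ? 0 : scans.size();
        if (received < expected) {
            return;   // still waiting for virus-scan results
        }
        // Everything arrived (or attachmentCount was zero): emit and ack the group.
        List<Tuple> group = new ArrayList<Tuple>();
        group.add(metadata);
        if (scans != null) {
            group.addAll(scans);
        }
        collector.emit(group, new Values(recordId, metadata.getValueByField("metadata")));
        for (Tuple t : group) {
            collector.ack(t);
        }
        metadataByRecord.remove(recordId);
        scansByRecord.remove(recordId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("recordId", "metadata"));
    }
}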

Does this make sense to you?

However, I don't understand why you would want to use direct grouping.
Fields grouping on the message-id attribute should work for you.
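
For the wiring, something like this (again just a fragment with made-up
component names; RecordSpout, MetadataTransformBolt, VirusScanBolt and
DissemBolt stand in for your actual components):

// Fragment only; parallelism numbers and component names are arbitrary.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("record-spout", new RecordSpout(), 2);
builder.setBolt("metadata-transform", new MetadataTransformBolt(), 4)
       .shuffleGrouping("record-spout");                      // default stream
builder.setBolt("virus-scanner", new VirusScanBolt(), 8)
       .shuffleGrouping("record-spout", "virus-scan");         // attachment stream
builder.setBolt("dissem", new DissemBolt(), 4)
       .fieldsGrouping("metadata-transform", new Fields("recordId"))
       .fieldsGrouping("virus-scanner", new Fields("recordId"));

Both inputs to dissem use fields grouping on the record id, so all
tuples belonging to one record end up in the same Dissem task, which is
what makes the in-memory buffering work.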


-Matthias

