You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Andrew Serff <an...@serff.net> on 2014/06/09 19:45:27 UTC

Trident Binned Aggregation Design Help

Hello,

I'm new to using Trident and had a few questions about the best way to do
things in this framework.  I'm trying to build a real-time streaming
aggregation system and Trident seems to have a very easy framework to allow
me to do that.  I have a basic setup working, but as I am adding more
counters, the performance becomes very slow and eventually I start having
many failures.  At the basic level here is what I want to do:

Have an incoming stream that is using a KafkaSpout to read data from.
I take the Kafka stream, parse it and output multiple fields.
I then want many different counters for those fields in the data.

For example, say it was the twitter stream.  I may want to count:
- A counter for each username I come across.  So how many times I have
received a tweet from each user
- A counter for each hashtag so you know how many tweets mention a hashtag
- Binned counters based on date for each tweet (i.e. how many tweets in
2014, June 2014, June 08 2014, etc).

The list could continue, but this can add up to hundreds of counters
running in real time.  Right now I have something like the following:

TridentTopology topology = new TridentTopology();
KafkaSpout spout = new KafkaSpout(kafkaConfig);
Stream stream = topology.newStream("messages", spout).shuffle()
                .each(new Fields("str"), new FieldEmitter(), new
Fields("username", "hashtag"));

stream.groupBy(new Fields("username"))
                .persistentAggregate(stateFactory, new Fields("username"),
new Count(), new Fields("count"))
                .parallelismHint(6);
stream.groupBy(new Fields("hashtag"))
                .persistentAggregate(stateFactory, new Fields("hashtag"),
new Count(), new Fields("count"))
                .parallelismHint(6);

Repeat something similar for everything you want to have a unique count
for.

I end up having hundreds of GroupBys each that has an aggregator for each.
 I have so far only run this on my local machine and not on a cluster yet,
but I'm wondering if this is the correct design for something like this or
if there is a better way to distribute this within Trident to make it more
efficient.

Any suggestions would be appreciated!
Thanks!
Andrew

Re: Trident Binned Aggregation Design Help

Posted by "Cody A. Ray" <co...@gmail.com>.
We're writing it into a data store and then querying the data store
directly. (We have topologies writing to MongoDB and KairosDB; our write
rate is significantly higher than our read rate.)

-Cody


On Wed, Jun 18, 2014 at 10:33 PM, Andrew Serff <an...@serff.net> wrote:

> That is Cody!  I do like the idea of creating key/value groupings. I'll
> have to try that with the persistentCounts to see how it changes my
> topology. I too want to bin all counts so your example helps.
>
> Question, how are you querying your counts?  Are you creating DRPC streams
> for each count?
>
> Thanks again for the discussion!
> Andrew
>
>
> On Wednesday, June 18, 2014, Cody A. Ray <co...@gmail.com> wrote:
>
>> We're currently building out a similar bucketed/binned stats aggregation
>> system on trident that's very similar to this. We're still in the process
>> of tuning it to handle the loads we're trying to throw at it, so if anyone
>> has any insight as to whether this approach is good/bad, I'm all ears. :D
>>
>> Anyway, the key here is that instead of grouping on fields "username",
>> "hashtag", etc, you can emit more generic metric names (key-value pairs)
>> and greatly reduce the number of groupBys you need.
>>
>> For example, you might parse a single tweet
>> <https://twitter.com/laterrastudio/status/479368401010184192>
>>
>> [image: Inline image 3]
>>
>> into the following tuples:
>>
>> (timestamp=1403128860, key=username, value=laterrastudio)
>> (timestamp=1403128860, key=mention, value=SocialCyclist)
>> (timestamp=1403128860, key=mention, value=Djump_in)
>> (timestamp=1403128860, key=mention, value=peerby)
>> (timestamp=1403128860, key=mention, value=laterrastudio)
>> (timestamp=1403128860, key=hashtag, value=AppMyCity)
>> (timestamp=1403128860, key=hashtag, value=NCS2014)
>> (timestamp=1403128860, key=url, value=pic.twitter.com/dlGcj946Hx)
>>
>> Then you groupBy(new Fields("key", "value")) and do a
>> persistentCount(stateFactory, new Fields("value"), new Count(), new
>> Fields("count"))
>>
>> We do something like this for enterprise data using code similar to the
>> following. Note that we just record 30s and 30m buckets, but you can
>> substitute whatever buckets you want. (Your example was daily, monthly, and
>> yearly; you'll likely have to performance test and make some binning
>> decisions based on the performance of your trident state when being written
>> to so frequently with large 'bins').
>>
>>   private static final Map<String, Long> BUCKETS = ImmutableMap.of("30s",
>> 30000L, "30m", 1800000L);
>>
>>     TridentTopology topology = new TridentTopology();
>>     Stream stream = topology.newStream("spout", spout)
>>
>> .parallelismHint(parallelism.forSpoutLayer()).shuffle().name("Transform")
>>         .each(new Fields("bytes"), new BinaryToString(), new
>> Fields("string"))
>>         .each(new Fields("string"), new MessageParser(), new
>> Fields("timestamp", "key", "value"));
>>     for (Map.Entry<String, Long> entry : BUCKETS.entrySet()) {
>>       stream.each(new Fields("timestamp"), new Bucket(entry.getValue()),
>> new Fields("bucketStart", "bucketEnd"))
>>           .parallelismHint(parallelism.forTransformLayer())
>>           .groupBy(new Fields("bucketStart", "bucketEnd", "key", "value"))
>>           .name("Aggregator-" + entry.getKey())
>>           .persistentAggregate(stateFactory, new Fields("value"), new
>> Count(), new Fields("count"))
>>           .parallelismHint(parallelism.forPersistenceLayer());
>>     }
>>     return topology.build();
>>
>> This will one Transform bolt + one bolt per bucket size (two in this
>> example). (Plus the extra spout bolts that Trident itself creates.)
>>
>> This example actually bins all counts, not just the number of tweets. But
>> you may find that's better than just keeping a global counter forever. Or
>> maybe not. YMMV. :)
>>
>> Either way, hopefully this gets you started in the right direction.
>>
>> -Cody
>>
>>
>> On Wed, Jun 18, 2014 at 1:21 PM, Adam Lewis <ma...@adamlewis.com> wrote:
>>
>>> Do you have any insight into how DRPC plays into this?  The groupBy bolt
>>> boundary makes perfect sense and I understand how that maps to some
>>> collection of bolts that would process different groupings (depending on
>>> parallelism).  What stumps me are the cases where adding multiple DRPC
>>> spouts to the topology seems to result in the whole things being duplicated
>>> for each spout.  I can see some extra tracking mechanisms get stood up to
>>> track DRPC requests and then match request with response but still not sure
>>> why that wouldn't scale linearly with # of DRPC spouts.
>>>
>>>
>>>
>>>
>>> On Tue, Jun 17, 2014 at 8:05 PM, P. Taylor Goetz <pt...@gmail.com>
>>> wrote:
>>>
>>>> Not at the moment but I will be adding that functionality (trident
>>>> state) to the storm-hbase project very soon. Currently it only supports
>>>> MapState.
>>>>
>>>> -Taylor
>>>>
>>>> On Jun 17, 2014, at 6:09 PM, Andrew Serff <an...@serff.net> wrote:
>>>>
>>>> I'm currently using Redis, but I'm by no means tied to it.  Are there
>>>> any example of using either to do that?
>>>>
>>>> Andrew
>>>>
>>>>
>>>> On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz <pt...@gmail.com>
>>>> wrote:
>>>>
>>>>> Andrew/Adam,
>>>>>
>>>>> Partitioning operations like groupBy() form the bolt boundaries in
>>>>> trident topologies, so the more you have the more bolts you will have and
>>>>> thus, potentially, more network transfer.
>>>>>
>>>>> What backing store are you using for persistence? If you are using
>>>>> something with counter support like HBase or Cassandra you could leverage
>>>>> that in combination with tridents exactly once semantics to let it handle
>>>>> the counting, and potentially greatly reduce the complexity of your
>>>>> topology.
>>>>>
>>>>> -Taylor
>>>>>
>>>>> On Jun 17, 2014, at 5:15 PM, Adam Lewis <ma...@adamlewis.com> wrote:
>>>>>
>>>>> I, too, am eagerly awaiting a reply from the list on this topic.  I
>>>>> hit up against max topology size limits doing something similar with
>>>>> trident.  There are definitely "linear" changes to a trident topology that
>>>>> result in quadratic growth of the "compiled" storm topology size, such as
>>>>> adding DRPC spouts.  Sadly the compilation process of trident to plain
>>>>> storm remains somewhat opaque to me and I haven't had time to dig deeper.
>>>>>  My work around has been to limit myself to one DRPC spout per topology and
>>>>> programmatically build multiple topologies for the variations (which
>>>>> results in a lot of structural and functional duplication of deployed
>>>>> topologies, but at least not code duplication).
>>>>>
>>>>> Trident presents a seemingly nice abstraction, but from my point of
>>>>> view it is a leaky one if I need to understand the compilation process to
>>>>> know why adding a single DRPC spout double the topology size.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff <an...@serff.net>
>>>>> wrote:
>>>>>
>>>>>> Is there no one out there that can help with this?  If I use this
>>>>>> paradigm, my topology ends up having like 170 bolts.  Then I add DRPC
>>>>>> stream and I have like 50 spouts.  All of this adds up to a topology that I
>>>>>> can't even submit because it's too large (and i've bumped the trident max
>>>>>> to 50mb already...).  It seems like I'm thinking about this wrong, but I
>>>>>> haven't be able to come up with another way to do it.  I don't really see
>>>>>> how using vanilla Storm would help, maybe someone can offer some guidance?
>>>>>>
>>>>>> Thanks
>>>>>> Andrew
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <an...@serff.net>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm new to using Trident and had a few questions about the best way
>>>>>>> to do things in this framework.  I'm trying to build a real-time streaming
>>>>>>> aggregation system and Trident seems to have a very easy framework to allow
>>>>>>> me to do that.  I have a basic setup working, but as I am adding more
>>>>>>> counters, the performance becomes very slow and eventually I start having
>>>>>>> many failures.  At the basic level here is what I want to do:
>>>>>>>
>>>>>>> Have an incoming stream that is using a KafkaSpout to read data
>>>>>>> from.
>>>>>>> I take the Kafka stream, parse it and output multiple fields.
>>>>>>> I then want many different counters for those fields in the data.
>>>>>>>
>>>>>>> For example, say it was the twitter stream.  I may want to count:
>>>>>>> - A counter for each username I come across.  So how many times I
>>>>>>> have received a tweet from each user
>>>>>>> - A counter for each hashtag so you know how many tweets mention a
>>>>>>> hashtag
>>>>>>> - Binned counters based on date for each tweet (i.e. how many tweets
>>>>>>> in 2014, June 2014, June 08 2014, etc).
>>>>>>>
>>>>>>> The list could continue, but this can add up to hundreds of counters
>>>>>>> running in real time.  Right now I have something like the following:
>>>>>>>
>>>>>>> TridentTopology topology = new TridentTopology();
>>>>>>> KafkaSpout spout = new KafkaSpout(kafkaConfig);
>>>>>>> Stream stream = topology.newStream("messages", spout).shuffle()
>>>>>>>                 .each(new Fields("str"), new FieldEmitter(), new
>>>>>>> Fields("username", "hashtag"));
>>>>>>>
>>>>>>> stream.groupBy(new Fields("username"))
>>>>>>>                 .persistentAggregate(stateFactory, new
>>>>>>> Fields("username"), new Count(), new Fields("count"))
>>>>>>>                 .parallelismHint(6);
>>>>>>> stream.groupBy(new Fields("hashtag"))
>>>>>>>                 .persistentAggregate(stateFactory, new
>>>>>>> Fields("hashtag"), new Count(), new Fields("count"))
>>>>>>>                 .parallelismHint(6);
>>>>>>>
>>>>>>> Repeat something similar for everything you want to have a unique
>>>>>>> count for.
>>>>>>>
>>>>>>> I end up having hundreds of GroupBys each that has an aggregator for
>>>>>>> each.  I have so far only run this on my local machine and not on a cluster
>>>>>>> yet, but I'm wondering if this is the correct design for something like
>>>>>>> this or if there is a better way to distribute this within Trident to make
>>>>>>> it more efficient.
>>>>>>>
>>>>>>> Any suggestions would be appreciated!
>>>>>>> Thanks!
>>>>>>> Andrew
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Cody A. Ray, LEED AP
>> cody.a.ray@gmail.com
>> 215.501.7891
>>
>


-- 
Cody A. Ray, LEED AP
cody.a.ray@gmail.com
215.501.7891

Re: Trident Binned Aggregation Design Help

Posted by Brian O'Neill <bo...@alumni.brown.edu>.
We took the same approach.

We were bitten by groupBy¹s as well.   To avoid that, we aggregate by
dimensions within each partition, and then groupBy the slices.
(time is one of the dimensions)

I touched on this topic at the NYC Storm meetup:
http://www.slideshare.net/boneill42/storm-nyc-04292014

If you are around, I¹ll present a more detailed version of this on Saturday
at the Cassandra Day in NYC.
https://www.eventbrite.com/e/cassandra-day-new-york-2014-tickets-11337955129

-brian

---
Brian O'Neill
Chief Technology Officer


Health Market Science
The Science of Better Results
2700 Horizon Drive € King of Prussia, PA € 19406
M: 215.588.6024 € @boneill42 <http://www.twitter.com/boneill42>   €
healthmarketscience.com


This information transmitted in this email message is for the intended
recipient only and may contain confidential and/or privileged material. If
you received this email in error and are not the intended recipient, or the
person responsible to deliver it to the intended recipient, please contact
the sender at the email above and delete this email and any attachments and
destroy any copies thereof. Any review, retransmission, dissemination,
copying or other use of, or taking any action in reliance upon, this
information by persons or entities other than the intended recipient is
strictly prohibited.
 


From:  Andrew Serff <an...@serff.net>
Reply-To:  <us...@storm.incubator.apache.org>
Date:  Wednesday, June 18, 2014 at 11:33 PM
To:  "user@storm.incubator.apache.org" <us...@storm.incubator.apache.org>
Subject:  Re: Trident Binned Aggregation Design Help

That is Cody!  I do like the idea of creating key/value groupings. I'll have
to try that with the persistentCounts to see how it changes my topology. I
too want to bin all counts so your example helps.

Question, how are you querying your counts?  Are you creating DRPC streams
for each count?  

Thanks again for the discussion!
Andrew

On Wednesday, June 18, 2014, Cody A. Ray <co...@gmail.com> wrote:
> We're currently building out a similar bucketed/binned stats aggregation
> system on trident that's very similar to this. We're still in the process of
> tuning it to handle the loads we're trying to throw at it, so if anyone has
> any insight as to whether this approach is good/bad, I'm all ears. :D
> 
> Anyway, the key here is that instead of grouping on fields "username",
> "hashtag", etc, you can emit more generic metric names (key-value pairs) and
> greatly reduce the number of groupBys you need.
> 
> For example, you might parse a single tweet
> <https://twitter.com/laterrastudio/status/479368401010184192>
> 
> 
> 
> into the following tuples:
> 
> (timestamp=1403128860, key=username, value=laterrastudio)
> (timestamp=1403128860, key=mention, value=SocialCyclist)
> (timestamp=1403128860, key=mention, value=Djump_in)
> (timestamp=1403128860, key=mention, value=peerby)
> (timestamp=1403128860, key=mention, value=laterrastudio)
> (timestamp=1403128860, key=hashtag, value=AppMyCity)
> (timestamp=1403128860, key=hashtag, value=NCS2014)
> (timestamp=1403128860, key=url, value=pic.twitter.com/dlGcj946Hx
> <http://pic.twitter.com/dlGcj946Hx> )
> 
> Then you groupBy(new Fields("key", "value")) and do a
> persistentCount(stateFactory, new Fields("value"), new Count(), new
> Fields("count"))
> 
> We do something like this for enterprise data using code similar to the
> following. Note that we just record 30s and 30m buckets, but you can
> substitute whatever buckets you want. (Your example was daily, monthly, and
> yearly; you'll likely have to performance test and make some binning decisions
> based on the performance of your trident state when being written to so
> frequently with large 'bins').
> 
>   private static final Map<String, Long> BUCKETS = ImmutableMap.of("30s",
> 30000L, "30m", 1800000L);
> 
>     TridentTopology topology = new TridentTopology();
>     Stream stream = topology.newStream("spout", spout)
>         
> .parallelismHint(parallelism.forSpoutLayer()).shuffle().name("Transform")
>         .each(new Fields("bytes"), new BinaryToString(), new Fields("string"))
>         .each(new Fields("string"), new MessageParser(), new
> Fields("timestamp", "key", "value"));
>     for (Map.Entry<String, Long> entry : BUCKETS.entrySet()) {
>       stream.each(new Fields("timestamp"), new Bucket(entry.getValue()), new
> Fields("bucketStart", "bucketEnd"))
>           .parallelismHint(parallelism.forTransformLayer())
>           .groupBy(new Fields("bucketStart", "bucketEnd", "key", "value"))
>           .name("Aggregator-" + entry.getKey())
>           .persistentAggregate(stateFactory, new Fields("value"), new Count(),
> new Fields("count"))
>           .parallelismHint(parallelism.forPersistenceLayer());
>     }
>     return topology.build();
> 
> This will one Transform bolt + one bolt per bucket size (two in this example).
> (Plus the extra spout bolts that Trident itself creates.)
> 
> This example actually bins all counts, not just the number of tweets. But you
> may find that's better than just keeping a global counter forever. Or maybe
> not. YMMV. :)
> 
> Either way, hopefully this gets you started in the right direction.
> 
> -Cody
> 
> 
> On Wed, Jun 18, 2014 at 1:21 PM, Adam Lewis <mail@adamlewis.com
> <javascript:_e(%7B%7D,'cvml','mail@adamlewis.com');> > wrote:
>> Do you have any insight into how DRPC plays into this?  The groupBy bolt
>> boundary makes perfect sense and I understand how that maps to some
>> collection of bolts that would process different groupings (depending on
>> parallelism).  What stumps me are the cases where adding multiple DRPC spouts
>> to the topology seems to result in the whole things being duplicated for each
>> spout.  I can see some extra tracking mechanisms get stood up to track DRPC
>> requests and then match request with response but still not sure why that
>> wouldn't scale linearly with # of DRPC spouts.
>> 
>> 
>> 
>> 
>> On Tue, Jun 17, 2014 at 8:05 PM, P. Taylor Goetz <ptgoetz@gmail.com
>> <javascript:_e(%7B%7D,'cvml','ptgoetz@gmail.com');> > wrote:
>>> Not at the moment but I will be adding that functionality (trident state) to
>>> the storm-hbase project very soon. Currently it only supports MapState.
>>> 
>>> -Taylor
>>> 
>>> On Jun 17, 2014, at 6:09 PM, Andrew Serff <andrew@serff.net
>>> <javascript:_e(%7B%7D,'cvml','andrew@serff.net');> > wrote:
>>> 
>>>> I'm currently using Redis, but I'm by no means tied to it.  Are there any
>>>> example of using either to do that?
>>>> 
>>>> Andrew
>>>> 
>>>> 
>>>> On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz <ptgoetz@gmail.com
>>>> <javascript:_e(%7B%7D,'cvml','ptgoetz@gmail.com');> > wrote:
>>>>> Andrew/Adam,
>>>>> 
>>>>> Partitioning operations like groupBy() form the bolt boundaries in trident
>>>>> topologies, so the more you have the more bolts you will have and thus,
>>>>> potentially, more network transfer.
>>>>> 
>>>>> What backing store are you using for persistence? If you are using
>>>>> something with counter support like HBase or Cassandra you could leverage
>>>>> that in combination with tridents exactly once semantics to let it handle
>>>>> the counting, and potentially greatly reduce the complexity of your
>>>>> topology.
>>>>> 
>>>>> -Taylor
>>>>> 
>>>>> On Jun 17, 2014, at 5:15 PM, Adam Lewis <mail@adamlewis.com
>>>>> <javascript:_e(%7B%7D,'cvml','mail@adamlewis.com');> > wrote:
>>>>> 
>>>>>> I, too, am eagerly awaiting a reply from the list on this topic.  I hit
>>>>>> up against max topology size limits doing something similar with trident.
>>>>>> There are definitely "linear" changes to a trident topology that result
>>>>>> in quadratic growth of the "compiled" storm topology size, such as adding
>>>>>> DRPC spouts.  Sadly the compilation process of trident to plain storm
>>>>>> remains somewhat opaque to me and I haven't had time to dig deeper.  My
>>>>>> work around has been to limit myself to one DRPC spout per topology and
>>>>>> programmatically build multiple topologies for the variations (which
>>>>>> results in a lot of structural and functional duplication of deployed
>>>>>> topologies, but at least not code duplication).
>>>>>> 
>>>>>> Trident presents a seemingly nice abstraction, but from my point of view
>>>>>> it is a leaky one if I need to understand the compilation process to know
>>>>>> why adding a single DRPC spout double the topology size.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff <andrew@serff.net
>>>>>> <javascript:_e(%7B%7D,'cvml','andrew@serff.net');> > wrote:
>>>>>>> Is there no one out there that can help with this?  If I use this
>>>>>>> paradigm, my topology ends up having like 170 bolts.  Then I add DRPC
>>>>>>> stream and I have like 50 spouts.  All of this adds up to a topology
>>>>>>> that I can't even submit because it's too large (and i've bumped the
>>>>>>> trident max to 50mb already...).  It seems like I'm thinking about this
>>>>>>> wrong, but I haven't be able to come up with another way to do it.  I
>>>>>>> don't really see how using vanilla Storm would help, maybe someone can
>>>>>>> offer some guidance?
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Andrew
>>>>>>> 
>>>>>>> 
>>>>>>> On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <andrew@serff.net
>>>>>>> <javascript:_e(%7B%7D,'cvml','andrew@serff.net');> > wrote:
>>>>>>> Hello,
>>>>>>> 
>>>>>>> I'm new to using Trident and had a few questions about the best way to
>>>>>>> do things in this framework.  I'm trying to build a real-time streaming
>>>>>>> aggregation system and Trident seems to have a very easy framework to
>>>>>>> allow me to do that.  I have a basic setup working, but as I am adding
>>>>>>> more counters, the performance becomes very slow and eventually I start
>>>>>>> having many failures.  At the basic level here is what I want to do:
>>>>>>> 
>>>>>>> Have an incoming stream that is using a KafkaSpout to read data from.
>>>>>>> I take the Kafka stream, parse it and output multiple fields.
>>>>>>> I then want many different counters for those fields in the data.
>>>>>>> 
>>>>>>> For example, say it was the twitter stream.  I may want to count:
>>>>>>> - A counter for each username I come across.  So how many times I have
>>>>>>> received a tweet from each user
>>>>>>> - A counter for each hashtag so you know how many tweets mention a
>>>>>>> hashtag
>>>>>>> - Binned counters based on date for each tweet (i.e. how many tweets in
>>>>>>> 2014, June 2014, June 08 2014, etc).
>>>>>>> 
>>>>>>> The list could continue, but this can add up to hundreds of counters
>>>>>>> running in real time.  Right now I have something like the following:
>>>>>>> 
>>>>>>> TridentTopology topology = new TridentTopology();
>>>>>>> KafkaSpout spout = new KafkaSpout(kafkaConfig);
>>>>>>> Stream stream = topology.newStream("messages", spout).shuffle()
>>>>>>>                 .each(new Fields("str"), new FieldEmitter(), new
>>>>>>> Fields("username", "hashtag"));
>>>>>>> 
>>>>>>> stream.groupBy(new Fields("username"))
>>>>>>>                 .persistentAggregate(stateFactory, new
>>>>>>> Fields("username"), new Count(), new Fields("count"))
>>>>>>>                 .parallelismHint(6);
>>>>>>> stream.groupBy(new Fields("hashtag"))
>>>>>>>                 .persistentAggregate(stateFactory, new
>>>>>>> Fields("hashtag"), new Count(), new Fields("count"))
>>>>>>>                 .parallelismHint(6);
>>>>>>> 
>>>>>>> Repeat something similar for everything you want to have a unique count
>>>>>>> for.  
>>>>>>> 
>>>>>>> I end up having hundreds of GroupBys each that has an aggregator for
>>>>>>> each.  I have so far only run this on my local machine and not on a
>>>>>>> cluster yet, but I'm wondering if this is the correct design for
>>>>>>> something like this or if there is a better way to distribute this
>>>>>>> within Trident to make it more efficient.
>>>>>>> 
>>>>>>> Any suggestions would be appreciated!
>>>>>>> Thanks!
>>>>>>> Andrew
>>>>>>> 
>>>>>> 
>>>> 
>> 
> 
> 
> 
> -- 
> Cody A. Ray, LEED AP
> cody.a.ray@gmail.com <javascript:_e(%7B%7D,'cvml','cody.a.ray@gmail.com');>
> 215.501.7891 <tel:215.501.7891>



Re: Trident Binned Aggregation Design Help

Posted by Andrew Serff <an...@serff.net>.
That is Cody!  I do like the idea of creating key/value groupings. I'll
have to try that with the persistentCounts to see how it changes my
topology. I too want to bin all counts so your example helps.

Question, how are you querying your counts?  Are you creating DRPC streams
for each count?

Thanks again for the discussion!
Andrew

On Wednesday, June 18, 2014, Cody A. Ray <co...@gmail.com> wrote:

> We're currently building out a similar bucketed/binned stats aggregation
> system on trident that's very similar to this. We're still in the process
> of tuning it to handle the loads we're trying to throw at it, so if anyone
> has any insight as to whether this approach is good/bad, I'm all ears. :D
>
> Anyway, the key here is that instead of grouping on fields "username",
> "hashtag", etc, you can emit more generic metric names (key-value pairs)
> and greatly reduce the number of groupBys you need.
>
> For example, you might parse a single tweet
> <https://twitter.com/laterrastudio/status/479368401010184192>
>
> [image: Inline image 3]
>
> into the following tuples:
>
> (timestamp=1403128860, key=username, value=laterrastudio)
> (timestamp=1403128860, key=mention, value=SocialCyclist)
> (timestamp=1403128860, key=mention, value=Djump_in)
> (timestamp=1403128860, key=mention, value=peerby)
> (timestamp=1403128860, key=mention, value=laterrastudio)
> (timestamp=1403128860, key=hashtag, value=AppMyCity)
> (timestamp=1403128860, key=hashtag, value=NCS2014)
> (timestamp=1403128860, key=url, value=pic.twitter.com/dlGcj946Hx)
>
> Then you groupBy(new Fields("key", "value")) and do a
> persistentCount(stateFactory, new Fields("value"), new Count(), new
> Fields("count"))
>
> We do something like this for enterprise data using code similar to the
> following. Note that we just record 30s and 30m buckets, but you can
> substitute whatever buckets you want. (Your example was daily, monthly, and
> yearly; you'll likely have to performance test and make some binning
> decisions based on the performance of your trident state when being written
> to so frequently with large 'bins').
>
>   private static final Map<String, Long> BUCKETS = ImmutableMap.of("30s",
> 30000L, "30m", 1800000L);
>
>     TridentTopology topology = new TridentTopology();
>     Stream stream = topology.newStream("spout", spout)
>
> .parallelismHint(parallelism.forSpoutLayer()).shuffle().name("Transform")
>         .each(new Fields("bytes"), new BinaryToString(), new
> Fields("string"))
>         .each(new Fields("string"), new MessageParser(), new
> Fields("timestamp", "key", "value"));
>     for (Map.Entry<String, Long> entry : BUCKETS.entrySet()) {
>       stream.each(new Fields("timestamp"), new Bucket(entry.getValue()),
> new Fields("bucketStart", "bucketEnd"))
>           .parallelismHint(parallelism.forTransformLayer())
>           .groupBy(new Fields("bucketStart", "bucketEnd", "key", "value"))
>           .name("Aggregator-" + entry.getKey())
>           .persistentAggregate(stateFactory, new Fields("value"), new
> Count(), new Fields("count"))
>           .parallelismHint(parallelism.forPersistenceLayer());
>     }
>     return topology.build();
>
> This will one Transform bolt + one bolt per bucket size (two in this
> example). (Plus the extra spout bolts that Trident itself creates.)
>
> This example actually bins all counts, not just the number of tweets. But
> you may find that's better than just keeping a global counter forever. Or
> maybe not. YMMV. :)
>
> Either way, hopefully this gets you started in the right direction.
>
> -Cody
>
>
> On Wed, Jun 18, 2014 at 1:21 PM, Adam Lewis <mail@adamlewis.com
> <javascript:_e(%7B%7D,'cvml','mail@adamlewis.com');>> wrote:
>
>> Do you have any insight into how DRPC plays into this?  The groupBy bolt
>> boundary makes perfect sense and I understand how that maps to some
>> collection of bolts that would process different groupings (depending on
>> parallelism).  What stumps me are the cases where adding multiple DRPC
>> spouts to the topology seems to result in the whole things being duplicated
>> for each spout.  I can see some extra tracking mechanisms get stood up to
>> track DRPC requests and then match request with response but still not sure
>> why that wouldn't scale linearly with # of DRPC spouts.
>>
>>
>>
>>
>> On Tue, Jun 17, 2014 at 8:05 PM, P. Taylor Goetz <ptgoetz@gmail.com
>> <javascript:_e(%7B%7D,'cvml','ptgoetz@gmail.com');>> wrote:
>>
>>> Not at the moment but I will be adding that functionality (trident
>>> state) to the storm-hbase project very soon. Currently it only supports
>>> MapState.
>>>
>>> -Taylor
>>>
>>> On Jun 17, 2014, at 6:09 PM, Andrew Serff <andrew@serff.net
>>> <javascript:_e(%7B%7D,'cvml','andrew@serff.net');>> wrote:
>>>
>>> I'm currently using Redis, but I'm by no means tied to it.  Are there
>>> any example of using either to do that?
>>>
>>> Andrew
>>>
>>>
>>> On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz <ptgoetz@gmail.com
>>> <javascript:_e(%7B%7D,'cvml','ptgoetz@gmail.com');>> wrote:
>>>
>>>> Andrew/Adam,
>>>>
>>>> Partitioning operations like groupBy() form the bolt boundaries in
>>>> trident topologies, so the more you have the more bolts you will have and
>>>> thus, potentially, more network transfer.
>>>>
>>>> What backing store are you using for persistence? If you are using
>>>> something with counter support like HBase or Cassandra you could leverage
>>>> that in combination with tridents exactly once semantics to let it handle
>>>> the counting, and potentially greatly reduce the complexity of your
>>>> topology.
>>>>
>>>> -Taylor
>>>>
>>>> On Jun 17, 2014, at 5:15 PM, Adam Lewis <mail@adamlewis.com
>>>> <javascript:_e(%7B%7D,'cvml','mail@adamlewis.com');>> wrote:
>>>>
>>>> I, too, am eagerly awaiting a reply from the list on this topic.  I hit
>>>> up against max topology size limits doing something similar with trident.
>>>>  There are definitely "linear" changes to a trident topology that result in
>>>> quadratic growth of the "compiled" storm topology size, such as adding DRPC
>>>> spouts.  Sadly the compilation process of trident to plain storm remains
>>>> somewhat opaque to me and I haven't had time to dig deeper.  My work around
>>>> has been to limit myself to one DRPC spout per topology and
>>>> programmatically build multiple topologies for the variations (which
>>>> results in a lot of structural and functional duplication of deployed
>>>> topologies, but at least not code duplication).
>>>>
>>>> Trident presents a seemingly nice abstraction, but from my point of
>>>> view it is a leaky one if I need to understand the compilation process to
>>>> know why adding a single DRPC spout double the topology size.
>>>>
>>>>
>>>>
>>>> On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff <andrew@serff.net
>>>> <javascript:_e(%7B%7D,'cvml','andrew@serff.net');>> wrote:
>>>>
>>>>> Is there no one out there that can help with this?  If I use this
>>>>> paradigm, my topology ends up having like 170 bolts.  Then I add DRPC
>>>>> stream and I have like 50 spouts.  All of this adds up to a topology that I
>>>>> can't even submit because it's too large (and i've bumped the trident max
>>>>> to 50mb already...).  It seems like I'm thinking about this wrong, but I
>>>>> haven't be able to come up with another way to do it.  I don't really see
>>>>> how using vanilla Storm would help, maybe someone can offer some guidance?
>>>>>
>>>>> Thanks
>>>>> Andrew
>>>>>
>>>>>
>>>>> On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <andrew@serff.net
>>>>> <javascript:_e(%7B%7D,'cvml','andrew@serff.net');>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm new to using Trident and had a few questions about the best way
>>>>>> to do things in this framework.  I'm trying to build a real-time streaming
>>>>>> aggregation system and Trident seems to have a very easy framework to allow
>>>>>> me to do that.  I have a basic setup working, but as I am adding more
>>>>>> counters, the performance becomes very slow and eventually I start having
>>>>>> many failures.  At the basic level here is what I want to do:
>>>>>>
>>>>>> Have an incoming stream that is using a KafkaSpout to read data from.
>>>>>> I take the Kafka stream, parse it and output multiple fields.
>>>>>> I then want many different counters for those fields in the data.
>>>>>>
>>>>>> For example, say it was the twitter stream.  I may want to count:
>>>>>> - A counter for each username I come across.  So how many times I
>>>>>> have received a tweet from each user
>>>>>> - A counter for each hashtag so you know how many tweets mention a
>>>>>> hashtag
>>>>>> - Binned counters based on date for each tweet (i.e. how many tweets
>>>>>> in 2014, June 2014, June 08 2014, etc).
>>>>>>
>>>>>> The list could continue, but this can add up to hundreds of counters
>>>>>> running in real time.  Right now I have something like the following:
>>>>>>
>>>>>> TridentTopology topology = new TridentTopology();
>>>>>> KafkaSpout spout = new KafkaSpout(kafkaConfig);
>>>>>> Stream stream = topology.newStream("messages", spout).shuffle()
>>>>>>                 .each(new Fields("str"), new FieldEmitter(), new
>>>>>> Fields("username", "hashtag"));
>>>>>>
>>>>>> stream.groupBy(new Fields("username"))
>>>>>>                 .persistentAggregate(stateFactory, new
>>>>>> Fields("username"), new Count(), new Fields("count"))
>>>>>>                 .parallelismHint(6);
>>>>>> stream.groupBy(new Fields("hashtag"))
>>>>>>                 .persistentAggregate(stateFactory, new
>>>>>> Fields("hashtag"), new Count(), new Fields("count"))
>>>>>>                 .parallelismHint(6);
>>>>>>
>>>>>> Repeat something similar for everything you want to have a unique
>>>>>> count for.
>>>>>>
>>>>>> I end up having hundreds of GroupBys each that has an aggregator for
>>>>>> each.  I have so far only run this on my local machine and not on a cluster
>>>>>> yet, but I'm wondering if this is the correct design for something like
>>>>>> this or if there is a better way to distribute this within Trident to make
>>>>>> it more efficient.
>>>>>>
>>>>>> Any suggestions would be appreciated!
>>>>>> Thanks!
>>>>>> Andrew
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Cody A. Ray, LEED AP
> cody.a.ray@gmail.com
> <javascript:_e(%7B%7D,'cvml','cody.a.ray@gmail.com');>
> 215.501.7891
>

Re: Trident Binned Aggregation Design Help

Posted by "Cody A. Ray" <co...@gmail.com>.
We're currently building out a similar bucketed/binned stats aggregation
system on trident that's very similar to this. We're still in the process
of tuning it to handle the loads we're trying to throw at it, so if anyone
has any insight as to whether this approach is good/bad, I'm all ears. :D

Anyway, the key here is that instead of grouping on fields "username",
"hashtag", etc, you can emit more generic metric names (key-value pairs)
and greatly reduce the number of groupBys you need.

For example, you might parse a single tweet
<https://twitter.com/laterrastudio/status/479368401010184192>

[image: Inline image 3]

into the following tuples:

(timestamp=1403128860, key=username, value=laterrastudio)
(timestamp=1403128860, key=mention, value=SocialCyclist)
(timestamp=1403128860, key=mention, value=Djump_in)
(timestamp=1403128860, key=mention, value=peerby)
(timestamp=1403128860, key=mention, value=laterrastudio)
(timestamp=1403128860, key=hashtag, value=AppMyCity)
(timestamp=1403128860, key=hashtag, value=NCS2014)
(timestamp=1403128860, key=url, value=pic.twitter.com/dlGcj946Hx)

Then you groupBy(new Fields("key", "value")) and do a
persistentCount(stateFactory, new Fields("value"), new Count(), new
Fields("count"))

We do something like this for enterprise data using code similar to the
following. Note that we just record 30s and 30m buckets, but you can
substitute whatever buckets you want. (Your example was daily, monthly, and
yearly; you'll likely have to performance test and make some binning
decisions based on the performance of your trident state when being written
to so frequently with large 'bins').

  private static final Map<String, Long> BUCKETS = ImmutableMap.of("30s",
30000L, "30m", 1800000L);

    TridentTopology topology = new TridentTopology();
    Stream stream = topology.newStream("spout", spout)

.parallelismHint(parallelism.forSpoutLayer()).shuffle().name("Transform")
        .each(new Fields("bytes"), new BinaryToString(), new
Fields("string"))
        .each(new Fields("string"), new MessageParser(), new
Fields("timestamp", "key", "value"));
    for (Map.Entry<String, Long> entry : BUCKETS.entrySet()) {
      stream.each(new Fields("timestamp"), new Bucket(entry.getValue()),
new Fields("bucketStart", "bucketEnd"))
          .parallelismHint(parallelism.forTransformLayer())
          .groupBy(new Fields("bucketStart", "bucketEnd", "key", "value"))
          .name("Aggregator-" + entry.getKey())
          .persistentAggregate(stateFactory, new Fields("value"), new
Count(), new Fields("count"))
          .parallelismHint(parallelism.forPersistenceLayer());
    }
    return topology.build();

This will one Transform bolt + one bolt per bucket size (two in this
example). (Plus the extra spout bolts that Trident itself creates.)

This example actually bins all counts, not just the number of tweets. But
you may find that's better than just keeping a global counter forever. Or
maybe not. YMMV. :)

Either way, hopefully this gets you started in the right direction.

-Cody


On Wed, Jun 18, 2014 at 1:21 PM, Adam Lewis <ma...@adamlewis.com> wrote:

> Do you have any insight into how DRPC plays into this?  The groupBy bolt
> boundary makes perfect sense and I understand how that maps to some
> collection of bolts that would process different groupings (depending on
> parallelism).  What stumps me are the cases where adding multiple DRPC
> spouts to the topology seems to result in the whole things being duplicated
> for each spout.  I can see some extra tracking mechanisms get stood up to
> track DRPC requests and then match request with response but still not sure
> why that wouldn't scale linearly with # of DRPC spouts.
>
>
>
>
> On Tue, Jun 17, 2014 at 8:05 PM, P. Taylor Goetz <pt...@gmail.com>
> wrote:
>
>> Not at the moment but I will be adding that functionality (trident state)
>> to the storm-hbase project very soon. Currently it only supports MapState.
>>
>> -Taylor
>>
>> On Jun 17, 2014, at 6:09 PM, Andrew Serff <an...@serff.net> wrote:
>>
>> I'm currently using Redis, but I'm by no means tied to it.  Are there any
>> example of using either to do that?
>>
>> Andrew
>>
>>
>> On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz <pt...@gmail.com>
>> wrote:
>>
>>> Andrew/Adam,
>>>
>>> Partitioning operations like groupBy() form the bolt boundaries in
>>> trident topologies, so the more you have the more bolts you will have and
>>> thus, potentially, more network transfer.
>>>
>>> What backing store are you using for persistence? If you are using
>>> something with counter support like HBase or Cassandra you could leverage
>>> that in combination with tridents exactly once semantics to let it handle
>>> the counting, and potentially greatly reduce the complexity of your
>>> topology.
>>>
>>> -Taylor
>>>
>>> On Jun 17, 2014, at 5:15 PM, Adam Lewis <ma...@adamlewis.com> wrote:
>>>
>>> I, too, am eagerly awaiting a reply from the list on this topic.  I hit
>>> up against max topology size limits doing something similar with trident.
>>>  There are definitely "linear" changes to a trident topology that result in
>>> quadratic growth of the "compiled" storm topology size, such as adding DRPC
>>> spouts.  Sadly the compilation process of trident to plain storm remains
>>> somewhat opaque to me and I haven't had time to dig deeper.  My work around
>>> has been to limit myself to one DRPC spout per topology and
>>> programmatically build multiple topologies for the variations (which
>>> results in a lot of structural and functional duplication of deployed
>>> topologies, but at least not code duplication).
>>>
>>> Trident presents a seemingly nice abstraction, but from my point of view
>>> it is a leaky one if I need to understand the compilation process to know
>>> why adding a single DRPC spout double the topology size.
>>>
>>>
>>>
>>> On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff <an...@serff.net> wrote:
>>>
>>>> Is there no one out there that can help with this?  If I use this
>>>> paradigm, my topology ends up having like 170 bolts.  Then I add DRPC
>>>> stream and I have like 50 spouts.  All of this adds up to a topology that I
>>>> can't even submit because it's too large (and i've bumped the trident max
>>>> to 50mb already...).  It seems like I'm thinking about this wrong, but I
>>>> haven't be able to come up with another way to do it.  I don't really see
>>>> how using vanilla Storm would help, maybe someone can offer some guidance?
>>>>
>>>> Thanks
>>>> Andrew
>>>>
>>>>
>>>> On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <an...@serff.net> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm new to using Trident and had a few questions about the best way to
>>>>> do things in this framework.  I'm trying to build a real-time streaming
>>>>> aggregation system and Trident seems to have a very easy framework to allow
>>>>> me to do that.  I have a basic setup working, but as I am adding more
>>>>> counters, the performance becomes very slow and eventually I start having
>>>>> many failures.  At the basic level here is what I want to do:
>>>>>
>>>>> Have an incoming stream that is using a KafkaSpout to read data from.
>>>>> I take the Kafka stream, parse it and output multiple fields.
>>>>> I then want many different counters for those fields in the data.
>>>>>
>>>>> For example, say it was the twitter stream.  I may want to count:
>>>>> - A counter for each username I come across.  So how many times I have
>>>>> received a tweet from each user
>>>>> - A counter for each hashtag so you know how many tweets mention a
>>>>> hashtag
>>>>> - Binned counters based on date for each tweet (i.e. how many tweets
>>>>> in 2014, June 2014, June 08 2014, etc).
>>>>>
>>>>> The list could continue, but this can add up to hundreds of counters
>>>>> running in real time.  Right now I have something like the following:
>>>>>
>>>>> TridentTopology topology = new TridentTopology();
>>>>> KafkaSpout spout = new KafkaSpout(kafkaConfig);
>>>>> Stream stream = topology.newStream("messages", spout).shuffle()
>>>>>                 .each(new Fields("str"), new FieldEmitter(), new
>>>>> Fields("username", "hashtag"));
>>>>>
>>>>> stream.groupBy(new Fields("username"))
>>>>>                 .persistentAggregate(stateFactory, new
>>>>> Fields("username"), new Count(), new Fields("count"))
>>>>>                 .parallelismHint(6);
>>>>> stream.groupBy(new Fields("hashtag"))
>>>>>                 .persistentAggregate(stateFactory, new
>>>>> Fields("hashtag"), new Count(), new Fields("count"))
>>>>>                 .parallelismHint(6);
>>>>>
>>>>> Repeat something similar for everything you want to have a unique
>>>>> count for.
>>>>>
>>>>> I end up having hundreds of GroupBys each that has an aggregator for
>>>>> each.  I have so far only run this on my local machine and not on a cluster
>>>>> yet, but I'm wondering if this is the correct design for something like
>>>>> this or if there is a better way to distribute this within Trident to make
>>>>> it more efficient.
>>>>>
>>>>> Any suggestions would be appreciated!
>>>>> Thanks!
>>>>> Andrew
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Cody A. Ray, LEED AP
cody.a.ray@gmail.com
215.501.7891

Re: Trident Binned Aggregation Design Help

Posted by Adam Lewis <ma...@adamlewis.com>.
Do you have any insight into how DRPC plays into this?  The groupBy bolt
boundary makes perfect sense and I understand how that maps to some
collection of bolts that would process different groupings (depending on
parallelism).  What stumps me are the cases where adding multiple DRPC
spouts to the topology seems to result in the whole things being duplicated
for each spout.  I can see some extra tracking mechanisms get stood up to
track DRPC requests and then match request with response but still not sure
why that wouldn't scale linearly with # of DRPC spouts.




On Tue, Jun 17, 2014 at 8:05 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

> Not at the moment but I will be adding that functionality (trident state)
> to the storm-hbase project very soon. Currently it only supports MapState.
>
> -Taylor
>
> On Jun 17, 2014, at 6:09 PM, Andrew Serff <an...@serff.net> wrote:
>
> I'm currently using Redis, but I'm by no means tied to it.  Are there any
> example of using either to do that?
>
> Andrew
>
>
> On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz <pt...@gmail.com>
> wrote:
>
>> Andrew/Adam,
>>
>> Partitioning operations like groupBy() form the bolt boundaries in
>> trident topologies, so the more you have the more bolts you will have and
>> thus, potentially, more network transfer.
>>
>> What backing store are you using for persistence? If you are using
>> something with counter support like HBase or Cassandra you could leverage
>> that in combination with tridents exactly once semantics to let it handle
>> the counting, and potentially greatly reduce the complexity of your
>> topology.
>>
>> -Taylor
>>
>> On Jun 17, 2014, at 5:15 PM, Adam Lewis <ma...@adamlewis.com> wrote:
>>
>> I, too, am eagerly awaiting a reply from the list on this topic.  I hit
>> up against max topology size limits doing something similar with trident.
>>  There are definitely "linear" changes to a trident topology that result in
>> quadratic growth of the "compiled" storm topology size, such as adding DRPC
>> spouts.  Sadly the compilation process of trident to plain storm remains
>> somewhat opaque to me and I haven't had time to dig deeper.  My work around
>> has been to limit myself to one DRPC spout per topology and
>> programmatically build multiple topologies for the variations (which
>> results in a lot of structural and functional duplication of deployed
>> topologies, but at least not code duplication).
>>
>> Trident presents a seemingly nice abstraction, but from my point of view
>> it is a leaky one if I need to understand the compilation process to know
>> why adding a single DRPC spout double the topology size.
>>
>>
>>
>> On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff <an...@serff.net> wrote:
>>
>>> Is there no one out there that can help with this?  If I use this
>>> paradigm, my topology ends up having like 170 bolts.  Then I add DRPC
>>> stream and I have like 50 spouts.  All of this adds up to a topology that I
>>> can't even submit because it's too large (and i've bumped the trident max
>>> to 50mb already...).  It seems like I'm thinking about this wrong, but I
>>> haven't be able to come up with another way to do it.  I don't really see
>>> how using vanilla Storm would help, maybe someone can offer some guidance?
>>>
>>> Thanks
>>> Andrew
>>>
>>>
>>> On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <an...@serff.net> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm new to using Trident and had a few questions about the best way to
>>>> do things in this framework.  I'm trying to build a real-time streaming
>>>> aggregation system and Trident seems to have a very easy framework to allow
>>>> me to do that.  I have a basic setup working, but as I am adding more
>>>> counters, the performance becomes very slow and eventually I start having
>>>> many failures.  At the basic level here is what I want to do:
>>>>
>>>> Have an incoming stream that is using a KafkaSpout to read data from.
>>>> I take the Kafka stream, parse it and output multiple fields.
>>>> I then want many different counters for those fields in the data.
>>>>
>>>> For example, say it was the twitter stream.  I may want to count:
>>>> - A counter for each username I come across.  So how many times I have
>>>> received a tweet from each user
>>>> - A counter for each hashtag so you know how many tweets mention a
>>>> hashtag
>>>> - Binned counters based on date for each tweet (i.e. how many tweets in
>>>> 2014, June 2014, June 08 2014, etc).
>>>>
>>>> The list could continue, but this can add up to hundreds of counters
>>>> running in real time.  Right now I have something like the following:
>>>>
>>>> TridentTopology topology = new TridentTopology();
>>>> KafkaSpout spout = new KafkaSpout(kafkaConfig);
>>>> Stream stream = topology.newStream("messages", spout).shuffle()
>>>>                 .each(new Fields("str"), new FieldEmitter(), new
>>>> Fields("username", "hashtag"));
>>>>
>>>> stream.groupBy(new Fields("username"))
>>>>                 .persistentAggregate(stateFactory, new
>>>> Fields("username"), new Count(), new Fields("count"))
>>>>                 .parallelismHint(6);
>>>> stream.groupBy(new Fields("hashtag"))
>>>>                 .persistentAggregate(stateFactory, new
>>>> Fields("hashtag"), new Count(), new Fields("count"))
>>>>                 .parallelismHint(6);
>>>>
>>>> Repeat something similar for everything you want to have a unique count
>>>> for.
>>>>
>>>> I end up having hundreds of GroupBys each that has an aggregator for
>>>> each.  I have so far only run this on my local machine and not on a cluster
>>>> yet, but I'm wondering if this is the correct design for something like
>>>> this or if there is a better way to distribute this within Trident to make
>>>> it more efficient.
>>>>
>>>> Any suggestions would be appreciated!
>>>> Thanks!
>>>> Andrew
>>>>
>>>
>>>
>>
>

Re: Trident Binned Aggregation Design Help

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Not at the moment but I will be adding that functionality (trident state) to the storm-hbase project very soon. Currently it only supports MapState.

-Taylor

> On Jun 17, 2014, at 6:09 PM, Andrew Serff <an...@serff.net> wrote:
> 
> I'm currently using Redis, but I'm by no means tied to it.  Are there any example of using either to do that?  
> 
> Andrew
> 
> 
>> On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz <pt...@gmail.com> wrote:
>> Andrew/Adam,
>> 
>> Partitioning operations like groupBy() form the bolt boundaries in trident topologies, so the more you have the more bolts you will have and thus, potentially, more network transfer.
>> 
>> What backing store are you using for persistence? If you are using something with counter support like HBase or Cassandra you could leverage that in combination with tridents exactly once semantics to let it handle the counting, and potentially greatly reduce the complexity of your topology.
>> 
>> -Taylor
>> 
>>> On Jun 17, 2014, at 5:15 PM, Adam Lewis <ma...@adamlewis.com> wrote:
>>> 
>>> I, too, am eagerly awaiting a reply from the list on this topic.  I hit up against max topology size limits doing something similar with trident.  There are definitely "linear" changes to a trident topology that result in quadratic growth of the "compiled" storm topology size, such as adding DRPC spouts.  Sadly the compilation process of trident to plain storm remains somewhat opaque to me and I haven't had time to dig deeper.  My work around has been to limit myself to one DRPC spout per topology and programmatically build multiple topologies for the variations (which results in a lot of structural and functional duplication of deployed topologies, but at least not code duplication).
>>> 
>>> Trident presents a seemingly nice abstraction, but from my point of view it is a leaky one if I need to understand the compilation process to know why adding a single DRPC spout double the topology size.
>>> 
>>> 
>>> 
>>>> On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff <an...@serff.net> wrote:
>>>> Is there no one out there that can help with this?  If I use this paradigm, my topology ends up having like 170 bolts.  Then I add DRPC stream and I have like 50 spouts.  All of this adds up to a topology that I can't even submit because it's too large (and i've bumped the trident max to 50mb already...).  It seems like I'm thinking about this wrong, but I haven't be able to come up with another way to do it.  I don't really see how using vanilla Storm would help, maybe someone can offer some guidance?
>>>> 
>>>> Thanks
>>>> Andrew
>>>> 
>>>> 
>>>>> On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <an...@serff.net> wrote:
>>>>> Hello,
>>>>> 
>>>>> I'm new to using Trident and had a few questions about the best way to do things in this framework.  I'm trying to build a real-time streaming aggregation system and Trident seems to have a very easy framework to allow me to do that.  I have a basic setup working, but as I am adding more counters, the performance becomes very slow and eventually I start having many failures.  At the basic level here is what I want to do:
>>>>> 
>>>>> Have an incoming stream that is using a KafkaSpout to read data from. 
>>>>> I take the Kafka stream, parse it and output multiple fields.  
>>>>> I then want many different counters for those fields in the data.  
>>>>> 
>>>>> For example, say it was the twitter stream.  I may want to count:
>>>>> - A counter for each username I come across.  So how many times I have received a tweet from each user
>>>>> - A counter for each hashtag so you know how many tweets mention a hashtag
>>>>> - Binned counters based on date for each tweet (i.e. how many tweets in 2014, June 2014, June 08 2014, etc).  
>>>>> 
>>>>> The list could continue, but this can add up to hundreds of counters running in real time.  Right now I have something like the following:
>>>>> 
>>>>> TridentTopology topology = new TridentTopology();
>>>>> KafkaSpout spout = new KafkaSpout(kafkaConfig);
>>>>> Stream stream = topology.newStream("messages", spout).shuffle()
>>>>>                 .each(new Fields("str"), new FieldEmitter(), new Fields("username", "hashtag"));
>>>>> 
>>>>> stream.groupBy(new Fields("username"))
>>>>>                 .persistentAggregate(stateFactory, new Fields("username"), new Count(), new Fields("count"))
>>>>>                 .parallelismHint(6);
>>>>> stream.groupBy(new Fields("hashtag"))
>>>>>                 .persistentAggregate(stateFactory, new Fields("hashtag"), new Count(), new Fields("count"))
>>>>>                 .parallelismHint(6);
>>>>> 
>>>>> Repeat something similar for everything you want to have a unique count for.  
>>>>> 
>>>>> I end up having hundreds of GroupBys each that has an aggregator for each.  I have so far only run this on my local machine and not on a cluster yet, but I'm wondering if this is the correct design for something like this or if there is a better way to distribute this within Trident to make it more efficient.  
>>>>> 
>>>>> Any suggestions would be appreciated!  
>>>>> Thanks!
>>>>> Andrew
>>>> 
>>> 
> 

Re: Trident Binned Aggregation Design Help

Posted by Andrew Serff <an...@serff.net>.
I'm currently using Redis, but I'm by no means tied to it.  Are there any
example of using either to do that?

Andrew


On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

> Andrew/Adam,
>
> Partitioning operations like groupBy() form the bolt boundaries in trident
> topologies, so the more you have the more bolts you will have and thus,
> potentially, more network transfer.
>
> What backing store are you using for persistence? If you are using
> something with counter support like HBase or Cassandra you could leverage
> that in combination with tridents exactly once semantics to let it handle
> the counting, and potentially greatly reduce the complexity of your
> topology.
>
> -Taylor

Re: Trident Binned Aggregation Design Help

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Andrew/Adam,

Partitioning operations like groupBy() form the bolt boundaries in Trident topologies, so the more of them you have, the more bolts you will have and thus, potentially, more network transfer.

What backing store are you using for persistence? If you are using something with counter support like HBase or Cassandra, you could leverage that in combination with Trident's exactly-once semantics to let it handle the counting, and potentially greatly reduce the complexity of your topology.
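
For example, with HBase the count itself can be pushed down to the region
server as an atomic increment, roughly like this (an illustrative sketch
only; the table and column names are invented, and for exactly-once you
would still want to record the txid with the count):

// Illustrative sketch: server-side counters with the 0.9x-era HBase client.
// Table "tweet_counts", family "c" and qualifier "count" are made up.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseCounterSketch {
    private final HTable table;

    public HBaseCounterSketch() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        table = new HTable(conf, "tweet_counts");
    }

    // One atomic increment per (keyType, keyValue) pair, e.g.
    // incr("hashtag", "#storm") or incr("date", "2014-06-08").
    public long incr(String keyType, String keyValue) throws Exception {
        byte[] row = Bytes.toBytes(keyType + ":" + keyValue);
        return table.incrementColumnValue(row, Bytes.toBytes("c"),
                Bytes.toBytes("count"), 1L);
    }
}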

-Taylor

> On Jun 17, 2014, at 5:15 PM, Adam Lewis <ma...@adamlewis.com> wrote:
> 
> I, too, am eagerly awaiting a reply from the list on this topic.  I ran up against max topology size limits doing something similar with Trident.  There are definitely "linear" changes to a Trident topology that result in quadratic growth of the "compiled" Storm topology size, such as adding DRPC spouts.  Sadly, the compilation process from Trident to plain Storm remains somewhat opaque to me and I haven't had time to dig deeper.  My workaround has been to limit myself to one DRPC spout per topology and programmatically build multiple topologies for the variations (which results in a lot of structural and functional duplication of deployed topologies, but at least not code duplication).
> 
> Trident presents a seemingly nice abstraction, but from my point of view it is a leaky one if I need to understand the compilation process to know why adding a single DRPC spout doubles the topology size.

Re: Trident Binned Aggregation Design Help

Posted by Adam Lewis <ma...@adamlewis.com>.
I, too, am eagerly awaiting a reply from the list on this topic.  I ran up
against max topology size limits doing something similar with Trident.
 There are definitely "linear" changes to a Trident topology that result in
quadratic growth of the "compiled" Storm topology size, such as adding DRPC
spouts.  Sadly, the compilation process from Trident to plain Storm remains
somewhat opaque to me and I haven't had time to dig deeper.  My workaround
has been to limit myself to one DRPC spout per topology and
programmatically build multiple topologies for the variations (which
results in a lot of structural and functional duplication of deployed
topologies, but at least not code duplication).
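
In case the shape of that workaround is useful to anyone, it is roughly
the following (heavily simplified; the shared spout/stream wiring is
elided and the names are made up):

// Simplified sketch of the one-DRPC-spout-per-topology workaround.
import java.util.HashMap;
import java.util.Map;
import storm.trident.TridentTopology;

public class TopologyPerQuery {
    public static Map<String, TridentTopology> build(String... queryNames) {
        Map<String, TridentTopology> topologies =
                new HashMap<String, TridentTopology>();
        for (String query : queryNames) {
            TridentTopology topology = new TridentTopology();
            // ... shared spout/stream/state wiring goes here ...
            topology.newDRPCStream(query);  // exactly one DRPC spout each
            topologies.put(query, topology);
        }
        return topologies;
    }
}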

Trident presents a seemingly nice abstraction, but from my point of view it
is a leaky one if I need to understand the compilation process to know why
adding a single DRPC spout doubles the topology size.



On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff <an...@serff.net> wrote:

> Is there no one out there that can help with this?  If I use this
> paradigm, my topology ends up having like 170 bolts.  Then I add DRPC
> streams and I have like 50 spouts.  All of this adds up to a topology that
> I can't even submit because it's too large (and I've bumped the Trident
> max to 50MB already...).  It seems like I'm thinking about this wrong, but
> I haven't been able to come up with another way to do it.  I don't really
> see how using vanilla Storm would help, maybe someone can offer some
> guidance?
>
> Thanks
> Andrew

Re: Trident Binned Aggregation Design Help

Posted by Andrew Serff <an...@serff.net>.
Is there no one out there that can help with this?  If I use this paradigm,
my topology ends up having like 170 bolts.  Then I add DRPC streams and I
have like 50 spouts.  All of this adds up to a topology that I can't even
submit because it's too large (and I've bumped the Trident max to 50MB
already...).  It seems like I'm thinking about this wrong, but I haven't
been able to come up with another way to do it.  I don't really see how
using vanilla Storm would help, maybe someone can offer some guidance?

Thanks
Andrew

