Posted to user@storm.apache.org by 최범균 <ma...@madvirus.net> on 2014/06/12 03:52:14 UTC

Which partition strategy does Storm use for groupBy when no partitioning is specified and parallelismHint() is set?

I wrote the following code:

topology.newStream("log", new LogSpout())
    .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
    .groupBy(new Fields("productId:time")) // how are tuples partitioned here?
    .aggregate(new CountAggregator(), new Fields("count")) // 3 instances
    .each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
    .each(new Fields("productId:time", "sum"), new ThresholdFilter())
    .parallelismHint(3);

I found that three CountAggregator instances were created.

How does Storm partition the grouped tuples after groupBy and before aggregate?

Re: Which partition strategy does Storm use for groupBy when no partitioning is specified and parallelismHint() is set?

Posted by 최범균 <ma...@madvirus.net>.
So, which partition strategy does Storm choose when no partition strategy is
specified between groupBy and aggregate?
Shuffle? partitionBy(fields)? Something else?

    .groupBy(new Fields("productId:time")) // no partition strategy
    .aggregate(new CountAggregator(), new Fields("count")) // how does Storm choose the partition that receives the grouped tuples?
    .each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
    .each(new Fields("productId:time", "sum"), new ThresholdFilter())
    .parallelismHint(3);

2014-06-13 0:11 GMT+09:00 Romain Leroux <le...@gmail.com>:

> This compiles to 3 bolts because you set .parallelismHint(3) after aggregate
> (an important operation, where batch size matters).
>
> See
> https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams
>
> .each() operations are not affected by batch size, so they are
> included in the next bolt generated.

Re: Which partition strategy does Storm use for groupBy when no partitioning is specified and parallelismHint() is set?

Posted by Romain Leroux <le...@gmail.com>.
This compiles to 3 bolts because you set .parallelismHint(3) after aggregate
(an important operation, where batch size matters).

See
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams

.each() operations are not affected by batch size, so they are
included in the next bolt generated.
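
To make the grouping question concrete: the Trident API Overview linked above describes groupBy as repartitioning the stream with a partitionBy on the grouping fields, so that tuples with equal field values land in the same partition. The sketch below only illustrates that idea in plain Java, without any Storm classes. The class name FieldPartitionSketch, the helper partitionFor, the sample keys, and the hash-modulo rule are all assumptions made for illustration, not Storm's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FieldPartitionSketch {

    // Hypothetical helper (not a Storm API): pick a target partition from the
    // values of the grouping fields. Math.floorMod keeps the index in
    // [0, numPartitions) even when hashCode() is negative.
    public static int partitionFor(List<?> groupValues, int numPartitions) {
        return Math.floorMod(groupValues.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // mirrors parallelismHint(3) in the question

        // Made-up "productId:time" values; some keys repeat on purpose.
        List<String> keys = Arrays.asList("p1:10", "p2:10", "p1:10", "p3:11", "p2:10");

        // Collect the keys routed to each partition.
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            int partition = partitionFor(Arrays.asList(key), numPartitions);
            byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(key);
        }

        byPartition.forEach((p, ks) -> System.out.println("partition " + p + " -> " + ks));
    }
}
```

Running main prints which of the three partitions each sample key is routed to. Repeated keys such as "p1:10" always appear under the same partition, which is the property a per-group aggregate relies on.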

