You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Olivier Mallassi <ol...@gmail.com> on 2015/04/01 17:15:47 UTC

groupBy / persistentAggregate and "distribution"

Hi all

I would like to better understand how groupBy/persistentAggregate works in
a "distributed environment"

I got the following topology (just an extract)

Stream currStream = topology.newStream(this.getTopologyName(), jmsspout)
...
currStream.groupBy(new Fields(A,B))

 .persistentAggregate(myDistributedCacheStateFactory(hosts),
                            new Fields(C),
                            (CombinerAggregator) sum,
                            new Fields("output"))
                    .parallelismHint(parallHintValue);

MyDistributedCacheStateFactory is an implementation of MapState (should
work because of the groupBy) that persist into a distributed cache. to keep
it simple, an entry is identified by the value of [A,B]

So as far as I understand, my batch will be grouped by [A, B] and my
CombinerAggregator will do a sum on all the objects C.

1/ correct me if I am wrong but in a distributed env, I will have several
instances of MyDistributedCacheStateFactory executed in different JVM or
threads

2/ Does Storm guarantee me that each "groupBy" will always go to the same
"thread"? or not?
if not, how can I ensure a "select for update where key=[A,B]"? how can I
ensure I will not update the same entry of the distributed cache from two
different threads (ie. a kind of race conditions)

3/ is it a better option to use groupBy().aggregate().partitionPersist()?

Hope my questions are not too blur.

Regards.

Re: groupBy / persistentAggregate and "distribution"

Posted by Olivier Mallassi <ol...@gmail.com>.
looks like this article
https://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
gave me some answers.

On Wed, Apr 1, 2015 at 5:15 PM, Olivier Mallassi <olivier.mallassi@gmail.com
> wrote:

> Hi all
>
> I would like to better understand how groupBy/persistentAggregate works in
> a "distributed environment"
>
> I got the following topology (just an extract)
>
> Stream currStream = topology.newStream(this.getTopologyName(), jmsspout)
> ...
> currStream.groupBy(new Fields(A,B))
>
>  .persistentAggregate(myDistributedCacheStateFactory(hosts),
>                             new Fields(C),
>                             (CombinerAggregator) sum,
>                             new Fields("output"))
>                     .parallelismHint(parallHintValue);
>
> MyDistributedCacheStateFactory is an implementation of MapState (should
> work because of the groupBy) that persist into a distributed cache. to keep
> it simple, an entry is identified by the value of [A,B]
>
> So as far as I understand, my batch will be grouped by [A, B] and my
> CombinerAggregator will do a sum on all the objects C.
>
> 1/ correct me if I am wrong but in a distributed env, I will have several
> instances of MyDistributedCacheStateFactory executed in different JVM or
> threads
>
> 2/ Does Storm guarantee me that each "groupBy" will always go to the same
> "thread"? or not?
> if not, how can I ensure a "select for update where key=[A,B]"? how can I
> ensure I will not update the same entry of the distributed cache from two
> different threads (ie. a kind of race conditions)
>
> 3/ is it a better option to use groupBy().aggregate().partitionPersist()?
>
> Hope my questions are not too blur.
>
> Regards.
>