Posted to user@storm.apache.org by Susana González <su...@gmail.com> on 2014/03/10 16:52:24 UTC

How to define grouping in a Topology with an 'a priori' unknown number of streams to subscribe?

Hi,


I need help to go from a simple Storm topology like this:



TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("mySpout", spout, spoutParallelism);
builder.setBolt("myBoltA", boltA, boltAParallelism).shuffleGrouping("mySpout");
builder.setBolt("myBoltB", boltB, boltBParallelism).shuffleGrouping("myBoltA");



to a new topology where there are several bolts, myBoltA*, whose emitted
tuples all need to be processed by the same bolt, myBoltB.


The problem is that the number of myBoltA* bolts is read from a
configuration file when the topology is started, so I don't know it in
advance and can't simply hard-code the grouping like this:



builder.setBolt("myBoltB", boltB, boltBParallelism)
   .shuffleGrouping("myBoltA1")
   .shuffleGrouping("myBoltA2")
   // ... one call per bolt ...
   .shuffleGrouping("myBoltAn");
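Before any grouping can be declared, the count read at startup has to become the component IDs myBoltA1 .. myBoltAn. A minimal sketch of that step, assuming the count is stored under a hypothetical boltA.count key in a java.util.Properties file (the key name and the BoltAIds class are illustrative, not part of Storm):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch only: turns a count read from configuration into the component IDs
// "myBoltA1" .. "myBoltAn". The key name "boltA.count" is a hypothetical choice.
class BoltAIds {

    static final String COUNT_KEY = "boltA.count"; // hypothetical key name

    static List<String> fromConfig(Properties props) {
        String raw = props.getProperty(COUNT_KEY);
        if (raw == null) {
            throw new IllegalArgumentException("missing " + COUNT_KEY);
        }
        int n = Integer.parseInt(raw.trim());
        if (n < 1) {
            throw new IllegalArgumentException(COUNT_KEY + " must be >= 1, got " + n);
        }
        List<String> ids = new ArrayList<>(n);
        // 1-based and inclusive, so the last ID is "myBoltA" + n.
        for (int i = 1; i <= n; i++) {
            ids.add("myBoltA" + i);
        }
        return ids;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // A real topology would load the file instead, e.g. props.load(new FileReader(path)).
        props.load(new StringReader("boltA.count=3\n"));
        System.out.println(fromConfig(props)); // prints [myBoltA1, myBoltA2, myBoltA3]
    }
}
```

Validating the count up front means a bad config fails when the topology is built rather than producing a myBoltB with no inputs.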



I've looked into whether this can be done with a CustomStreamGrouping or
with Trident, but I haven't figured out how to implement it yet.



Any idea?



Thanks in advance!

     Susana

Re: How to define grouping in a Topology with an 'a priori' unknown number of streams to subscribe?

Posted by Susana González <su...@gmail.com>.
Thanks a lot, Nathan!


On Mon, Mar 10, 2014 at 4:56 PM, Nathan Leung <nc...@gmail.com> wrote:

> You can do something like
>
> BoltDeclarer bd = builder.setBolt("myBoltB", boltB, boltBParallelism);
> for (int i = 1; i <= numInstances; ++i) {
>     bd.shuffleGrouping("myBoltA" + i);
> }

Re: How to define grouping in a Topology with an 'a priori' unknown number of streams to subscribe?

Posted by Nathan Leung <nc...@gmail.com>.
You can do something like

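BoltDeclarer bd = builder.setBolt("myBoltB", boltB, boltBParallelism);
// Subscribe myBoltB to each of myBoltA1 .. myBoltAn; numInstances is the count from the config file.
for (int i = 1; i <= numInstances; ++i) {
    bd.shuffleGrouping("myBoltA" + i);
}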
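The loop works because each shuffleGrouping call on the declarer adds another input to myBoltB; chaining the calls in one expression is only a convenience. The same idea can declare the myBoltA* bolts themselves. The plain-Java sketch below models the whole wiring without Storm on the classpath: RecordingBuilder and RecordingDeclarer are hypothetical stand-ins that only record subscriptions, where real code would call TopologyBuilder.setBolt(id, bolt, parallelism) and BoltDeclarer.shuffleGrouping(componentId).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins, NOT Storm's API: they only record which sources
// each component subscribes to, so the loop bounds can be checked.
class RecordingBuilder {
    final Map<String, List<String>> inputs = new LinkedHashMap<>();

    // Mirrors the shape of setBolt(id, ...): returns a declarer for that component.
    RecordingDeclarer setBolt(String id) {
        inputs.putIfAbsent(id, new ArrayList<>());
        return new RecordingDeclarer(inputs.get(id));
    }
}

class RecordingDeclarer {
    private final List<String> sources;

    RecordingDeclarer(List<String> sources) {
        this.sources = sources;
    }

    // Records the subscription and returns this, so calls can be chained or looped.
    RecordingDeclarer shuffleGrouping(String sourceId) {
        sources.add(sourceId);
        return this;
    }
}

class Wiring {
    // Declares myBoltA1..myBoltAn (each fed by mySpout) and subscribes
    // myBoltB to all of them; numInstances would come from the config file.
    static RecordingBuilder wire(int numInstances) {
        RecordingBuilder builder = new RecordingBuilder();
        for (int i = 1; i <= numInstances; i++) {
            builder.setBolt("myBoltA" + i).shuffleGrouping("mySpout");
        }
        // Keep the declarer in a variable instead of one long chained expression.
        RecordingDeclarer bd = builder.setBolt("myBoltB");
        for (int i = 1; i <= numInstances; i++) {
            bd.shuffleGrouping("myBoltA" + i);
        }
        return builder;
    }

    public static void main(String[] args) {
        System.out.println(wire(3).inputs.get("myBoltB")); // prints [myBoltA1, myBoltA2, myBoltA3]
    }
}
```

Driving both loops from the same configured count keeps the upstream declarations and myBoltB's subscriptions in step.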

