You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Navin Ipe <na...@searchlighthealth.com> on 2016/06/09 06:48:56 UTC

Dynamically linking Spouts to Bolts

Hi,

I've already changed the design of my code to solve the problem, but just
wanted to ask if the problem could have been solved alternatively.

*The problem:*
Wanted to create x number of spouts that would each handle a batch of
records from a database. First spout handles batch 1 to 1000. Second spout
handles 1001 to 2000 etc.
The problem is if x spouts are created dynamically, then how do we assign
the output of all the spouts to the bolts?

*Solution that didn't work:*
A colleague suggested

String s = "sp";
*for(*Integer ordinal = 1; ordinal <= *numSpouts*; ++ordinal) *{*
    String spoutName = s + ordinal.toString();
    builder.setSpout(spoutName, new mdSpout(), 1)
        .setNumTasks(1);
*}*

*BoltDeclarer boltDec = builder.setBolt(cgBolt, new CBolt(), thr);*
*for(*int i=0; i < *numSpouts*; i++) *{*
    *boltDec.*fieldsGrouping(s+Integer.toString(i), new
Fields(configRef.BID))
            .allGrouping(s+Integer.toString(i), configRef.ID);
*}*

*But this gave an invalid topology exception.*


*Solution I eventually had to use:*
*for(*Integer ordinal = 1; ordinal <= numSpouts; ++ordinal) *{*
    String spoutName = s + ordinal.toString();
    builder.setSpout(spoutName, new mdSpout(), 1)
            .setNumTasks(1);
*}*

*switch(numSpouts) {*
    case 1:
        builder.setBolt(cgBolt, new CBolt(), thr)
                //.setNumTasks(3)
                .fieldsGrouping(s+"1", new Fields(configRef.BID))
                .allGrouping(s+"1", configRef.ID);
        break;
    case 2:
        builder.setBolt(cgBolt, new CBolt(), thr)
                .fieldsGrouping(s+"1", new Fields(configRef.BID))
                .allGrouping(s+"1", ID)
                .fieldsGrouping(s+"2", new Fields(configRef.BID))
                .allGrouping(s+"2", configRef.ID);
        break;
    case 3:
        builder.setBolt(cgBolt, new CBolt(), thr)
                .fieldsGrouping(s+"1", new Fields(configRef.BID))
                .allGrouping(s+"1", configRef.ID)
                .fieldsGrouping(s+"2", new Fields(configRef.BID))
                .allGrouping(s+"2", configRef.ID)
                .fieldsGrouping(s+"3", new Fields(configRef.BID))
                .allGrouping(s+"3", configRef.ID);
        break;
*}*

This worked, but obviously there should be a better way to do this, right?

ps: Eventually I used a different method of sending the batches to the
spouts so that the spouts didn't have to be created with a for loop, but am
still curious about how to solve the above problem.

-- 
Regards,
Navin