Posted to user@storm.apache.org by Dimitris Sarlis <sa...@gmail.com> on 2015/07/24 11:05:52 UTC

Storm topology hangs up

Hello,

I'm trying to run a topology with spouts that read input from a Kafka
queue and forward it to a pool of bolts. After only a few records have
been sent, the topology seems to hang and no more messages are
processed. I've set TOPOLOGY_MAX_SPOUT_PENDING to 500, and roughly when
the first 500 messages have been sent, the bolts stop processing further
records. However, Ganglia shows each worker at ~20% CPU utilization even
though the bolts don't appear to be doing any processing.
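
For reference, the cap is applied through the topology config, roughly
like this (the worker count shown is just illustrative, one per
supervisor slot):

Config conf = new Config();
conf.setMaxSpoutPending(500);   // Config.TOPOLOGY_MAX_SPOUT_PENDING
conf.setNumWorkers(14 * 4);     // illustrative: one worker per slot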

My configuration is as follows: 1 master running Nimbus and 14 slaves,
each running 4 supervisor slots. I deploy 14 spouts and 42 bolts, and
build the bolt side of the topology with the following code:

for (int i = 1; i <= boltNo; i++) {
    // Each bolt subscribes to every spout with a shuffle grouping...
    bd = builder.setBolt("worker" + i, new ExclamationBolt(boltNo), boltPar);
    for (int j = 1; j <= spoutNo; j++) {
        bd = bd.shuffleGrouping("words" + j);
    }
    // ...and to every bolt (including itself) with a direct grouping.
    for (int k = 1; k <= boltNo; k++) {
        bd = bd.directGrouping("worker" + k);
    }
}
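
The spouts and the submission are wired up roughly as below (the spout
class name and topology name are placeholders here, not the exact ones I
use):

for (int j = 1; j <= spoutNo; j++) {
    // placeholder spout class standing in for the Kafka-reading spout
    builder.setSpout("words" + j, new KafkaWordSpout(), spoutPar);
}
StormSubmitter.submitTopology("exclamation-topology", conf, builder.createTopology());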

Essentially, the spouts shuffle records to the bolts, and each bolt is 
connected to every bolt (including itself) with a direct grouping.

The execute method of my ExclamationBolt is:

public void execute(Tuple tuple) {
    if (!tuple.getString(0).contains("!")) {
        // Pick a random target bolt and send the record there directly.
        Random ran = new Random();
        int worker = ran.nextInt(boltNo) + 1;
        List<Integer> l = _topo.getComponentTasks("worker" + worker);
        LOG.info("List: " + l + " for worker: " + worker);
        String out = tuple.getString(0) + "!";
        LOG.info("Sending record to appropriate worker: worker" + worker);
        _collector.emitDirect(l.get(0), new Values(out));
    } else {
        // The record has already made its hop; nothing more to do.
        LOG.info("Already processed this record");
    }
    _collector.ack(tuple);
}
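
For context, the rest of the bolt looks roughly as follows (a sketch,
not verbatim; the output field name is arbitrary). The output stream is
declared as direct, which emitDirect requires, and the collector and
context are saved in prepare():

private OutputCollector _collector;
private TopologyContext _topo;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _collector = collector;
    _topo = context;    // used in execute() to look up "worker" task ids
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // emitDirect requires the stream to be declared as a direct stream
    declarer.declare(true, new Fields("word"));
}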

As you can see, if the record does not contain a "!", I append one and 
send the record to a random worker (a single hop); otherwise I just log 
a message and move on to the next record.

Any ideas what might be wrong with this scenario?

Thanks,
Dimitris Sarlis