You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Dimitris Sarlis <sa...@gmail.com> on 2015/07/24 11:05:52 UTC
Storm topology hangs up
Hello,
I'm trying to run a topology containing some spouts that take input from
a Kafka queue and redirect that input to a pool of bolts. After just a
few records have been sent, the topology seems to hang up and no more
messages can be digested. I've set the TOPOLOGY_MAX_SPOUT_PENDING to 500
and approximately, when the first 500 messages are sent, the bolts stop
processing other records. However, through Ganglia I can see that each
worker has a CPU utilization of ~20% even though it seems that bolts
don't perform any processing.
My configuration is as follows: 1 master running the nimbus and 14
slaves running 4 supervisor slots each. I deploy 14 spouts and 42 bolts
using the following code:
for (int i = 1; i <= boltNo; i ++) {
bd = builder.setBolt("worker" + i, new ExclamationBolt(boltNo),
boltPar);
for (int j = 1; j <= spoutNo; j++)
bd = bd.shuffleGrouping("words" + j);
for (int k = 1; k <= boltNo; k++) {
bd = bd.directGrouping("worker" + k);
}
}
Essentially, spouts shuffle records to bolts and each bolt is connected
with a direct grouping with the other bolts.
The execute method of my ExclamationBolt is:
public void execute(Tuple tuple) {
if (!tuple.getString(0).contains("!")) {
Random ran = new Random();
int worker = ran.nextInt(boltNo) + 1;
List<Integer> l = _topo.getComponentTasks("worker" + worker);
LOG.info("List: " + l + " for worker: " + worker);
String out = tuple.getString(0) + "!";
LOG.info("Sending record to appropriate worker: worker" +
worker);
_collector.emitDirect(l.get(0), new Values(out));
}
else {
LOG.info("Already processed this record");
}
_collector.ack(tuple);
}
As you can see, if the record does not contain a "!", I append a "!" and
send it to a random worker (performing one hop) or else I just print a
log message and proceed to the next record.
Any ideas what might be wrong with this scenario?
Thanks,
Dimitris Sarlis