You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Toby Hobson <to...@gmail.com> on 2014/11/16 21:47:23 UTC
Shutting down long running bolts
I'm trying to shutdown a long running bolt (based on the sample
ExclamationBolt) but it seems the cluster.killTopology() call is only
interrupting one task and continuing to execute other tasks. To simplify
things I am only running one bolt in the topology. This is the bolt code I
am running (locally):
@Override
public void execute(Tuple tuple) {
logger.info("EXECUTE");
try {
Thread.sleep(2000);
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
} catch (InterruptedException e) {
logger.info("Thread Interrupted");
}
}
In the logs I see something like this
7927 [Thread-12-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
...
16572 [Thread-8-exclaim1] INFO ThreadedExclamationBolt - Thread Interrupted
16572 [Thread-8-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
...
17950 [Thread-10-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
17958 [Thread-12-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
The only workaround I have found is to set an instance variable when I
catch the single InterruptedException i.e.
public class ThreadedExclamationBolt extends BaseRichBolt {
...
private AtomicBoolean sleepInterrupted = new AtomicBoolean(false);
@Override
public void execute(Tuple tuple) {
try {
if (sleepInterrupted.get())
return;
Thread.sleep(2000);
_collector.emit(tuple, new Values(tuple.getString(0)));
_collector.ack(tuple);
} catch (InterruptedException e) {
sleepInterrupted.set(true);
}
}
But this doesn't seem right to me, what am I missing here?