You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Toby Hobson <to...@gmail.com> on 2014/11/16 21:47:23 UTC

Shutting down long running bolts

I'm trying to shutdown a long running bolt (based on the sample
ExclamationBolt) but it seems the cluster.killTopology() call is only
interrupting one task and continuing to execute other tasks. To simplify
things I am only running one bolt in the topology. This is the bolt code I
am running (locally):

@Override
public void execute(Tuple tuple) {
    logger.info("EXECUTE");
    try {
        Thread.sleep(2000);
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    } catch (InterruptedException e) {
        logger.info("Thread Interrupted");
    }
}

In the logs I see something like this

7927 [Thread-12-exclaim1] INFO  ThreadedExclamationBolt - EXECUTE
...
16572 [Thread-8-exclaim1] INFO  ThreadedExclamationBolt - Thread Interrupted
16572 [Thread-8-exclaim1] INFO  ThreadedExclamationBolt - EXECUTE
...
17950 [Thread-10-exclaim1] INFO ThreadedExclamationBolt - EXECUTE
17958 [Thread-12-exclaim1] INFO ThreadedExclamationBolt - EXECUTE

The only workaround I have found is to set an instance variable when I
catch the single InterruptedException i.e.

public class ThreadedExclamationBolt extends BaseRichBolt {
...
private AtomicBoolean sleepInterrupted = new AtomicBoolean(false);

@Override
public void execute(Tuple tuple) {
    try {
        if (sleepInterrupted.get())
            return;

        Thread.sleep(2000);
        _collector.emit(tuple, new Values(tuple.getString(0)));
        _collector.ack(tuple);
    } catch (InterruptedException e) {
        sleepInterrupted.set(true);
    }
}

But this doesn't seem right to me, what am I missing here?