You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/08/15 18:38:10 UTC
[2/7] storm git commit: STORM-1928 ShellSpout should only check
heartbeat while ShellSpout is waiting for subprocess to sync
STORM-1928 ShellSpout should only check heartbeat while ShellSpout is waiting for subprocess to sync
* only check heartbeat while ShellSpout is waiting for subprocess to sync
* change ShellSpout.die logic similar to ShellBolt
* ShellSpout's die message is logged, but the worker does not die after logging
* Confirmed that the worker dies when die() is called
* rename method to more meaningful name
* mark the spout as waiting on the subprocess before sending the spout message, so the heartbeat is also checked while the message is being sent to the subprocess
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1d78c7f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1d78c7f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1d78c7f
Branch: refs/heads/0.9.x-branch
Commit: c1d78c7faa56547a7851aec5f807049907e34d4e
Parents: 6127edc
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 28 00:07:59 2016 +0900
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Jul 20 14:31:57 2016 -0400
----------------------------------------------------------------------
.../jvm/backtype/storm/spout/ShellSpout.java | 54 ++++++++++++++++----
1 file changed, 44 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c1d78c7f/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index ece11ee..9198862 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -31,6 +31,7 @@ import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import clojure.lang.RT;
@@ -45,7 +46,9 @@ public class ShellSpout implements ISpout {
private SpoutOutputCollector _collector;
private String[] _command;
private ShellProcess _process;
-
+ private volatile boolean _running = true;
+ private volatile RuntimeException _exception;
+
private TopologyContext _context;
private SpoutMsg _spoutMsg;
@@ -53,6 +56,7 @@ public class ShellSpout implements ISpout {
private int workerTimeoutMills;
private ScheduledExecutorService heartBeatExecutorService;
private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+ private AtomicBoolean waitingOnSubprocess = new AtomicBoolean(false);
public ShellSpout(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
@@ -80,9 +84,14 @@ public class ShellSpout implements ISpout {
public void close() {
heartBeatExecutorService.shutdownNow();
_process.destroy();
+ _running = false;
}
public void nextTuple() {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -92,6 +101,10 @@ public class ShellSpout implements ISpout {
}
public void ack(Object msgId) {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -101,6 +114,10 @@ public class ShellSpout implements ISpout {
}
public void fail(Object msgId) {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -139,6 +156,7 @@ public class ShellSpout implements ISpout {
private void querySubprocess() {
try {
+ markWaitingSubprocess();
_process.writeSpoutMsg(_spoutMsg);
while (true) {
@@ -178,9 +196,12 @@ public class ShellSpout implements ISpout {
} catch (Exception e) {
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException(processInfo, e);
+ } finally {
+ completedWaitingSubprocess();
}
}
+
private void handleLog(ShellMsg shellMsg) {
String msg = shellMsg.getMsg();
msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
@@ -233,13 +254,25 @@ public class ShellSpout implements ISpout {
return lastHeartbeatTimestamp.get();
}
- private void die(Throwable exception) {
- heartBeatExecutorService.shutdownNow();
+ private void markWaitingSubprocess() {
+ waitingOnSubprocess.compareAndSet(false, true);
+ }
+
+ private void completedWaitingSubprocess() {
+ waitingOnSubprocess.compareAndSet(true, false);
+ }
- LOG.error("Halting process: ShellSpout died.", exception);
+ private void die(Throwable exception) {
+ String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+ _exception = new RuntimeException(processInfo, exception);
+ String message = String.format("Halting process: ShellSpout died. Command: %s, ProcessInfo %s",
+ Arrays.toString(_command),
+ processInfo);
+ LOG.error(message, exception);
_collector.reportError(exception);
- _process.destroy();
- System.exit(11);
+ if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
+ System.exit(11);
+ }
}
private class SpoutHeartbeatTimerTask extends TimerTask {
@@ -251,13 +284,14 @@ public class ShellSpout implements ISpout {
@Override
public void run() {
- long currentTimeMillis = System.currentTimeMillis();
long lastHeartbeat = getLastHeartbeat();
+ long currentTimestamp = System.currentTimeMillis();
+ boolean isWaitingOnSubprocess = waitingOnSubprocess.get();
- LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
- currentTimeMillis, lastHeartbeat, workerTimeoutMills);
+ LOG.debug("last heartbeat : {}, waiting subprocess now : {}, worker timeout (ms) : {}",
+ lastHeartbeat, isWaitingOnSubprocess, workerTimeoutMills);
- if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+ if (isWaitingOnSubprocess && currentTimestamp - lastHeartbeat > workerTimeoutMills) {
spout.die(new RuntimeException("subprocess heartbeat timeout"));
}
}