You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/08/15 18:38:10 UTC

[2/7] storm git commit: STORM-1928 ShellSpout should only check heartbeat while ShellSpout is waiting for subprocess to sync

STORM-1928 ShellSpout should only check heartbeat while ShellSpout is waiting for subprocess to sync

* Only check the heartbeat while ShellSpout is waiting for the subprocess to sync
* Change the ShellSpout.die() logic to match ShellBolt
  * ShellSpout's die message was logged, but the worker did not die after logging
  * Confirmed that the worker dies when die() is called
* Rename a method to a more meaningful name
* Mark the spout as waiting on the subprocess before sending the spout message, so the heartbeat is also checked while the message is being sent to the subprocess


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1d78c7f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1d78c7f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1d78c7f

Branch: refs/heads/0.9.x-branch
Commit: c1d78c7faa56547a7851aec5f807049907e34d4e
Parents: 6127edc
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 28 00:07:59 2016 +0900
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Jul 20 14:31:57 2016 -0400

----------------------------------------------------------------------
 .../jvm/backtype/storm/spout/ShellSpout.java    | 54 ++++++++++++++++----
 1 file changed, 44 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c1d78c7f/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index ece11ee..9198862 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -31,6 +31,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import clojure.lang.RT;
@@ -45,7 +46,9 @@ public class ShellSpout implements ISpout {
     private SpoutOutputCollector _collector;
     private String[] _command;
     private ShellProcess _process;
-    
+    private volatile boolean _running = true;
+    private volatile RuntimeException _exception;
+
     private TopologyContext _context;
     
     private SpoutMsg _spoutMsg;
@@ -53,6 +56,7 @@ public class ShellSpout implements ISpout {
     private int workerTimeoutMills;
     private ScheduledExecutorService heartBeatExecutorService;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+    private AtomicBoolean waitingOnSubprocess = new AtomicBoolean(false);
 
     public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
@@ -80,9 +84,14 @@ public class ShellSpout implements ISpout {
     public void close() {
         heartBeatExecutorService.shutdownNow();
         _process.destroy();
+        _running = false;
     }
 
     public void nextTuple() {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -92,6 +101,10 @@ public class ShellSpout implements ISpout {
     }
 
     public void ack(Object msgId) {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -101,6 +114,10 @@ public class ShellSpout implements ISpout {
     }
 
     public void fail(Object msgId) {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -139,6 +156,7 @@ public class ShellSpout implements ISpout {
 
     private void querySubprocess() {
         try {
+            markWaitingSubprocess();
             _process.writeSpoutMsg(_spoutMsg);
 
             while (true) {
@@ -178,9 +196,12 @@ public class ShellSpout implements ISpout {
         } catch (Exception e) {
             String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
             throw new RuntimeException(processInfo, e);
+        } finally {
+            completedWaitingSubprocess();
         }
     }
 
+
     private void handleLog(ShellMsg shellMsg) {
         String msg = shellMsg.getMsg();
         msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
@@ -233,13 +254,25 @@ public class ShellSpout implements ISpout {
         return lastHeartbeatTimestamp.get();
     }
 
-    private void die(Throwable exception) {
-        heartBeatExecutorService.shutdownNow();
+    private void markWaitingSubprocess() {
+        waitingOnSubprocess.compareAndSet(false, true);
+    }
+
+    private void completedWaitingSubprocess() {
+        waitingOnSubprocess.compareAndSet(true, false);
+    }
 
-        LOG.error("Halting process: ShellSpout died.", exception);
+    private void die(Throwable exception) {
+        String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+        _exception = new RuntimeException(processInfo, exception);
+        String message = String.format("Halting process: ShellSpout died. Command: %s, ProcessInfo %s",
+            Arrays.toString(_command),
+            processInfo);
+        LOG.error(message, exception);
         _collector.reportError(exception);
-        _process.destroy();
-        System.exit(11);
+        if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
+            System.exit(11);
+        }
     }
 
     private class SpoutHeartbeatTimerTask extends TimerTask {
@@ -251,13 +284,14 @@ public class ShellSpout implements ISpout {
 
         @Override
         public void run() {
-            long currentTimeMillis = System.currentTimeMillis();
             long lastHeartbeat = getLastHeartbeat();
+            long currentTimestamp = System.currentTimeMillis();
+            boolean isWaitingOnSubprocess = waitingOnSubprocess.get();
 
-            LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
-                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
+            LOG.debug("last heartbeat : {}, waiting subprocess now : {}, worker timeout (ms) : {}",
+                    lastHeartbeat, isWaitingOnSubprocess, workerTimeoutMills);
 
-            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+            if (isWaitingOnSubprocess && currentTimestamp - lastHeartbeat > workerTimeoutMills) {
                 spout.die(new RuntimeException("subprocess heartbeat timeout"));
             }
         }