You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/08/17 18:05:16 UTC

[1/6] storm git commit: STORM-1928 ShellSpout should only check heartbeat while ShellSpout is waiting for subprocess to sync

Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch a29898843 -> 711174585


STORM-1928 ShellSpout should only check heartbeat while ShellSpout is waiting for subprocess to sync

* only check heartbeat while ShellSpout is waiting for subprocess to sync
* change ShellSpout.die logic similar to ShellBolt
  * ShellSpout's die message is logged, but the worker does not die after logging
  * Confirmed that the worker dies when die has been called
* rename method to more meaningful name
* mark the spout as waiting on the subprocess before sending the spout message, so the heartbeat is also checked while the message is being sent to the subprocess

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd070345
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd070345
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd070345

Branch: refs/heads/0.10.x-branch
Commit: cd070345665979ef52b5310252c555d5f29c0f23
Parents: a51a289
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 28 00:07:59 2016 +0900
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:21:32 2016 -0400

----------------------------------------------------------------------
 .../jvm/backtype/storm/spout/ShellSpout.java    | 53 ++++++++++++++++----
 1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cd070345/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index ece11ee..c605b12 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -31,6 +31,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import clojure.lang.RT;
@@ -45,7 +46,9 @@ public class ShellSpout implements ISpout {
     private SpoutOutputCollector _collector;
     private String[] _command;
     private ShellProcess _process;
-    
+    private volatile boolean _running = true;
+    private volatile RuntimeException _exception;
+
     private TopologyContext _context;
     
     private SpoutMsg _spoutMsg;
@@ -53,6 +56,7 @@ public class ShellSpout implements ISpout {
     private int workerTimeoutMills;
     private ScheduledExecutorService heartBeatExecutorService;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+    private AtomicBoolean waitingOnSubprocess = new AtomicBoolean(false);
 
     public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
@@ -80,9 +84,14 @@ public class ShellSpout implements ISpout {
     public void close() {
         heartBeatExecutorService.shutdownNow();
         _process.destroy();
+        _running = false;
     }
 
     public void nextTuple() {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -92,6 +101,10 @@ public class ShellSpout implements ISpout {
     }
 
     public void ack(Object msgId) {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -101,6 +114,10 @@ public class ShellSpout implements ISpout {
     }
 
     public void fail(Object msgId) {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -139,6 +156,7 @@ public class ShellSpout implements ISpout {
 
     private void querySubprocess() {
         try {
+            markWaitingSubprocess();
             _process.writeSpoutMsg(_spoutMsg);
 
             while (true) {
@@ -178,6 +196,8 @@ public class ShellSpout implements ISpout {
         } catch (Exception e) {
             String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
             throw new RuntimeException(processInfo, e);
+        } finally {
+            completedWaitingSubprocess();
         }
     }
 
@@ -233,13 +253,25 @@ public class ShellSpout implements ISpout {
         return lastHeartbeatTimestamp.get();
     }
 
-    private void die(Throwable exception) {
-        heartBeatExecutorService.shutdownNow();
+    private void markWaitingSubprocess() {
+        waitingOnSubprocess.compareAndSet(false, true);
+    }
+
+    private void completedWaitingSubprocess() {
+        waitingOnSubprocess.compareAndSet(true, false);
+    }
 
-        LOG.error("Halting process: ShellSpout died.", exception);
+    private void die(Throwable exception) {
+        String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+        _exception = new RuntimeException(processInfo, exception);
+        String message = String.format("Halting process: ShellSpout died. Command: %s, ProcessInfo %s",
+            Arrays.toString(_command),
+            processInfo);
+        LOG.error(message, exception);
         _collector.reportError(exception);
-        _process.destroy();
-        System.exit(11);
+        if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
+            System.exit(11);
+        }
     }
 
     private class SpoutHeartbeatTimerTask extends TimerTask {
@@ -251,13 +283,14 @@ public class ShellSpout implements ISpout {
 
         @Override
         public void run() {
-            long currentTimeMillis = System.currentTimeMillis();
             long lastHeartbeat = getLastHeartbeat();
+            long currentTimestamp = System.currentTimeMillis();
+            boolean isWaitingOnSubprocess = waitingOnSubprocess.get();
 
-            LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
-                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
+            LOG.debug("last heartbeat : {}, waiting subprocess now : {}, worker timeout (ms) : {}",
+                    lastHeartbeat, isWaitingOnSubprocess, workerTimeoutMills);
 
-            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+            if (isWaitingOnSubprocess && currentTimestamp - lastHeartbeat > workerTimeoutMills) {
                 spout.die(new RuntimeException("subprocess heartbeat timeout"));
             }
         }


[3/6] storm git commit: STORM-1609: Netty Client is not best effort delivery on failed Connection

Posted by pt...@apache.org.
STORM-1609: Netty Client is not best effort delivery on failed Connection


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/392a49d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/392a49d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/392a49d1

Branch: refs/heads/0.10.x-branch
Commit: 392a49d145a4864731ec1491c269f00de8b2abf7
Parents: 24c77c3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 13:42:22 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:42:22 2016 -0400

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 34 ++++++++++++++++++++
 1 file changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/392a49d1/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 2149c0d..3091130 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -64,9 +65,16 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
     private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
 
+    /**
+     * Periodically checks for connected channel in order to avoid loss
+     * of messages
+     */
+    private static final long CHANNEL_ALIVE_INTERVAL_MS = 30000L;
+
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
     private static final long NO_DELAY_MS = 0L;
+    private static final Timer timer = new Timer("Netty-ChannelAlive-Timer", true);
 
     private final Map stormConf;
     private final StormBoundedExponentialBackoffRetry retryPolicy;
@@ -137,10 +145,36 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         bootstrap = createClientBootstrap(factory, bufferSize);
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
+        launchChannelAliveThread();
         scheduleConnect(NO_DELAY_MS);
         batcher = new MessageBuffer(messageBatchSize);
     }
 
+    /**
+     * This thread helps us to check for channel connection periodically.
+     * This is performed just to know whether the destination address
+     * is alive or attempts to refresh connections if not alive. This
+     * solution is better than what we have now in case of a bad channel.
+     */
+    private void launchChannelAliveThread() {
+        // netty TimerTask is already defined and hence a fully
+        // qualified name
+        timer.schedule(new java.util.TimerTask() {
+            public void run() {
+                try {
+                    LOG.debug("running timer task, address {}", dstAddress);
+                    if(closing) {
+                        this.cancel();
+                        return;
+                    }
+                    getConnectedChannel();
+                } catch (Exception exp) {
+                    LOG.error("channel connection error {}", exp);
+                }
+            }
+        }, 0, CHANNEL_ALIVE_INTERVAL_MS);
+    }
+
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
         ClientBootstrap bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);


[6/6] storm git commit: Merge branch '0.10.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 0.10.x-branch

Posted by pt...@apache.org.
Merge branch '0.10.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 0.10.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/71117458
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/71117458
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/71117458

Branch: refs/heads/0.10.x-branch
Commit: 711174585a7967d2615d9b54467c8d4e236f7f6e
Parents: 0535da9 a298988
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 14:04:56 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 14:04:56 2016 -0400

----------------------------------------------------------------------
 docs/Concepts.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/6] storm git commit: Fix missing imports.

Posted by pt...@apache.org.
Fix missing imports.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/24c77c3b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/24c77c3b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/24c77c3b

Branch: refs/heads/0.10.x-branch
Commit: 24c77c3beef8eb598792b136834b1718bc66ba9d
Parents: cd07034
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 13:22:10 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:22:10 2016 -0400

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/24c77c3b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index c605b12..79266ca 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -25,6 +25,8 @@ import backtype.storm.multilang.ShellMsg;
 import backtype.storm.multilang.SpoutMsg;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
+
+import java.util.Arrays;
 import java.util.Map;
 import java.util.List;
 import java.util.TimerTask;


[5/6] storm git commit: add STORM-1928 to changelog

Posted by pt...@apache.org.
add STORM-1928 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0535da9a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0535da9a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0535da9a

Branch: refs/heads/0.10.x-branch
Commit: 0535da9a98aedf117d5c3a9b9cd5ec952ec8306e
Parents: 77c3ba9
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 13:44:33 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:44:33 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0535da9a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e6ca76f..02e4db4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## 0.10.2
  * STORM-1609: Netty Client is not best effort delivery on failed Connection
+ * STORM-1928: ShellSpout should check heartbeat while ShellSpout is waiting for subprocess to sync
  * STORM-1989: X-Frame-Options support for Storm UI
  * STORM-1834: Documentation How to Generate Certificates For Local Testing SSL Setup
  * STORM-1754: Correct java version in 0.10.x storm-starter


[4/6] storm git commit: add STORM-1609 to changelog

Posted by pt...@apache.org.
add STORM-1609 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77c3ba97
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77c3ba97
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77c3ba97

Branch: refs/heads/0.10.x-branch
Commit: 77c3ba973b633e96450e91cb0cae56e6c70ebf08
Parents: 392a49d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 13:43:04 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:43:04 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/77c3ba97/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0e79f96..e6ca76f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.2
+ * STORM-1609: Netty Client is not best effort delivery on failed Connection
  * STORM-1989: X-Frame-Options support for Storm UI
  * STORM-1834: Documentation How to Generate Certificates For Local Testing SSL Setup
  * STORM-1754: Correct java version in 0.10.x storm-starter