You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/08/17 18:05:16 UTC
[1/6] storm git commit: STORM-1928 ShellSpout should only check
heartbeat while ShellSpout is waiting for subprocess to sync
Repository: storm
Updated Branches:
refs/heads/0.10.x-branch a29898843 -> 711174585
STORM-1928 ShellSpout should only check heartbeat while ShellSpout is waiting for subprocess to sync
* only check heartbeat while ShellSpout is waiting for subprocess to sync
* change ShellSpout.die logic similar to ShellBolt
* ShellSpout's die message is logged but worker is not died after logging
* Confirmed that worker becomes died when die has been called
* rename method to more meaningful name
* mark waiting subprocess first to check heartbeat while also sending spout message to subprocess
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd070345
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd070345
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd070345
Branch: refs/heads/0.10.x-branch
Commit: cd070345665979ef52b5310252c555d5f29c0f23
Parents: a51a289
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 28 00:07:59 2016 +0900
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:21:32 2016 -0400
----------------------------------------------------------------------
.../jvm/backtype/storm/spout/ShellSpout.java | 53 ++++++++++++++++----
1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cd070345/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index ece11ee..c605b12 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -31,6 +31,7 @@ import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import clojure.lang.RT;
@@ -45,7 +46,9 @@ public class ShellSpout implements ISpout {
private SpoutOutputCollector _collector;
private String[] _command;
private ShellProcess _process;
-
+ private volatile boolean _running = true;
+ private volatile RuntimeException _exception;
+
private TopologyContext _context;
private SpoutMsg _spoutMsg;
@@ -53,6 +56,7 @@ public class ShellSpout implements ISpout {
private int workerTimeoutMills;
private ScheduledExecutorService heartBeatExecutorService;
private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+ private AtomicBoolean waitingOnSubprocess = new AtomicBoolean(false);
public ShellSpout(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
@@ -80,9 +84,14 @@ public class ShellSpout implements ISpout {
public void close() {
heartBeatExecutorService.shutdownNow();
_process.destroy();
+ _running = false;
}
public void nextTuple() {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -92,6 +101,10 @@ public class ShellSpout implements ISpout {
}
public void ack(Object msgId) {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -101,6 +114,10 @@ public class ShellSpout implements ISpout {
}
public void fail(Object msgId) {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -139,6 +156,7 @@ public class ShellSpout implements ISpout {
private void querySubprocess() {
try {
+ markWaitingSubprocess();
_process.writeSpoutMsg(_spoutMsg);
while (true) {
@@ -178,6 +196,8 @@ public class ShellSpout implements ISpout {
} catch (Exception e) {
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException(processInfo, e);
+ } finally {
+ completedWaitingSubprocess();
}
}
@@ -233,13 +253,25 @@ public class ShellSpout implements ISpout {
return lastHeartbeatTimestamp.get();
}
- private void die(Throwable exception) {
- heartBeatExecutorService.shutdownNow();
+ private void markWaitingSubprocess() {
+ waitingOnSubprocess.compareAndSet(false, true);
+ }
+
+ private void completedWaitingSubprocess() {
+ waitingOnSubprocess.compareAndSet(true, false);
+ }
- LOG.error("Halting process: ShellSpout died.", exception);
+ private void die(Throwable exception) {
+ String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+ _exception = new RuntimeException(processInfo, exception);
+ String message = String.format("Halting process: ShellSpout died. Command: %s, ProcessInfo %s",
+ Arrays.toString(_command),
+ processInfo);
+ LOG.error(message, exception);
_collector.reportError(exception);
- _process.destroy();
- System.exit(11);
+ if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
+ System.exit(11);
+ }
}
private class SpoutHeartbeatTimerTask extends TimerTask {
@@ -251,13 +283,14 @@ public class ShellSpout implements ISpout {
@Override
public void run() {
- long currentTimeMillis = System.currentTimeMillis();
long lastHeartbeat = getLastHeartbeat();
+ long currentTimestamp = System.currentTimeMillis();
+ boolean isWaitingOnSubprocess = waitingOnSubprocess.get();
- LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
- currentTimeMillis, lastHeartbeat, workerTimeoutMills);
+ LOG.debug("last heartbeat : {}, waiting subprocess now : {}, worker timeout (ms) : {}",
+ lastHeartbeat, isWaitingOnSubprocess, workerTimeoutMills);
- if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+ if (isWaitingOnSubprocess && currentTimestamp - lastHeartbeat > workerTimeoutMills) {
spout.die(new RuntimeException("subprocess heartbeat timeout"));
}
}
[3/6] storm git commit: STORM-1609: Netty Client is not best effort
delivery on failed Connection
Posted by pt...@apache.org.
STORM-1609: Netty Client is not best effort delivery on failed Connection
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/392a49d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/392a49d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/392a49d1
Branch: refs/heads/0.10.x-branch
Commit: 392a49d145a4864731ec1491c269f00de8b2abf7
Parents: 24c77c3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 13:42:22 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:42:22 2016 -0400
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 34 ++++++++++++++++++++
1 file changed, 34 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/392a49d1/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 2149c0d..3091130 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -64,9 +65,16 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+ /**
+ * Periodically checks for connected channel in order to avoid loss
+ * of messages
+ */
+ private static final long CHANNEL_ALIVE_INTERVAL_MS = 30000L;
+
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
private static final long NO_DELAY_MS = 0L;
+ private static final Timer timer = new Timer("Netty-ChannelAlive-Timer", true);
private final Map stormConf;
private final StormBoundedExponentialBackoffRetry retryPolicy;
@@ -137,10 +145,36 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
bootstrap = createClientBootstrap(factory, bufferSize);
dstAddress = new InetSocketAddress(host, port);
dstAddressPrefixedName = prefixedName(dstAddress);
+ launchChannelAliveThread();
scheduleConnect(NO_DELAY_MS);
batcher = new MessageBuffer(messageBatchSize);
}
+ /**
+ * This thread helps us to check for channel connection periodically.
+ * This is performed just to know whether the destination address
+ * is alive or attempts to refresh connections if not alive. This
+ * solution is better than what we have now in case of a bad channel.
+ */
+ private void launchChannelAliveThread() {
+ // netty TimerTask is already defined and hence a fully
+ // qualified name
+ timer.schedule(new java.util.TimerTask() {
+ public void run() {
+ try {
+ LOG.debug("running timer task, address {}", dstAddress);
+ if(closing) {
+ this.cancel();
+ return;
+ }
+ getConnectedChannel();
+ } catch (Exception exp) {
+ LOG.error("channel connection error {}", exp);
+ }
+ }
+ }, 0, CHANNEL_ALIVE_INTERVAL_MS);
+ }
+
private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("tcpNoDelay", true);
[6/6] storm git commit: Merge branch '0.10.x-branch' of
https://git-wip-us.apache.org/repos/asf/storm into 0.10.x-branch
Posted by pt...@apache.org.
Merge branch '0.10.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 0.10.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/71117458
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/71117458
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/71117458
Branch: refs/heads/0.10.x-branch
Commit: 711174585a7967d2615d9b54467c8d4e236f7f6e
Parents: 0535da9 a298988
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 14:04:56 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 14:04:56 2016 -0400
----------------------------------------------------------------------
docs/Concepts.md | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[2/6] storm git commit: Fix missing imports.
Posted by pt...@apache.org.
Fix missing imports.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/24c77c3b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/24c77c3b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/24c77c3b
Branch: refs/heads/0.10.x-branch
Commit: 24c77c3beef8eb598792b136834b1718bc66ba9d
Parents: cd07034
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 13:22:10 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:22:10 2016 -0400
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/24c77c3b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index c605b12..79266ca 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -25,6 +25,8 @@ import backtype.storm.multilang.ShellMsg;
import backtype.storm.multilang.SpoutMsg;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
+
+import java.util.Arrays;
import java.util.Map;
import java.util.List;
import java.util.TimerTask;
[5/6] storm git commit: add STORM-1928 to changelog
Posted by pt...@apache.org.
add STORM-1928 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0535da9a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0535da9a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0535da9a
Branch: refs/heads/0.10.x-branch
Commit: 0535da9a98aedf117d5c3a9b9cd5ec952ec8306e
Parents: 77c3ba9
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 13:44:33 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:44:33 2016 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0535da9a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e6ca76f..02e4db4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
## 0.10.2
* STORM-1609: Netty Client is not best effort delivery on failed Connection
+ * STORM-1928: ShellSpout should check heartbeat while ShellSpout is waiting for subprocess to sync
* STORM-1989: X-Frame-Options support for Storm UI
* STORM-1834: Documentation How to Generate Certificates For Local Testing SSL Setup
* STORM-1754: Correct java version in 0.10.x storm-starter
[4/6] storm git commit: add STORM-1609 to changelog
Posted by pt...@apache.org.
add STORM-1609 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/77c3ba97
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/77c3ba97
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/77c3ba97
Branch: refs/heads/0.10.x-branch
Commit: 77c3ba973b633e96450e91cb0cae56e6c70ebf08
Parents: 392a49d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 17 13:43:04 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 17 13:43:04 2016 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/77c3ba97/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0e79f96..e6ca76f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.10.2
+ * STORM-1609: Netty Client is not best effort delivery on failed Connection
* STORM-1989: X-Frame-Options support for Storm UI
* STORM-1834: Documentation How to Generate Certificates For Local Testing SSL Setup
* STORM-1754: Correct java version in 0.10.x storm-starter