You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/01 19:09:19 UTC
[35/50] [abbrv] storm git commit: STORM-1341 Let topology have own
heartbeat timeout for multilang subprocess
STORM-1341 Let topology have own heartbeat timeout for multilang subprocess
* config name: topology.subprocess.timeout.secs
* if it's not specified, supervisor.worker.timeout.secs will be used
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc8c296e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc8c296e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc8c296e
Branch: refs/heads/STORM-1040
Commit: fc8c296efc41c1efc6060ba09c07d406ceacc844
Parents: 20a864d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Nov 25 13:46:59 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Nov 25 13:46:59 2015 +0900
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/Config.java | 8 ++++++++
storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 6 +++++-
storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 6 +++++-
3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fc8c296e/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index ab17263..89422f6 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1712,6 +1712,14 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
/**
+ * How long a subprocess can go without heartbeating before the ShellSpout/ShellBolt tries to
+ * suicide itself.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_SUBPROCESS_TIMEOUT_SECS = "topology.subprocess.timeout.secs";
+
+ /**
* Topology central logging sensitivity to determine who has access to logs in central logging system.
* The possible values are:
* S0 - Public (open to all users on grid)
http://git-wip-us.apache.org/repos/asf/storm/blob/fc8c296e/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 4976903..bfdfe67 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -74,7 +74,11 @@ public class ShellSpout implements ISpout {
_collector = collector;
_context = context;
- workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+ if (stormConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
+ workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
+ } else {
+ workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+ }
_process = new ShellProcess(_command);
if (!env.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/fc8c296e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index dda99ca..0103715 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -114,7 +114,11 @@ public class ShellBolt implements IBolt {
_context = context;
- workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+ if (stormConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
+ workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
+ } else {
+ workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+ }
_process = new ShellProcess(_command);
if (!env.isEmpty()) {