You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/01 19:09:19 UTC

[35/50] [abbrv] storm git commit: STORM-1341 Let topology have own heartbeat timeout for multilang subprocess

STORM-1341 Let topology have own heartbeat timeout for multilang subprocess

* New config key: topology.subprocess.timeout.secs (heartbeat timeout for ShellSpout/ShellBolt subprocesses)
* If the key is not specified, supervisor.worker.timeout.secs is used instead


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc8c296e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc8c296e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc8c296e

Branch: refs/heads/STORM-1040
Commit: fc8c296efc41c1efc6060ba09c07d406ceacc844
Parents: 20a864d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Nov 25 13:46:59 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Nov 25 13:46:59 2015 +0900

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java           | 8 ++++++++
 storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 6 +++++-
 storm-core/src/jvm/backtype/storm/task/ShellBolt.java   | 6 +++++-
 3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fc8c296e/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index ab17263..89422f6 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1712,6 +1712,14 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
 
     /**
+     * How long a subprocess can go without heartbeating before the ShellSpout/ShellBolt
+     * tries to kill itself. If this key is not set, supervisor.worker.timeout.secs is used.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_SUBPROCESS_TIMEOUT_SECS = "topology.subprocess.timeout.secs";
+
+    /**
      * Topology central logging sensitivity to determine who has access to logs in central logging system.
      * The possible values are:
      *   S0 - Public (open to all users on grid)

http://git-wip-us.apache.org/repos/asf/storm/blob/fc8c296e/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 4976903..bfdfe67 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -74,7 +74,11 @@ public class ShellSpout implements ISpout {
         _collector = collector;
         _context = context;
 
-        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+        if (stormConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
+            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
+        } else {
+            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+        }
 
         _process = new ShellProcess(_command);
         if (!env.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/fc8c296e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index dda99ca..0103715 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -114,7 +114,11 @@ public class ShellBolt implements IBolt {
 
         _context = context;
 
-        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+        if (stormConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
+            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
+        } else {
+            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+        }
 
         _process = new ShellProcess(_command);
         if (!env.isEmpty()) {