You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/09/30 22:12:46 UTC

[3/5] storm git commit: Storm-1058 Add kill_workers CLI

Storm-1058 Add kill_workers CLI


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1abe4867
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1abe4867
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1abe4867

Branch: refs/heads/master
Commit: 1abe4867160cb07a3eff6c2307d38b049cb51634
Parents: ced6224
Author: zhuol <zh...@yahoo-inc.com>
Authored: Tue Sep 22 10:44:43 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Tue Sep 29 15:26:57 2015 -0500

----------------------------------------------------------------------
 bin/storm.py                                        | 16 +++++++++++++++-
 .../src/clj/backtype/storm/daemon/executor.clj      |  1 -
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1abe4867/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index 024912f..d2507ed 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -348,6 +348,19 @@ def get_errors(*args):
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
 
+def kill_workers(*args):
+    """Syntax: [storm kill_workers]
+
+    Kill the workers running on this supervisor. This command should be run
+    on a supervisor node. If the cluster is running in secure node, then user needs 
+    to have admin rights on the node to be able to successfully kill all workers.
+    """
+    exec_storm_class(
+        "backtype.storm.command.kill_workers",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
+
 def shell(resourcesdir, command, *args):
     tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
     os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
@@ -561,7 +574,8 @@ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui
             "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
             "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
             "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor,
-            "upload-credentials": upload_credentials, "get-errors": get_errors }
+            "upload-credentials": upload_credentials, "get-errors": get_errors,
+            "kill_workers": kill_workers }
 
 def parse_config(config_list):
     global CONFIG_OPTS

http://git-wip-us.apache.org/repos/asf/storm/blob/1abe4867/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index d7a68be..746fde3 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -647,7 +647,6 @@
           
           (let [active? @(:storm-active-atom executor-data)
                 curr-count (.get emitted-count)
-                ;; suspend-time ((:storm-conf executor-data) BACKPRESSURE-SPOUT-SUSPEND-TIME-MS)
                 backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
                 throttle-on (and backpressure-enabled
                               @(:throttle-on (:worker executor-data)))