You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/03/04 17:58:58 UTC

[3/9] storm git commit: Refactoring kill workers method

Refactoring kill workers method


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bbdad039
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bbdad039
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bbdad039

Branch: refs/heads/master
Commit: bbdad03967962dfd017b9ecfb3f83f1d53df7595
Parents: a26f811
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Feb 19 13:52:16 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 19 13:52:16 2016 -0600

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/supervisor.clj  | 21 +++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bbdad039/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index d057a01..18fec2d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -553,6 +553,16 @@
           (rm-topo-files conf storm-id localizer false)
           storm-id)))))
 
+(defn kill-existing-workers-with-change-in-components [supervisor existing-assignment new-assignment]
+  (let [assigned-executors (or (ls-local-assignments (:local-state supervisor)) {})
+        allocated (read-allocated-workers supervisor assigned-executors (Time/currentTimeSecs))
+        valid-allocated (filter-val (fn [[state _]] (= state :valid)) allocated)
+        port->worker-id (clojure.set/map-invert (map-val #((nth % 1) :port) valid-allocated))]
+    (doseq [p (set/intersection (set (keys existing-assignment))
+                                (set (keys new-assignment)))]
+      (if (not= (:executors (existing-assignment p)) (:executors (new-assignment p)))
+        (shutdown-worker supervisor (port->worker-id p))))))
+
 (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
   (fn this []
     (let [conf (:conf supervisor)
@@ -578,11 +588,7 @@
           assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
           localizer (:localizer supervisor)
           checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids))
-          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)
-          assigned-executors (or (ls-local-assignments local-state) {})
-          allocated (read-allocated-workers supervisor assigned-executors (Time/currentTimeSecs))
-          valid-allocated (filter-val (fn [[state _]] (= state :valid)) allocated)
-          port->worker-id (clojure.set/map-invert (map-val #((nth % 1) :port) valid-allocated))]
+          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)]
 
       (log-debug "Synchronizing supervisor")
       (log-debug "Storm code map: " storm-code-map)
@@ -615,10 +621,7 @@
       (doseq [p (set/difference (set (keys existing-assignment))
                                 (set (keys new-assignment)))]
         (.killedWorker isupervisor (int p)))
-      (doseq [p (set/intersection (set (keys existing-assignment))
-                                  (set (keys new-assignment)))]
-        (if (not= (:executors (existing-assignment p)) (:executors (new-assignment p)))
-          (shutdown-worker supervisor (port->worker-id p))))
+      (kill-existing-workers-with-change-in-components supervisor existing-assignment new-assignment)
       (.assigned isupervisor (keys new-assignment))
       (ls-local-assignments! local-state
             new-assignment)