You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/03/04 17:58:58 UTC
[3/9] storm git commit: Refactoring kill workers method
Refactoring kill workers method
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bbdad039
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bbdad039
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bbdad039
Branch: refs/heads/master
Commit: bbdad03967962dfd017b9ecfb3f83f1d53df7595
Parents: a26f811
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Feb 19 13:52:16 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 19 13:52:16 2016 -0600
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/supervisor.clj | 21 +++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bbdad039/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index d057a01..18fec2d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -553,6 +553,16 @@
(rm-topo-files conf storm-id localizer false)
storm-id)))))
+(defn kill-existing-workers-with-change-in-components [supervisor existing-assignment new-assignment]
+ (let [assigned-executors (or (ls-local-assignments (:local-state supervisor)) {})
+ allocated (read-allocated-workers supervisor assigned-executors (Time/currentTimeSecs))
+ valid-allocated (filter-val (fn [[state _]] (= state :valid)) allocated)
+ port->worker-id (clojure.set/map-invert (map-val #((nth % 1) :port) valid-allocated))]
+ (doseq [p (set/intersection (set (keys existing-assignment))
+ (set (keys new-assignment)))]
+ (if (not= (:executors (existing-assignment p)) (:executors (new-assignment p)))
+ (shutdown-worker supervisor (port->worker-id p))))))
+
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
(fn this []
(let [conf (:conf supervisor)
@@ -578,11 +588,7 @@
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
localizer (:localizer supervisor)
checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids))
- downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)
- assigned-executors (or (ls-local-assignments local-state) {})
- allocated (read-allocated-workers supervisor assigned-executors (Time/currentTimeSecs))
- valid-allocated (filter-val (fn [[state _]] (= state :valid)) allocated)
- port->worker-id (clojure.set/map-invert (map-val #((nth % 1) :port) valid-allocated))]
+ downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)]
(log-debug "Synchronizing supervisor")
(log-debug "Storm code map: " storm-code-map)
@@ -615,10 +621,7 @@
(doseq [p (set/difference (set (keys existing-assignment))
(set (keys new-assignment)))]
(.killedWorker isupervisor (int p)))
- (doseq [p (set/intersection (set (keys existing-assignment))
- (set (keys new-assignment)))]
- (if (not= (:executors (existing-assignment p)) (:executors (new-assignment p)))
- (shutdown-worker supervisor (port->worker-id p))))
+ (kill-existing-workers-with-change-in-components supervisor existing-assignment new-assignment)
(.assigned isupervisor (keys new-assignment))
(ls-local-assignments! local-state
new-assignment)