You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/11 23:18:16 UTC
[2/9] git commit: Added smart assignment-refreshing for workers.
Added smart assignment-refreshing for workers.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/41e5b91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/41e5b91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/41e5b91a
Branch: refs/heads/master
Commit: 41e5b91a71a609818c8ca8b0291ea292710dd468
Parents: b27b8c4
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Fri Jun 27 17:52:52 2014 +0000
Committer: Kyle Nusbaum <kn...@yahoo-inc.com>
Committed: Fri Jun 27 17:52:52 2014 +0000
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/worker.clj | 23 +++++++++++++++++---
1 file changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41e5b91a/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 74475ee..0ec9bda 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -28,7 +28,8 @@
(defmulti mk-suicide-fn cluster-mode)
-(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
+(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
+ (log-message "Reading Assignments.")
(let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
(doall
(concat
@@ -174,11 +175,15 @@
)
:timer-name timer-name))
+(def assignment-versions (atom {}))
+
(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
storm-conf (read-supervisor-storm-conf conf storm-id)
- executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
+ _temp (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)
+ _ (log-message (str "Worker assignments: " _temp))
+ executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
@@ -240,6 +245,8 @@
[node (Integer/valueOf port-str)]
))
+(def assignment-versions (atom {}))
+
(defn mk-refresh-connections [worker]
(let [outbound-tasks (worker-outbound-tasks worker)
conf (:conf worker)
@@ -249,7 +256,17 @@
([]
(this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
([callback]
- (let [assignment (.assignment-info storm-cluster-state storm-id callback)
+ (log-message "Refreshing Assignments")
+ (let [version (.assignment-version storm-cluster-state storm-id callback)
+; _ (log-message (str "Assignments are: " @assignment-versions))
+ assignment (if (= version (:version (get @assignment-versions storm-id)))
+ (do
+ (log-message "Keeping old Assignments.")
+ (:data (get @assignment-versions storm-id)))
+ (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)]
+ (log-message "Getting new Assignments.")
+ (swap! assignment-versions assoc storm-id new-assignment)
+ (:data new-assignment)))
my-assignment (-> assignment
:executor->node+port
to-task->node+port