You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/11 23:18:16 UTC

[2/9] git commit: Added smart assignment-refreshing for workers.

Added smart assignment-refreshing for workers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/41e5b91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/41e5b91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/41e5b91a

Branch: refs/heads/master
Commit: 41e5b91a71a609818c8ca8b0291ea292710dd468
Parents: b27b8c4
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Fri Jun 27 17:52:52 2014 +0000
Committer: Kyle Nusbaum <kn...@yahoo-inc.com>
Committed: Fri Jun 27 17:52:52 2014 +0000

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/worker.clj    | 23 +++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/41e5b91a/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 74475ee..0ec9bda 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -28,7 +28,8 @@
 
 (defmulti mk-suicide-fn cluster-mode)
 
-(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
+(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
+  (log-message "Reading Assignments.")
   (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
     (doall
      (concat     
@@ -174,11 +175,15 @@
                        )
             :timer-name timer-name))
 
+(def assignment-versions (atom {}))
+
 (defn worker-data [conf mq-context storm-id assignment-id port worker-id]
   (let [cluster-state (cluster/mk-distributed-cluster-state conf)
         storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
         storm-conf (read-supervisor-storm-conf conf storm-id)
-        executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
+        _temp (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)
+        _ (log-message (str "Worker assignments: " _temp))
+        executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
         transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
@@ -240,6 +245,8 @@
     [node (Integer/valueOf port-str)]
     ))
 
+(def assignment-versions (atom {}))
+
 (defn mk-refresh-connections [worker]
   (let [outbound-tasks (worker-outbound-tasks worker)
         conf (:conf worker)
@@ -249,7 +256,17 @@
       ([]
         (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
       ([callback]
-        (let [assignment (.assignment-info storm-cluster-state storm-id callback)
+         (log-message "Refreshing Assignments")
+         (let [version (.assignment-version storm-cluster-state storm-id callback)
+;               _ (log-message (str "Assignments are: " @assignment-versions))
+               assignment (if (= version (:version (get @assignment-versions storm-id)))
+                            (do
+                              (log-message "Keeping old Assignments.")
+                              (:data (get @assignment-versions storm-id)))
+                            (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)]
+                              (log-message "Getting new Assignments.")
+                              (swap! assignment-versions assoc storm-id new-assignment)
+                              (:data new-assignment)))
               my-assignment (-> assignment
                                 :executor->node+port
                                 to-task->node+port