You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/11 23:18:21 UTC

[7/9] git commit: Keeping version data only in memory, as part of supervisor and worker structures.

Keeping version data only in memory, as part of supervisor and worker structures.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f6b826b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f6b826b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f6b826b9

Branch: refs/heads/master
Commit: f6b826b91d2e299e3286dac78c4866cbded165ce
Parents: f4e6c19
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Tue Jul 1 16:06:13 2014 +0000
Committer: Kyle Nusbaum <kn...@yahoo-inc.com>
Committed: Tue Jul 1 16:06:13 2014 +0000

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/daemon/common.clj     |  1 -
 storm-core/src/clj/backtype/storm/daemon/supervisor.clj |  7 +++----
 storm-core/src/clj/backtype/storm/daemon/worker.clj     | 12 ++++++------
 3 files changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f6b826b9/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index f2ad7ce..43746b3 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -61,7 +61,6 @@
 (def LS-ID "supervisor-id")
 (def LS-LOCAL-ASSIGNMENTS "local-assignments")
 (def LS-APPROVED-WORKERS "approved-workers")
-(def LS-ASSIGNMENT-VERSIONS "local-assignment-versions")
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f6b826b9/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 927446e..7ba1c69 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -211,6 +211,7 @@
                                (log-error t "Error when processing event")
                                (exit-process! 20 "Error when processing an event")
                                ))
+   :assignment-versions (atom {})
    })
 
 (defn sync-processes [supervisor]
@@ -307,7 +308,7 @@
           ^ISupervisor isupervisor (:isupervisor supervisor)
           ^LocalState local-state (:local-state supervisor)
           sync-callback (fn [& ignored] (.add event-manager this))
-          assignment-versions (.get local-state LS-ASSIGNMENT-VERSIONS)
+          assignment-versions @(:assignment-versions supervisor)
           {assignments-snapshot :assignments versions :versions}  (assignments-snapshot 
                                                                    storm-cluster-state sync-callback 
                                                                    assignment-versions)
@@ -353,9 +354,7 @@
       (.put local-state
             LS-LOCAL-ASSIGNMENTS
             new-assignment)
-      (.put local-state
-            LS-ASSIGNMENT-VERSIONS
-            versions)
+      (swap! (:assignment-versions supervisor) versions)
       (reset! (:curr-assignment supervisor) new-assignment)
       ;; remove any downloaded code that's no longer assigned or active
       ;; important that this happens after setting the local assignment so that

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f6b826b9/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 605bc49..aeabdf6 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -175,10 +175,9 @@
                        )
             :timer-name timer-name))
 
-(def assignment-versions (atom {}))
-
 (defn worker-data [conf mq-context storm-id assignment-id port worker-id]
-  (let [cluster-state (cluster/mk-distributed-cluster-state conf)
+  (let [assignment-versions (atom {})
+        cluster-state (cluster/mk-distributed-cluster-state conf)
         storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
         storm-conf (read-supervisor-storm-conf conf storm-id)
         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
@@ -233,6 +232,7 @@
       :transfer-local-fn (mk-transfer-local-fn <>)
       :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
       :transfer-fn (mk-transfer-fn <>)
+      :assignment-versions assignment-versions
       )))
 
 (defn- endpoint->string [[node port]]
@@ -253,10 +253,10 @@
         (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
       ([callback]
          (let [version (.assignment-version storm-cluster-state storm-id callback)
-               assignment (if (= version (:version (get @assignment-versions storm-id)))
-                            (:data (get @assignment-versions storm-id))
+               assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
+                            (:data (get @(:assignment-versions worker) storm-id))
                             (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)]
-                              (swap! assignment-versions assoc storm-id new-assignment)
+                              (swap! (:assignment-versions worker) assoc storm-id new-assignment)
                               (:data new-assignment)))
               my-assignment (-> assignment
                                 :executor->node+port