You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/11 23:18:21 UTC
[7/9] git commit: Keeping version data only in memory,
as part of supervisor and worker structures.
Keeping version data only in memory, as part of supervisor and worker structures.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f6b826b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f6b826b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f6b826b9
Branch: refs/heads/master
Commit: f6b826b91d2e299e3286dac78c4866cbded165ce
Parents: f4e6c19
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Tue Jul 1 16:06:13 2014 +0000
Committer: Kyle Nusbaum <kn...@yahoo-inc.com>
Committed: Tue Jul 1 16:06:13 2014 +0000
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/daemon/common.clj | 1 -
storm-core/src/clj/backtype/storm/daemon/supervisor.clj | 7 +++----
storm-core/src/clj/backtype/storm/daemon/worker.clj | 12 ++++++------
3 files changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f6b826b9/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index f2ad7ce..43746b3 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -61,7 +61,6 @@
(def LS-ID "supervisor-id")
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
(def LS-APPROVED-WORKERS "approved-workers")
-(def LS-ASSIGNMENT-VERSIONS "local-assignment-versions")
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f6b826b9/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 927446e..7ba1c69 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -211,6 +211,7 @@
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing an event")
))
+ :assignment-versions (atom {})
})
(defn sync-processes [supervisor]
@@ -307,7 +308,7 @@
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this))
- assignment-versions (.get local-state LS-ASSIGNMENT-VERSIONS)
+ assignment-versions @(:assignment-versions supervisor)
{assignments-snapshot :assignments versions :versions} (assignments-snapshot
storm-cluster-state sync-callback
assignment-versions)
@@ -353,9 +354,7 @@
(.put local-state
LS-LOCAL-ASSIGNMENTS
new-assignment)
- (.put local-state
- LS-ASSIGNMENT-VERSIONS
- versions)
+ (swap! (:assignment-versions supervisor) versions)
(reset! (:curr-assignment supervisor) new-assignment)
;; remove any downloaded code that's no longer assigned or active
;; important that this happens after setting the local assignment so that
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f6b826b9/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 605bc49..aeabdf6 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -175,10 +175,9 @@
)
:timer-name timer-name))
-(def assignment-versions (atom {}))
-
(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
- (let [cluster-state (cluster/mk-distributed-cluster-state conf)
+ (let [assignment-versions (atom {})
+ cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
storm-conf (read-supervisor-storm-conf conf storm-id)
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
@@ -233,6 +232,7 @@
:transfer-local-fn (mk-transfer-local-fn <>)
:receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
:transfer-fn (mk-transfer-fn <>)
+ :assignment-versions assignment-versions
)))
(defn- endpoint->string [[node port]]
@@ -253,10 +253,10 @@
(this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
([callback]
(let [version (.assignment-version storm-cluster-state storm-id callback)
- assignment (if (= version (:version (get @assignment-versions storm-id)))
- (:data (get @assignment-versions storm-id))
+ assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
+ (:data (get @(:assignment-versions worker) storm-id))
(let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)]
- (swap! assignment-versions assoc storm-id new-assignment)
+ (swap! (:assignment-versions worker) assoc storm-id new-assignment)
(:data new-assignment)))
my-assignment (-> assignment
:executor->node+port