You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/11 23:18:18 UTC
[4/9] git commit: Cleaned up and ready for pull.
Cleaned up and ready for pull.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/310e09e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/310e09e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/310e09e1
Branch: refs/heads/master
Commit: 310e09e1a594ef44785e7aeb4b2fd5e4299a303b
Parents: 516b333
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Thu Jun 26 21:40:29 2014 +0000
Committer: Kyle Nusbaum <kn...@yahoo-inc.com>
Committed: Fri Jun 27 17:56:57 2014 +0000
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 28 ++++++--------------
1 file changed, 8 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/310e09e1/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 3b2a6b5..927446e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -35,32 +35,22 @@
(shutdown-all-workers [this])
)
-(defn- assignments-snapshot [storm-cluster-state callback existing-assignment assignment-versions]
- (log-message (str "Recalculating assignments with old: " assignment-versions))
+(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
(let [storm-ids (.assignments storm-cluster-state callback)]
(let [new-assignments
(->>
(dofor [sid storm-ids]
(let [recorded-version (:version (get assignment-versions sid))]
(if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
- (do
- (log-message (str "Version: " assignment-version " || Recorded Version: " recorded-version))
- (if (= assignment-version recorded-version)
- (do
- (log-message "Using Existing assignment.")
- {sid (get assignment-versions sid)})
- (do
- (log-message "Getting new Assignments.")
- (let [assignments (.assignment-info-with-version storm-cluster-state sid callback)]
- (log-message (str "Assignments: " assignments))
- {sid assignments}))))
+ (if (= assignment-version recorded-version)
+ {sid (get assignment-versions sid)}
+ {sid (.assignment-info-with-version storm-cluster-state sid callback)})
{sid nil})))
(apply merge)
(filter-val not-nil?))]
-
+
{:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
:versions new-assignments})))
-
(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
(let [assignment (get assignments-snapshot storm-id)
@@ -317,13 +307,10 @@
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this))
- existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
assignment-versions (.get local-state LS-ASSIGNMENT-VERSIONS)
{assignments-snapshot :assignments versions :versions} (assignments-snapshot
storm-cluster-state sync-callback
- existing-assignment assignment-versions)
- _ (log-message (str "Got Assignments: " assignments-snapshot
- " || And Versions: " (pr-str versions)))
+ assignment-versions)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
all-assignment (read-assignments
@@ -331,7 +318,8 @@
(:assignment-id supervisor))
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
- assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+ assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
+ existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
(log-debug "Synchronizing supervisor")
(log-debug "Storm code map: " storm-code-map)
(log-debug "Downloaded storm ids: " downloaded-storm-ids)