You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain text version.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/11 23:18:18 UTC

[4/9] git commit: Cleaned up and ready for pull.

Cleaned up and ready for pull.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/310e09e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/310e09e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/310e09e1

Branch: refs/heads/master
Commit: 310e09e1a594ef44785e7aeb4b2fd5e4299a303b
Parents: 516b333
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Thu Jun 26 21:40:29 2014 +0000
Committer: Kyle Nusbaum <kn...@yahoo-inc.com>
Committed: Fri Jun 27 17:56:57 2014 +0000

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    | 28 ++++++--------------
 1 file changed, 8 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/310e09e1/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 3b2a6b5..927446e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -35,32 +35,22 @@
   (shutdown-all-workers [this])
   )
 
-(defn- assignments-snapshot [storm-cluster-state callback existing-assignment assignment-versions]
-  (log-message (str "Recalculating assignments with old: " assignment-versions))
+(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
   (let [storm-ids (.assignments storm-cluster-state callback)]
     (let [new-assignments 
           (->>
            (dofor [sid storm-ids] 
                   (let [recorded-version (:version (get assignment-versions sid))]
                     (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
-                      (do
-                        (log-message (str "Version: " assignment-version " || Recorded Version: " recorded-version))
-                        (if (= assignment-version recorded-version)
-                          (do
-                            (log-message "Using Existing assignment.")
-                            {sid (get assignment-versions sid)})
-                          (do
-                            (log-message "Getting new Assignments.")
-                            (let [assignments (.assignment-info-with-version storm-cluster-state sid callback)] 
-                              (log-message (str "Assignments: " assignments))
-                              {sid assignments}))))
+                      (if (= assignment-version recorded-version)
+                        {sid (get assignment-versions sid)}
+                        {sid (.assignment-info-with-version storm-cluster-state sid callback)})
                       {sid nil})))
            (apply merge)
            (filter-val not-nil?))]
-
+          
       {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
        :versions new-assignments})))
-            
   
 (defn- read-my-executors [assignments-snapshot storm-id assignment-id]
   (let [assignment (get assignments-snapshot storm-id)
@@ -317,13 +307,10 @@
           ^ISupervisor isupervisor (:isupervisor supervisor)
           ^LocalState local-state (:local-state supervisor)
           sync-callback (fn [& ignored] (.add event-manager this))
-          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
           assignment-versions (.get local-state LS-ASSIGNMENT-VERSIONS)
           {assignments-snapshot :assignments versions :versions}  (assignments-snapshot 
                                                                    storm-cluster-state sync-callback 
-                                                                   existing-assignment assignment-versions)
-          _ (log-message (str "Got Assignments: " assignments-snapshot
-                              " || And Versions: " (pr-str versions)))
+                                                                   assignment-versions)
           storm-code-map (read-storm-code-locations assignments-snapshot)
           downloaded-storm-ids (set (read-downloaded-storm-ids conf))
           all-assignment (read-assignments
@@ -331,7 +318,8 @@
                            (:assignment-id supervisor))
           new-assignment (->> all-assignment
                               (filter-key #(.confirmAssigned isupervisor %)))
-          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
+          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
       (log-debug "Synchronizing supervisor")
       (log-debug "Storm code map: " storm-code-map)
       (log-debug "Downloaded storm ids: " downloaded-storm-ids)