You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 22:58:05 UTC

[06/50] git commit: made read-assignments retry when reading inconsistent assignments.

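Made read-assignments retry when it reads inconsistent assignments (more than one topology assigned to the same port). On such a read, the supervisor logs a warning and keeps its existing local assignment. The error is rethrown only after repeated consecutive failures.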


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/14bcc9b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/14bcc9b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/14bcc9b2

Branch: refs/heads/security
Commit: 14bcc9b2155ab5be71ccffa43689ef6f51b153f4
Parents: 96e81c1
Author: iwasakims <iw...@example.com>
Authored: Mon Jul 14 10:12:24 2014 -0700
Committer: iwasakims <iw...@example.com>
Committed: Wed Jul 16 08:16:50 2014 -0700

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    | 26 ++++++---
 .../test/clj/backtype/storm/supervisor_test.clj | 56 ++++++++++++++++++++
 2 files changed, 74 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/14bcc9b2/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 7ba1c69..8d1ac46 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -67,12 +67,19 @@
                [(Integer. port) (LocalAssignment. storm-id (doall executors))]
                ))))
 
-
 (defn- read-assignments
   "Returns map from port to struct containing :storm-id and :executors"
-  [assignments-snapshot assignment-id]
-  (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
-       (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
+  ([assignments-snapshot assignment-id]
+     (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
+          (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
+  ([assignments-snapshot assignment-id existing-assignment retries]
+     (try (let [assignments (read-assignments assignments-snapshot assignment-id)]
+            (reset! retries 0)
+            assignments)
+          (catch RuntimeException e
+            (if (> @retries 2) (throw e) (swap! retries inc))
+            (log-warn (.getMessage e))
+            existing-assignment))))
 
 (defn- read-storm-code-locations
   [assignments-snapshot]
@@ -212,6 +219,7 @@
                                (exit-process! 20 "Error when processing an event")
                                ))
    :assignment-versions (atom {})
+   :sync-retry (atom 0)
    })
 
 (defn sync-processes [supervisor]
@@ -314,13 +322,15 @@
                                                                    assignment-versions)
           storm-code-map (read-storm-code-locations assignments-snapshot)
           downloaded-storm-ids (set (read-downloaded-storm-ids conf))
-          all-assignment (read-assignments
-                           assignments-snapshot
-                           (:assignment-id supervisor))
+          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
+          all-assignment (read-assignments assignments-snapshot
+                                           (:assignment-id supervisor)
+                                           existing-assignment
+                                           (:sync-retry supervisor))
           new-assignment (->> all-assignment
                               (filter-key #(.confirmAssigned isupervisor %)))
           assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
-          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
+          ]
       (log-debug "Synchronizing supervisor")
       (log-debug "Storm code map: " storm-code-map)
       (log-debug "Downloaded storm ids: " downloaded-storm-ids)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/14bcc9b2/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index eb69f11..15d0982 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -352,3 +352,59 @@
   ;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code
   )
 
+(deftest test-retry-read-assignments
+  (with-simulated-time-local-cluster [cluster
+                                      :supervisors 0
+                                      :ports-per-supervisor 2
+                                      :daemon-conf {NIMBUS-REASSIGN false
+                                                    NIMBUS-MONITOR-FREQ-SECS 10
+                                                    TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                                    TOPOLOGY-ACKER-EXECUTORS 0}]
+    (letlocals
+     (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+     (bind topology1 (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                      {}))
+     (bind topology2 (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                      {}))
+     (bind state (:storm-cluster-state cluster))
+     (bind changed (capture-changed-workers
+                    (submit-mocked-assignment
+                     (:nimbus cluster)
+                     "topology1"
+                     {TOPOLOGY-WORKERS 2}
+                     topology1
+                     {1 "1"
+                      2 "1"}
+                     {[1] ["sup1" 1]
+                      [2] ["sup1" 2]
+                      })
+                    (submit-mocked-assignment
+                     (:nimbus cluster)
+                     "topology2"
+                     {TOPOLOGY-WORKERS 2}
+                     topology2
+                     {1 "1"
+                      2 "1"}
+                     {[1] ["sup1" 1]
+                      [2] ["sup1" 2]
+                      })
+                    (advance-cluster-time cluster 10)
+                    ))
+     (is (empty? (:launched changed)))
+     (bind options (RebalanceOptions.))
+     (.set_wait_secs options 0)
+     (bind changed (capture-changed-workers
+                    (.rebalance (:nimbus cluster) "topology2" options)
+                    (advance-cluster-time cluster 10)
+                    (heartbeat-workers cluster "sup1" [1 2 3 4])
+                    (advance-cluster-time cluster 10)
+                    ))
+     (validate-launched-once (:launched changed)
+                             {"sup1" [1 2]}
+                             (get-storm-id (:storm-cluster-state cluster) "topology1"))
+     (validate-launched-once (:launched changed)
+                             {"sup1" [3 4]}
+                             (get-storm-id (:storm-cluster-state cluster) "topology2"))
+     )))