You are viewing a plain-text version of this content. The canonical link for it ("here") is a hyperlink that is not preserved in this plain-text copy.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 22:58:05 UTC
[06/50] git commit: made read-assignment retry on reading inconsistent assignments.
made read-assignment retry on reading inconsistent assignments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/14bcc9b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/14bcc9b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/14bcc9b2
Branch: refs/heads/security
Commit: 14bcc9b2155ab5be71ccffa43689ef6f51b153f4
Parents: 96e81c1
Author: iwasakims <iw...@example.com>
Authored: Mon Jul 14 10:12:24 2014 -0700
Committer: iwasakims <iw...@example.com>
Committed: Wed Jul 16 08:16:50 2014 -0700
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 26 ++++++---
.../test/clj/backtype/storm/supervisor_test.clj | 56 ++++++++++++++++++++
2 files changed, 74 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/14bcc9b2/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 7ba1c69..8d1ac46 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -67,12 +67,19 @@
[(Integer. port) (LocalAssignment. storm-id (doall executors))]
))))
-
(defn- read-assignments
"Returns map from port to struct containing :storm-id and :executors"
- [assignments-snapshot assignment-id]
- (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
- (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
+ ([assignments-snapshot assignment-id]
+ (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
+ (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
+ ([assignments-snapshot assignment-id existing-assignment retries]
+ (try (let [assignments (read-assignments assignments-snapshot assignment-id)]
+ (reset! retries 0)
+ assignments)
+ (catch RuntimeException e
+ (if (> @retries 2) (throw e) (swap! retries inc))
+ (log-warn (.getMessage e))
+ existing-assignment))))
(defn- read-storm-code-locations
[assignments-snapshot]
@@ -212,6 +219,7 @@
(exit-process! 20 "Error when processing an event")
))
:assignment-versions (atom {})
+ :sync-retry (atom 0)
})
(defn sync-processes [supervisor]
@@ -314,13 +322,15 @@
assignment-versions)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
- all-assignment (read-assignments
- assignments-snapshot
- (:assignment-id supervisor))
+ existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
+ all-assignment (read-assignments assignments-snapshot
+ (:assignment-id supervisor)
+ existing-assignment
+ (:sync-retry supervisor))
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
- existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
+ ]
(log-debug "Synchronizing supervisor")
(log-debug "Storm code map: " storm-code-map)
(log-debug "Downloaded storm ids: " downloaded-storm-ids)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/14bcc9b2/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index eb69f11..15d0982 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -352,3 +352,59 @@
;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code
)
+(deftest test-retry-read-assignments
+ (with-simulated-time-local-cluster [cluster
+ :supervisors 0
+ :ports-per-supervisor 2
+ :daemon-conf {NIMBUS-REASSIGN false
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0}]
+ (letlocals
+ (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+ (bind topology1 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind topology2 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (bind changed (capture-changed-workers
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ "topology1"
+ {TOPOLOGY-WORKERS 2}
+ topology1
+ {1 "1"
+ 2 "1"}
+ {[1] ["sup1" 1]
+ [2] ["sup1" 2]
+ })
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ "topology2"
+ {TOPOLOGY-WORKERS 2}
+ topology2
+ {1 "1"
+ 2 "1"}
+ {[1] ["sup1" 1]
+ [2] ["sup1" 2]
+ })
+ (advance-cluster-time cluster 10)
+ ))
+ (is (empty? (:launched changed)))
+ (bind options (RebalanceOptions.))
+ (.set_wait_secs options 0)
+ (bind changed (capture-changed-workers
+ (.rebalance (:nimbus cluster) "topology2" options)
+ (advance-cluster-time cluster 10)
+ (heartbeat-workers cluster "sup1" [1 2 3 4])
+ (advance-cluster-time cluster 10)
+ ))
+ (validate-launched-once (:launched changed)
+ {"sup1" [1 2]}
+ (get-storm-id (:storm-cluster-state cluster) "topology1"))
+ (validate-launched-once (:launched changed)
+ {"sup1" [3 4]}
+ (get-storm-id (:storm-cluster-state cluster) "topology2"))
+ )))