You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/24 03:03:14 UTC
[4/7] storm git commit: Removing unwanted rebalance topology call
from nimbus_test
Removing unwanted rebalance topology call from nimbus_test
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce5dd660
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce5dd660
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce5dd660
Branch: refs/heads/master
Commit: ce5dd660d7607a4298220c905851f07d883f6baf
Parents: 6c0ebf7
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Oct 23 16:48:24 2015 -0500
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Oct 23 16:48:24 2015 -0500
----------------------------------------------------------------------
.../test/clj/backtype/storm/nimbus_test.clj | 28 +++++++-------------
1 file changed, 9 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ce5dd660/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 0b36e51..0893d1c 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -191,7 +191,7 @@
)))
(deftest test-assignment
- (with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
+ (with-simulated-time-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
:daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)
@@ -206,9 +206,7 @@
"4" (thrift/mk-bolt-spec {"1" :global "2" :none} (TestPlannerBolt.) :parallelism-hint 4)}
)
_ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- _ (Thread/sleep 1000)
+ _ (advance-cluster-time cluster 11)
task-info (storm-component->task-info cluster "mystorm")]
(check-consistency cluster "mystorm")
;; 3 should be assigned once (if it were optimized, we'd have
@@ -219,9 +217,7 @@
(is (= 1 (count (task-info "3"))))
(is (= 4 (storm-num-workers state "mystorm")))
(submit-local-topology nimbus "storm2" {TOPOLOGY-WORKERS 20} topology2)
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance nimbus "storm2" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- (Thread/sleep 1000)
+ (advance-cluster-time cluster 11)
(check-consistency cluster "storm2")
(is (= 2 (count (.assignments state nil))))
(let [task-info (storm-component->task-info cluster "storm2")]
@@ -338,7 +334,7 @@
)))
(deftest test-zero-executor-or-tasks
- (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ (with-simulated-time-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)
topology (thrift/mk-topology
@@ -346,9 +342,7 @@
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 1 :conf {TOPOLOGY-TASKS 2})
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) :conf {TOPOLOGY-TASKS 5})})
_ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- _ (Thread/sleep 1000)
+ _ (advance-cluster-time cluster 11)
task-info (storm-component->task-info cluster "mystorm")]
(check-consistency cluster "mystorm")
(is (= 0 (count (task-info "1"))))
@@ -358,16 +352,14 @@
)))
(deftest test-executor-assignments
- (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ (with-simulated-time-local-cluster[cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [nimbus (:nimbus cluster)
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3 :conf {TOPOLOGY-TASKS 5})}
{"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 8 :conf {TOPOLOGY-TASKS 2})
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.) :parallelism-hint 3)})
_ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- _ (.rebalance nimbus "mystorm" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- _ (Thread/sleep 1000)
+ _ (advance-cluster-time cluster 11)
task-info (storm-component->task-info cluster "mystorm")
executor-info (->> (storm-component->executor-info cluster "mystorm")
(map-val #(map executor-id->tasks %)))]
@@ -383,7 +375,7 @@
)))
(deftest test-over-parallelism-assignment
- (with-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
+ (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
:daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)
@@ -394,9 +386,7 @@
"4" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 10)}
)
_ (submit-local-topology nimbus "test" {TOPOLOGY-WORKERS 7} topology)
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- _ (.rebalance nimbus "test" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- _ (Thread/sleep 1000)
+ _ (advance-cluster-time cluster 11)
task-info (storm-component->task-info cluster "test")]
(check-consistency cluster "test")
(is (= 21 (count (task-info "1"))))