You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:51 UTC
[20/24] git commit: change metrics_test to use
simulated-time-local-cluster and fix the hang problem with assert-buckets
Change metrics_test to use simulated-time-local-cluster and fix the hang problem with assert-buckets.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4bab2b85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4bab2b85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4bab2b85
Branch: refs/heads/master
Commit: 4bab2b857d0d481d1859336adeb87d7245c6e1c3
Parents: ecd0adb
Author: dashengju <da...@qq.com>
Authored: Fri Jun 20 10:34:38 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Jun 20 10:34:38 2014 +0800
----------------------------------------------------------------------
.../src/dev/resources/tester_spout_metrics.py | 12 +-
.../test/clj/backtype/storm/metrics_test.clj | 156 ++++++++++---------
2 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4bab2b85/storm-core/src/dev/resources/tester_spout_metrics.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_spout_metrics.py b/storm-core/src/dev/resources/tester_spout_metrics.py
index a91128a..0480566 100644
--- a/storm-core/src/dev/resources/tester_spout_metrics.py
+++ b/storm-core/src/dev/resources/tester_spout_metrics.py
@@ -32,16 +32,14 @@ class TesterSpout(storm.Spout):
self.count = 0
def nextTuple(self):
- sleep(1)
- storm.log("TesterSpout emit a tuple")
- word = choice(words)
- id = str(uuid4())
- self.pending[id] = word
if self.count < 2:
+ word = choice(words)
+ id = str(uuid4())
+ self.pending[id] = word
storm.rpcMetrics("my-custom-shellspout-metric", 1)
- storm.log("TesterSpout update my-custom-shellspout-metric")
self.count = self.count + 1
- storm.emit([word], id=id)
+ storm.log("TesterSpout update my-custom-shellspout-metric "+str(self.count))
+ storm.emit([word], id=id)
def ack(self, id):
del self.pending[id]
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4bab2b85/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index 4aaa3dc..4356d30 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -70,15 +70,17 @@
(def metrics-data backtype.storm.metric.testing/buffer)
-(defn wait-for-atleast-N-buckets! [N comp-id metric-name]
- (while
+(defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
+ (while-timeout TEST-TIMEOUT-MS
(let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))]
(or
(and (not= N 0) (nil? taskid->buckets))
(not-every? #(<= N %) (map (comp count second) taskid->buckets))))
-;; (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id
-;; "and metric name" metric-name)
- (Thread/sleep 10)))
+ ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " (-> @metrics-data (get comp-id) (get metric-name)))
+ (if cluster
+ (advance-cluster-time cluster 1)
+ (Thread/sleep 10))))
+
(defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
(-> @metrics-data
@@ -88,10 +90,10 @@
(second)
(or [])))
-(defmacro assert-buckets! [comp-id metric-name expected]
+(defmacro assert-buckets! [comp-id metric-name expected cluster]
`(do
(let [N# (count ~expected)]
- (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name)
+ (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name ~cluster)
(is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
(defmacro assert-metric-data-exists! [comp-id metric-name]
@@ -112,18 +114,18 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
- (assert-buckets! "2" "my-custom-metric" [1])
+ (assert-buckets! "2" "my-custom-metric" [1] cluster)
(advance-cluster-time cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0])
+ (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
(advance-cluster-time cluster 20)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(deftest test-custom-metric-with-multi-tasks
(with-simulated-time-local-cluster
@@ -140,18 +142,18 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
- (assert-buckets! "2" "my-custom-metric" [1])
+ (assert-buckets! "2" "my-custom-metric" [1] cluster)
(advance-cluster-time cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0])
+ (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
(advance-cluster-time cluster 20)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(defn mk-shell-bolt-with-metrics-spec
[inputs command & kwargs]
@@ -160,7 +162,7 @@
(PythonShellMetricsBolt. command) kwargs)))
(deftest test-custom-metric-with-multilang-py
- (with-local-cluster
+ (with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
@@ -173,19 +175,19 @@
(submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
(.feed feeder ["a"] 1)
- (Thread/sleep 6000)
- (assert-buckets! "2" "my-custom-shell-metric" [1])
+ (advance-cluster-time cluster 6)
+ (assert-buckets! "2" "my-custom-shell-metric" [1] cluster)
- (Thread/sleep 5000)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0])
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0] cluster)
- (Thread/sleep 20000)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0])
+ (advance-cluster-time cluster 20)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
- (Thread/sleep 5000)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2])
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2] cluster)
)))
(defn mk-shell-spout-with-metrics-spec
@@ -194,7 +196,7 @@
(apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
(deftest test-custom-metric-with-spout-multilang-py
- (with-local-cluster
+ (with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
@@ -204,8 +206,8 @@
{"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
(submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
- (Thread/sleep 7000)
- (assert-buckets! "1" "my-custom-shellspout-metric" [2])
+ (advance-cluster-time cluster 7)
+ (assert-buckets! "1" "my-custom-shellspout-metric" [2] cluster)
)))
@@ -223,27 +225,27 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 61)
- (assert-buckets! "myspout" "__ack-count/default" [1])
- (assert-buckets! "myspout" "__emit-count/default" [1])
- (assert-buckets! "myspout" "__transfer-count/default" [1])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+ (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
(advance-cluster-time cluster 120)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0])
- (assert-buckets! "myspout" "__emit-count/default" [1 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [1 0 0])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0])
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 0 0] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 0 0] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0] cluster)
(.feed feeder ["b"] 1)
(.feed feeder ["c"] 1)
(advance-cluster-time cluster 60)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2])
- (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2])
- (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2]))))
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2] cluster))))
(deftest test-builtin-metrics-2
@@ -266,36 +268,36 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 6)
(assert-acked tracker 1)
- (assert-buckets! "myspout" "__fail-count/default" [])
- (assert-buckets! "myspout" "__ack-count/default" [1])
- (assert-buckets! "myspout" "__emit-count/default" [1])
- (assert-buckets! "myspout" "__transfer-count/default" [1])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+ (assert-buckets! "myspout" "__fail-count/default" [] cluster)
+ (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
(.feed feeder ["b"] 2)
(advance-cluster-time cluster 5)
- (assert-buckets! "myspout" "__fail-count/default" [])
- (assert-buckets! "myspout" "__ack-count/default" [1 0])
- (assert-buckets! "myspout" "__emit-count/default" [1 1])
- (assert-buckets! "myspout" "__transfer-count/default" [1 1])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1])
+ (assert-buckets! "myspout" "__fail-count/default" [] cluster)
+ (assert-buckets! "myspout" "__ack-count/default" [1 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 1] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1] cluster)
(advance-cluster-time cluster 15)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0])
- (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0] cluster)
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 15)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0])
- (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0]))))
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0] cluster))))
(deftest test-builtin-metrics-3
(with-simulated-time-local-cluster
@@ -319,21 +321,21 @@
(.feed feeder ["c"] 3)
(advance-cluster-time cluster 9)
(assert-acked tracker 1 3)
- (assert-buckets! "myspout" "__ack-count/default" [2])
- (assert-buckets! "myspout" "__emit-count/default" [3])
- (assert-buckets! "myspout" "__transfer-count/default" [3])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [2])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [3])
+ (assert-buckets! "myspout" "__ack-count/default" [2] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [3] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
(is (not (.isFailed tracker 2)))
(advance-cluster-time cluster 30)
(assert-failed tracker 2)
- (assert-buckets! "myspout" "__fail-count/default" [1])
- (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0])
- (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0])
- (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0])
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0])
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0]))))
+ (assert-buckets! "myspout" "__fail-count/default" [1] cluster)
+ (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0] cluster)
+ (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0] cluster)
+ (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0] cluster)
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0] cluster)
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0] cluster))))
(deftest test-system-bolt
(with-simulated-time-local-cluster
@@ -348,12 +350,12 @@
(.feed feeder ["a"] 1)
(advance-cluster-time cluster 70)
- (assert-buckets! "__system" "newWorkerEvent" [1])
+ (assert-buckets! "__system" "newWorkerEvent" [1] cluster)
(assert-metric-data-exists! "__system" "uptimeSecs")
(assert-metric-data-exists! "__system" "startTimeSecs")
(advance-cluster-time cluster 180)
- (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0])
+ (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0] cluster)
)))