You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:51 UTC

[20/24] git commit: change metrics_test to use simulated-time-local-cluster and fix the hang problem with assert-buckets

Change metrics_test to use simulated-time-local-cluster and fix the hang problem with assert-buckets


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4bab2b85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4bab2b85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4bab2b85

Branch: refs/heads/master
Commit: 4bab2b857d0d481d1859336adeb87d7245c6e1c3
Parents: ecd0adb
Author: dashengju <da...@qq.com>
Authored: Fri Jun 20 10:34:38 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Fri Jun 20 10:34:38 2014 +0800

----------------------------------------------------------------------
 .../src/dev/resources/tester_spout_metrics.py   |  12 +-
 .../test/clj/backtype/storm/metrics_test.clj    | 156 ++++++++++---------
 2 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4bab2b85/storm-core/src/dev/resources/tester_spout_metrics.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_spout_metrics.py b/storm-core/src/dev/resources/tester_spout_metrics.py
index a91128a..0480566 100644
--- a/storm-core/src/dev/resources/tester_spout_metrics.py
+++ b/storm-core/src/dev/resources/tester_spout_metrics.py
@@ -32,16 +32,14 @@ class TesterSpout(storm.Spout):
         self.count = 0
 
     def nextTuple(self):
-        sleep(1)
-        storm.log("TesterSpout emit a tuple")
-        word = choice(words)
-        id = str(uuid4())
-        self.pending[id] = word
         if self.count < 2:
+            word = choice(words)
+            id = str(uuid4())
+            self.pending[id] = word
             storm.rpcMetrics("my-custom-shellspout-metric", 1)
-            storm.log("TesterSpout update my-custom-shellspout-metric")
             self.count = self.count + 1
-        storm.emit([word], id=id)
+            storm.log("TesterSpout update my-custom-shellspout-metric "+str(self.count))
+            storm.emit([word], id=id)
 
     def ack(self, id):
         del self.pending[id]

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4bab2b85/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index 4aaa3dc..4356d30 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -70,15 +70,17 @@
 
 (def metrics-data backtype.storm.metric.testing/buffer)
 
-(defn wait-for-atleast-N-buckets! [N comp-id metric-name]
-  (while
+(defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
+  (while-timeout TEST-TIMEOUT-MS
       (let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))]
         (or
          (and (not= N 0) (nil? taskid->buckets))
          (not-every? #(<= N %) (map (comp count second) taskid->buckets))))
-;;    (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id
-;;             "and metric name" metric-name)
-    (Thread/sleep 10)))
+      ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " (-> @metrics-data (get comp-id) (get metric-name)))
+    (if cluster
+      (advance-cluster-time cluster 1)
+      (Thread/sleep 10))))
+    
 
 (defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
   (-> @metrics-data
@@ -88,10 +90,10 @@
       (second)
       (or [])))
 
-(defmacro assert-buckets! [comp-id metric-name expected]
+(defmacro assert-buckets! [comp-id metric-name expected cluster]
   `(do
      (let [N# (count ~expected)]
-       (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name)
+       (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name ~cluster)
        (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
 
 (defmacro assert-metric-data-exists! [comp-id metric-name]
@@ -112,18 +114,18 @@
 
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
-      (assert-buckets! "2" "my-custom-metric" [1])
+      (assert-buckets! "2" "my-custom-metric" [1] cluster)
             
       (advance-cluster-time cluster 5)
-      (assert-buckets! "2" "my-custom-metric" [1 0])
+      (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
 
       (advance-cluster-time cluster 20)
-      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
       
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)               
       (advance-cluster-time cluster 5)
-      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
 
 (deftest test-custom-metric-with-multi-tasks
   (with-simulated-time-local-cluster
@@ -140,18 +142,18 @@
 
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
-      (assert-buckets! "2" "my-custom-metric" [1])
+      (assert-buckets! "2" "my-custom-metric" [1] cluster)
 
       (advance-cluster-time cluster 5)
-      (assert-buckets! "2" "my-custom-metric" [1 0])
+      (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
 
       (advance-cluster-time cluster 20)
-      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
 
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)
       (advance-cluster-time cluster 5)
-      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
 
 (defn mk-shell-bolt-with-metrics-spec
   [inputs command & kwargs]
@@ -160,7 +162,7 @@
          (PythonShellMetricsBolt. command) kwargs)))
 
 (deftest test-custom-metric-with-multilang-py
-  (with-local-cluster 
+  (with-simulated-time-local-cluster 
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                        [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
                        "storm.zookeeper.connection.timeout" 30000
@@ -173,19 +175,19 @@
       (submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
 
       (.feed feeder ["a"] 1)
-      (Thread/sleep 6000)
-      (assert-buckets! "2" "my-custom-shell-metric" [1])
+      (advance-cluster-time cluster 6)
+      (assert-buckets! "2" "my-custom-shell-metric" [1] cluster)
             
-      (Thread/sleep 5000)
-      (assert-buckets! "2" "my-custom-shell-metric" [1 0])
+      (advance-cluster-time cluster 5)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0] cluster)
 
-      (Thread/sleep 20000)
-      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0])
+      (advance-cluster-time cluster 20)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0] cluster)
       
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)               
-      (Thread/sleep 5000)
-      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2])
+      (advance-cluster-time cluster 5)
+      (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2] cluster)
       )))
 
 (defn mk-shell-spout-with-metrics-spec
@@ -194,7 +196,7 @@
     (apply thrift/mk-spout-spec (PythonShellMetricsSpout. command) kwargs)))
 
 (deftest test-custom-metric-with-spout-multilang-py
-  (with-local-cluster 
+  (with-simulated-time-local-cluster 
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                        [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
                        "storm.zookeeper.connection.timeout" 30000
@@ -204,8 +206,8 @@
                      {"2" (thrift/mk-bolt-spec {"1" :all} count-acks)})]
       (submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
 
-      (Thread/sleep 7000)
-      (assert-buckets! "1" "my-custom-shellspout-metric" [2])
+      (advance-cluster-time cluster 7)
+      (assert-buckets! "1" "my-custom-shellspout-metric" [2] cluster)
       )))
 
 
@@ -223,27 +225,27 @@
       
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 61)
-      (assert-buckets! "myspout" "__ack-count/default" [1])
-      (assert-buckets! "myspout" "__emit-count/default" [1])
-      (assert-buckets! "myspout" "__transfer-count/default" [1])            
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+      (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)            
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
 
       (advance-cluster-time cluster 120)
-      (assert-buckets! "myspout" "__ack-count/default" [1 0 0])
-      (assert-buckets! "myspout" "__emit-count/default" [1 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 0 0])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0])
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 0 0] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 0 0] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0] cluster)
 
       (.feed feeder ["b"] 1)
       (.feed feeder ["c"] 1)
       (advance-cluster-time cluster 60)
-      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2])
-      (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2])      
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2]))))
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2] cluster)      
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2] cluster))))
 
 
 (deftest test-builtin-metrics-2
@@ -266,36 +268,36 @@
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
       (assert-acked tracker 1)
-      (assert-buckets! "myspout" "__fail-count/default" [])
-      (assert-buckets! "myspout" "__ack-count/default" [1])
-      (assert-buckets! "myspout" "__emit-count/default" [1])
-      (assert-buckets! "myspout" "__transfer-count/default" [1])            
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])     
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+      (assert-buckets! "myspout" "__fail-count/default" [] cluster)
+      (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)            
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)     
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
 
       (.feed feeder ["b"] 2)      
       (advance-cluster-time cluster 5)
-      (assert-buckets! "myspout" "__fail-count/default" [])
-      (assert-buckets! "myspout" "__ack-count/default" [1 0])
-      (assert-buckets! "myspout" "__emit-count/default" [1 1])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 1])                  
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1])
+      (assert-buckets! "myspout" "__fail-count/default" [] cluster)
+      (assert-buckets! "myspout" "__ack-count/default" [1 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 1] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1] cluster)                  
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1] cluster)
 
       (advance-cluster-time cluster 15)      
-      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0])
-      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0] cluster)
       
       (.feed feeder ["c"] 3)            
       (advance-cluster-time cluster 15)      
-      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0])
-      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0]))))
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0] cluster))))
 
 (deftest test-builtin-metrics-3
   (with-simulated-time-local-cluster
@@ -319,21 +321,21 @@
       (.feed feeder ["c"] 3)
       (advance-cluster-time cluster 9)
       (assert-acked tracker 1 3)
-      (assert-buckets! "myspout" "__ack-count/default" [2])
-      (assert-buckets! "myspout" "__emit-count/default" [3])
-      (assert-buckets! "myspout" "__transfer-count/default" [3])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3])
+      (assert-buckets! "myspout" "__ack-count/default" [2] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [3] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
       
       (is (not (.isFailed tracker 2)))
       (advance-cluster-time cluster 30)
       (assert-failed tracker 2)
-      (assert-buckets! "myspout" "__fail-count/default" [1])
-      (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0])
-      (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0])
-      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0]))))
+      (assert-buckets! "myspout" "__fail-count/default" [1] cluster)
+      (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0] cluster)
+      (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0] cluster)
+      (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0] cluster)
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0] cluster)
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0] cluster))))
 
 (deftest test-system-bolt
   (with-simulated-time-local-cluster
@@ -348,12 +350,12 @@
 
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 70)
-      (assert-buckets! "__system" "newWorkerEvent" [1])
+      (assert-buckets! "__system" "newWorkerEvent" [1] cluster)
       (assert-metric-data-exists! "__system" "uptimeSecs")
       (assert-metric-data-exists! "__system" "startTimeSecs")
 
       (advance-cluster-time cluster 180)
-      (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0])
+      (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0] cluster)
       )))