You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/26 07:52:42 UTC
[1/2] storm git commit: STORM-3121: Fix flaky metrics tests in
storm-core
Repository: storm
Updated Branches:
refs/heads/master a050ee8b1 -> 69801887c
STORM-3121: Fix flaky metrics tests in storm-core
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f876b31f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f876b31f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f876b31f
Branch: refs/heads/master
Commit: f876b31f4b91960b9012a679d37f04a112424a9b
Parents: d6f8afb
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Jun 24 12:55:11 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jun 26 08:06:21 2018 +0200
----------------------------------------------------------------------
pom.xml | 7 +
storm-core/pom.xml | 5 +
.../test/clj/org/apache/storm/metrics_test.clj | 152 ++++++++++---------
storm-core/test/resources/log4j2-test.xml | 3 +-
4 files changed, 97 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f876b31f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8c9579d..fa4c77c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -298,6 +298,7 @@
<joda-time.version>2.3</joda-time.version>
<thrift.version>0.11.0</thrift.version>
<junit.version>4.11</junit.version>
+ <awaitility.version>3.1.0</awaitility.version>
<metrics-clojure.version>2.5.1</metrics-clojure.version>
<hdrhistogram.version>2.1.10</hdrhistogram.version>
<hamcrest.version>2.0.0.0</hamcrest.version>
@@ -1034,6 +1035,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
<version>${hamcrest.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/f876b31f/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 277f252..9690a06 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -185,6 +185,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/storm/blob/f876b31f/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 4221d0b..8ce8fbb 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -24,6 +24,10 @@
(:import [org.apache.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo])
(:import [org.apache.storm.metric.api.rpc CountShellMetric])
(:import [org.apache.storm Testing Testing$Condition LocalCluster$Builder])
+ (:import [org.awaitility Awaitility])
+ (:import [org.awaitility.core ConditionEvaluationListener ConditionTimeoutException])
+ (:import [java.util.concurrent TimeUnit Callable])
+ (:import [org.hamcrest CoreMatchers])
(:use [org.apache.storm config])
(:use [org.apache.storm clojure])
@@ -87,12 +91,22 @@
(first) ;; pick first task in the list, ignore other tasks' metric data.
(second)
(or [])))
-
-(defmacro assert-buckets! [comp-id metric-name expected cluster]
- `(do
- (let [N# (count ~expected)]
- (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name ~cluster)
- (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
+
+(defn assert-metric-running-sum! [comp-id metric-name expected min-buckets cluster]
+ (try
+ (do
+ (wait-for-atleast-N-buckets! min-buckets comp-id metric-name cluster)
+ (.until
+ (.atMost
+ (.conditionEvaluationListener
+ (.pollInterval (Awaitility/with) 10 TimeUnit/MILLISECONDS)
+ (reify ConditionEvaluationListener (conditionEvaluated [this condition]
+ (.advanceClusterTime cluster 1))))
+ Testing/TEST_TIMEOUT_MS TimeUnit/MILLISECONDS)
+ (reify Callable (call [this]
+ (reduce + (lookup-bucket-by-comp-id-&-metric-name! comp-id metric-name))))
+ (CoreMatchers/equalTo expected)))
+ (catch ConditionTimeoutException e (throw (AssertionError. (.getMessage e))))))
(defmacro assert-metric-data-exists! [comp-id metric-name]
`(is (not-empty (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name))))
@@ -116,18 +130,18 @@
(.feed feeder ["a"] 1)
(.advanceClusterTime cluster 6)
- (assert-buckets! "2" "my-custom-metric" [1] cluster)
+ (assert-metric-running-sum! "2" "my-custom-metric" 1 1 cluster)
(.advanceClusterTime cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
+ (assert-metric-running-sum! "2" "my-custom-metric" 1 2 cluster)
(.advanceClusterTime cluster 20)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
+ (assert-metric-running-sum! "2" "my-custom-metric" 1 6 cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(.advanceClusterTime cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
+ (assert-metric-running-sum! "2" "my-custom-metric" 3 7 cluster))))
(deftest test-custom-metric-with-multi-tasks
(with-open [cluster (.build (doto (LocalCluster$Builder.)
@@ -148,18 +162,18 @@
(.feed feeder ["a"] 1)
(.advanceClusterTime cluster 6)
- (assert-buckets! "2" "my-custom-metric" [1] cluster)
+ (assert-metric-running-sum! "2" "my-custom-metric" 1 1 cluster)
(.advanceClusterTime cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
+ (assert-metric-running-sum! "2" "my-custom-metric" 1 2 cluster)
(.advanceClusterTime cluster 20)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
+ (assert-metric-running-sum! "2" "my-custom-metric" 1 6 cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(.advanceClusterTime cluster 5)
- (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
+ (assert-metric-running-sum! "2" "my-custom-metric" 3 7 cluster))))
(defn mk-shell-bolt-with-metrics-spec
[inputs command file]
@@ -185,18 +199,18 @@
(.feed feeder ["a"] 1)
(.advanceClusterTime cluster 6)
- (assert-buckets! "2" "my-custom-shell-metric" [1] cluster)
+ (assert-metric-running-sum! "2" "my-custom-shell-metric" 1 1 cluster)
(.advanceClusterTime cluster 5)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0] cluster)
+ (assert-metric-running-sum! "2" "my-custom-shell-metric" 1 2 cluster)
(.advanceClusterTime cluster 20)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0] cluster)
+ (assert-metric-running-sum! "2" "my-custom-shell-metric" 1 6 cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(.advanceClusterTime cluster 5)
- (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2] cluster)
+ (assert-metric-running-sum! "2" "my-custom-shell-metric" 3 7 cluster)
)))
(defn mk-shell-spout-with-metrics-spec
@@ -220,7 +234,7 @@
(.submitTopology cluster "shell-spout-metrics-tester" {} topology)
(.advanceClusterTime cluster 7)
- (assert-buckets! "1" "my-custom-shellspout-metric" [2] cluster)
+ (assert-metric-running-sum! "1" "my-custom-shellspout-metric" 2 1 cluster)
)))
@@ -242,27 +256,27 @@
(.feed feeder ["a"] 1)
(.advanceClusterTime cluster 61)
- (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 1 1 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 1 1 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 1 1 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 1 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 1 1 cluster)
(.advanceClusterTime cluster 120)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [1 0 0] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [1 0 0] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0] cluster)
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 1 3 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 1 3 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 1 3 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 3 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 1 3 cluster)
(.feed feeder ["b"] 1)
(.feed feeder ["c"] 1)
(.advanceClusterTime cluster 60)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2] cluster))))
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 3 4 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 3 4 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 3 4 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 3 4 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 4 cluster))))
(deftest test-builtin-metrics-2
@@ -288,37 +302,37 @@
(.feed feeder ["a"] 1)
(.advanceClusterTime cluster 6)
- (assert-buckets! "myspout" "__fail-count/default" [] cluster)
- (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
+ (assert-metric-running-sum! "myspout" "__fail-count/default" 0 0 cluster)
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 1 1 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 1 1 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 1 1 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 1 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 1 1 cluster)
(assert-acked tracker 1)
(.feed feeder ["b"] 2)
(.advanceClusterTime cluster 5)
- (assert-buckets! "myspout" "__fail-count/default" [] cluster)
- (assert-buckets! "myspout" "__ack-count/default" [1 0] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [1 1] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [1 1] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1] cluster)
+ (assert-metric-running-sum! "myspout" "__fail-count/default" 0 0 cluster)
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 1 2 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 2 2 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 2 2 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 2 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 2 2 cluster)
(.advanceClusterTime cluster 15)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0] cluster)
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 1 5 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 2 5 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 2 5 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 5 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 2 5 cluster)
(.feed feeder ["c"] 3)
(.advanceClusterTime cluster 15)
- (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0] cluster))))
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 2 8 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 3 8 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 3 8 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 2 8 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 8 cluster))))
(deftest test-builtin-metrics-3
(with-open [cluster (.build (doto (LocalCluster$Builder.)
@@ -345,22 +359,22 @@
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
(.advanceClusterTime cluster 9)
- (assert-buckets! "myspout" "__ack-count/default" [2] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [3] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 2 1 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 3 1 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 3 1 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 2 1 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 1 cluster)
(assert-acked tracker 1 3)
(is (not (.isFailed tracker 2)))
(.advanceClusterTime cluster 30)
(assert-failed tracker 2)
- (assert-buckets! "myspout" "__fail-count/default" [1] cluster)
- (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0] cluster)
- (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0] cluster)
- (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0] cluster)
- (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0] cluster)
- (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0] cluster))))
+ (assert-metric-running-sum! "myspout" "__fail-count/default" 1 1 cluster)
+ (assert-metric-running-sum! "myspout" "__ack-count/default" 2 4 cluster)
+ (assert-metric-running-sum! "myspout" "__emit-count/default" 3 4 cluster)
+ (assert-metric-running-sum! "myspout" "__transfer-count/default" 3 4 cluster)
+ (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 2 4 cluster)
+ (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 4 cluster))))
(deftest test-system-bolt
(with-open [cluster (.build (doto (LocalCluster$Builder.)
@@ -376,12 +390,12 @@
(.feed feeder ["a"] 1)
(.advanceClusterTime cluster 70)
- (assert-buckets! "__system" "newWorkerEvent" [1] cluster)
+ (assert-metric-running-sum! "__system" "newWorkerEvent" 1 1 cluster)
(assert-metric-data-exists! "__system" "uptimeSecs")
(assert-metric-data-exists! "__system" "startTimeSecs")
(.advanceClusterTime cluster 180)
- (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0] cluster)
+ (assert-metric-running-sum! "__system" "newWorkerEvent" 1 4 cluster)
)))
http://git-wip-us.apache.org/repos/asf/storm/blob/f876b31f/storm-core/test/resources/log4j2-test.xml
----------------------------------------------------------------------
diff --git a/storm-core/test/resources/log4j2-test.xml b/storm-core/test/resources/log4j2-test.xml
index e8ae19e..d498538 100644
--- a/storm-core/test/resources/log4j2-test.xml
+++ b/storm-core/test/resources/log4j2-test.xml
@@ -23,7 +23,8 @@
</Console>
</Appenders>
<Loggers>
- <Logger name="org.apache.zookeeper" level="WARN"/>
+ <Logger name="org.apache.storm.shade.org.apache.zookeeper" level="WARN"/>
+ <Logger name="org.apache.storm.shade.org.apache.curator" level="WARN"/>
<Root level="${env:LOG_LEVEL:-INFO}">
<AppenderRef ref="Console"/>
</Root>
[2/2] storm git commit: Merge branch 'STORM-3121' of
https://github.com/srdo/storm into STORM-3121-merge
Posted by ka...@apache.org.
Merge branch 'STORM-3121' of https://github.com/srdo/storm into STORM-3121-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69801887
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69801887
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69801887
Branch: refs/heads/master
Commit: 69801887c324a7c6366b2dbefbada4c4e6a48c8e
Parents: a050ee8 f876b31
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 26 16:52:33 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jun 26 16:52:33 2018 +0900
----------------------------------------------------------------------
pom.xml | 7 +
storm-core/pom.xml | 5 +
.../test/clj/org/apache/storm/metrics_test.clj | 152 ++++++++++---------
storm-core/test/resources/log4j2-test.xml | 3 +-
4 files changed, 97 insertions(+), 70 deletions(-)
----------------------------------------------------------------------