You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/30 14:27:49 UTC
[1/3] storm git commit: STORM-1229: port backtype.storm.metric.testing to java
Repository: storm
Updated Branches:
refs/heads/master ca45d46ca -> 7d724e8ca
STORM-1229: port backtype.storm.metric.testing to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07e1b231
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07e1b231
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07e1b231
Branch: refs/heads/master
Commit: 07e1b231e37217705add955d76b57fa530b99955
Parents: 367464a
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Sun Mar 20 01:17:33 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Sun Mar 20 01:17:33 2016 +0530
----------------------------------------------------------------------
.../src/clj/org/apache/storm/metric/testing.clj | 68 ---------------
.../test/clj/org/apache/storm/metrics_test.clj | 32 ++++---
.../apache/storm/metric/FakeMetricConsumer.java | 88 ++++++++++++++++++++
3 files changed, 103 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/07e1b231/storm-core/src/clj/org/apache/storm/metric/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/metric/testing.clj b/storm-core/src/clj/org/apache/storm/metric/testing.clj
deleted file mode 100644
index a8ec438..0000000
--- a/storm-core/src/clj/org/apache/storm/metric/testing.clj
+++ /dev/null
@@ -1,68 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.metric.testing
- "This namespace is for AOT dependent metrics testing code."
- (:gen-class))
-
-(letfn [(for- [threader arg seq-exprs body]
- `(reduce #(%2 %1)
- ~arg
- (for ~seq-exprs
- (fn [arg#] (~threader arg# ~@body)))))]
- (defmacro for->
- "Apply a thread expression to a sequence.
- eg.
- (-> 1
- (for-> [x [1 2 3]]
- (+ x)))
- => 7"
- {:indent 1}
- [arg seq-exprs & body]
- (for- 'clojure.core/-> arg seq-exprs body)))
-
-(gen-class
- :name clojure.storm.metric.testing.FakeMetricConsumer
- :implements [org.apache.storm.metric.api.IMetricsConsumer]
- :prefix "impl-")
-
-(def buffer (atom nil))
-
-(defn impl-prepare [this conf argument ctx error-reporter]
- (reset! buffer {}))
-
-(defn impl-cleanup [this]
- (reset! buffer {}))
-
-(defn vec-conj [coll x] (if coll
- (conj coll x)
- [x]))
-
-(defn expand-complex-datapoint [dp]
- (if (or (map? (.value dp))
- (instance? java.util.AbstractMap (.value dp)))
- (into [] (for [[k v] (.value dp)]
- [(str (.name dp) "/" k) v]))
- [[(.name dp) (.value dp)]]))
-
-(defn impl-handleDataPoints [this task-info data-points]
- (swap! buffer
- (fn [old]
- (-> old
- (for-> [dp data-points
- [name val] (expand-complex-datapoint dp)]
- (update-in [(.srcComponentId task-info) name (.srcTaskId task-info)] vec-conj val))))))
-
-
http://git-wip-us.apache.org/repos/asf/storm/blob/07e1b231/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index c186288..3308653 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -28,9 +28,10 @@
(:use [org.apache.storm testing config])
(:use [org.apache.storm.internal clojure])
(:use [org.apache.storm.daemon common])
- (:use [org.apache.storm.metric testing])
+ (:use [org.apache.storm.util])
(:import [org.apache.storm Thrift])
- (:import [org.apache.storm.utils Utils]))
+ (:import [org.apache.storm.utils Utils]
+ (org.apache.storm.metric FakeMetricConsumer)))
(defbolt acking-bolt {} {:prepare true}
[conf context collector]
@@ -68,24 +69,21 @@
(.incr mycustommetric)
(ack! collector tuple)))))
-(def metrics-data org.apache.storm.metric.testing/buffer)
-
(defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
(while-timeout TEST-TIMEOUT-MS
- (let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))]
+ (let [taskid->buckets (clojurify-structure (FakeMetricConsumer/getTaskIdToBuckets comp-id metric-name))]
(or
(and (not= N 0) (nil? taskid->buckets))
(not-every? #(<= N %) (map (comp count second) taskid->buckets))))
- ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " (-> @metrics-data (get comp-id) (get metric-name)))
+ ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " FakeMetricConsumer/getTaskIdToBuckets)
(if cluster
(advance-cluster-time cluster 1)
(Thread/sleep 10))))
(defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
- (-> @metrics-data
- (get comp-id)
- (get metric-name)
+ (-> (FakeMetricConsumer/getTaskIdToBuckets comp-id metric-name)
+ (clojurify-structure)
(first) ;; pick first task in the list, ignore other tasks' metric data.
(second)
(or [])))
@@ -102,7 +100,7 @@
(deftest test-custom-metric
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000
}]
@@ -133,7 +131,7 @@
(deftest test-custom-metric-with-multi-tasks
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000
}]
@@ -169,7 +167,7 @@
(deftest test-custom-metric-with-multilang-py
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000
}]
@@ -205,7 +203,7 @@
(deftest test-custom-metric-with-spout-multilang-py
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000}]
(let [topology (Thrift/buildTopology
@@ -224,7 +222,7 @@
(deftest test-builtin-metrics-1
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
TOPOLOGY-STATS-SAMPLE-RATE 1.0
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
(let [feeder (feeder-spout ["field1"])
@@ -264,7 +262,7 @@
(deftest test-builtin-metrics-2
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
TOPOLOGY-STATS-SAMPLE-RATE 1.0
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}]
(let [feeder (feeder-spout ["field1"])
@@ -318,7 +316,7 @@
(deftest test-builtin-metrics-3
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
TOPOLOGY-STATS-SAMPLE-RATE 1.0
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5
TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
@@ -359,7 +357,7 @@
(deftest test-system-bolt
(with-simulated-time-local-cluster
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
(let [feeder (feeder-spout ["field1"])
topology (Thrift/buildTopology
http://git-wip-us.apache.org/repos/asf/storm/blob/07e1b231/storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java b/storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java
new file mode 100644
index 0000000..a6dc391
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Table;
+
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FakeMetricConsumer implements IMetricsConsumer {
+
+ public static final Table<String, String, Multimap<Integer, Object>> buffer = HashBasedTable.create();
+
+ @Override
+ public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
+ synchronized (buffer) {
+ buffer.clear();
+ }
+ }
+
+ @Override
+ public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+ synchronized (buffer) {
+ for (DataPoint dp : dataPoints) {
+ for (Map.Entry<String, Object> entry : expandComplexDataPoint(dp).entrySet()) {
+ String metricName = entry.getKey();
+ Multimap<Integer, Object> taskIdToBucket = buffer.get(taskInfo.srcComponentId, metricName);
+ if (null == taskIdToBucket) {
+ taskIdToBucket = ArrayListMultimap.create();
+ taskIdToBucket.put(taskInfo.srcTaskId, entry.getValue());
+ } else {
+ taskIdToBucket.get(taskInfo.srcTaskId).add(entry.getValue());
+ }
+ buffer.put(taskInfo.srcComponentId, metricName, taskIdToBucket);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ synchronized (buffer) {
+ buffer.clear();
+ }
+ }
+
+ private Map<String, Object> expandComplexDataPoint(DataPoint dp) {
+ Map<String, Object> expanded = new HashMap<>();
+ if (dp.value instanceof Map) {
+ for (Map.Entry entry : ((Map<Object, Object>) dp.value).entrySet()) {
+ expanded.put(dp.name + "/" + entry.getKey(), entry.getValue());
+ }
+ } else {
+ expanded.put(dp.name, dp.value);
+ }
+ return expanded;
+ }
+
+ public static Map<Integer, Collection<Object>> getTaskIdToBuckets(String componentName, String metricName) {
+ synchronized (buffer) {
+ Multimap<Integer, Object> taskIdToBuckets = buffer.get(componentName, metricName);
+ return (null != taskIdToBuckets) ? taskIdToBuckets.asMap() : null;
+ }
+ }
+}
[2/3] storm git commit: Merge branch 'metric-testing' of https://github.com/abhishekagarwal87/storm into STORM-1229
Posted by ka...@apache.org.
Merge branch 'metric-testing' of https://github.com/abhishekagarwal87/storm into STORM-1229
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ec635958
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ec635958
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ec635958
Branch: refs/heads/master
Commit: ec6359584c45fe428794fdb3f72342876850965f
Parents: ca45d46 07e1b23
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jun 30 23:27:04 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 30 23:27:04 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/metric/testing.clj | 68 ---------------
.../test/clj/org/apache/storm/metrics_test.clj | 32 ++++---
.../apache/storm/metric/FakeMetricConsumer.java | 88 ++++++++++++++++++++
3 files changed, 103 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-1229 to CHANGELOG
Posted by ka...@apache.org.
add STORM-1229 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d724e8c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d724e8c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d724e8c
Branch: refs/heads/master
Commit: 7d724e8ca83b3396a5c9a6e218098912d704e9bc
Parents: ec63595
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jun 30 23:27:34 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 30 23:27:34 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7d724e8c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 619f414..93ba84a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1229: port backtype.storm.metric.testing to java
* STORM-1228: port backtype.storm.fields-test to java
* STORM-1233: Port AuthUtilsTest to java
* STORM-1920: version of parent pom for storm-kafka-monitor is set 1.0.2-SNAPSHOT in master branch