You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/30 14:27:49 UTC

[1/3] storm git commit: STORM-1229: port backtype.storm.metric.testing to java

Repository: storm
Updated Branches:
  refs/heads/master ca45d46ca -> 7d724e8ca


STORM-1229: port backtype.storm.metric.testing to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07e1b231
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07e1b231
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07e1b231

Branch: refs/heads/master
Commit: 07e1b231e37217705add955d76b57fa530b99955
Parents: 367464a
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Sun Mar 20 01:17:33 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Sun Mar 20 01:17:33 2016 +0530

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/metric/testing.clj | 68 ---------------
 .../test/clj/org/apache/storm/metrics_test.clj  | 32 ++++---
 .../apache/storm/metric/FakeMetricConsumer.java | 88 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/07e1b231/storm-core/src/clj/org/apache/storm/metric/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/metric/testing.clj b/storm-core/src/clj/org/apache/storm/metric/testing.clj
deleted file mode 100644
index a8ec438..0000000
--- a/storm-core/src/clj/org/apache/storm/metric/testing.clj
+++ /dev/null
@@ -1,68 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.metric.testing
-  "This namespace is for AOT dependent metrics testing code."
-  (:gen-class))
-
-(letfn [(for- [threader arg seq-exprs body]
-          `(reduce #(%2 %1)
-                   ~arg
-                   (for ~seq-exprs
-                     (fn [arg#] (~threader arg# ~@body)))))]
-  (defmacro for->
-    "Apply a thread expression to a sequence.
-   eg.
-      (-> 1
-        (for-> [x [1 2 3]]
-          (+ x)))
-   => 7"
-    {:indent 1}
-    [arg seq-exprs & body]
-    (for- 'clojure.core/-> arg seq-exprs body)))
-
-(gen-class
- :name clojure.storm.metric.testing.FakeMetricConsumer
- :implements [org.apache.storm.metric.api.IMetricsConsumer]
- :prefix "impl-")
-
-(def buffer (atom nil))
-
-(defn impl-prepare [this conf argument ctx error-reporter]
-  (reset! buffer {}))
-
-(defn impl-cleanup [this]
-  (reset! buffer {}))
-
-(defn vec-conj [coll x] (if coll
-                          (conj coll x)
-                          [x]))
-
-(defn expand-complex-datapoint [dp]
-  (if (or (map? (.value dp))
-          (instance? java.util.AbstractMap (.value dp)))
-    (into [] (for [[k v] (.value dp)]
-               [(str (.name dp) "/" k) v]))
-    [[(.name dp) (.value dp)]]))
-
-(defn impl-handleDataPoints [this task-info data-points]  
-  (swap! buffer
-         (fn [old]
-           (-> old
-            (for-> [dp data-points
-                    [name val] (expand-complex-datapoint dp)]
-                   (update-in [(.srcComponentId task-info) name (.srcTaskId task-info)] vec-conj val))))))
- 
-

http://git-wip-us.apache.org/repos/asf/storm/blob/07e1b231/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index c186288..3308653 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -28,9 +28,10 @@
   (:use [org.apache.storm testing config])
   (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common])
-  (:use [org.apache.storm.metric testing])
+  (:use [org.apache.storm.util])
   (:import [org.apache.storm Thrift])
-  (:import [org.apache.storm.utils Utils]))
+  (:import [org.apache.storm.utils Utils]
+           (org.apache.storm.metric FakeMetricConsumer)))
 
 (defbolt acking-bolt {} {:prepare true}
   [conf context collector]  
@@ -68,24 +69,21 @@
               (.incr mycustommetric)
               (ack! collector tuple)))))
 
-(def metrics-data org.apache.storm.metric.testing/buffer)
-
 (defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
   (while-timeout TEST-TIMEOUT-MS
-      (let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))]
+      (let [taskid->buckets (clojurify-structure (FakeMetricConsumer/getTaskIdToBuckets comp-id metric-name))]
         (or
          (and (not= N 0) (nil? taskid->buckets))
          (not-every? #(<= N %) (map (comp count second) taskid->buckets))))
-      ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " (-> @metrics-data (get comp-id) (get metric-name)))
+      ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " FakeMetricConsumer/getTaskIdToBuckets)
     (if cluster
       (advance-cluster-time cluster 1)
       (Thread/sleep 10))))
     
 
 (defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
-  (-> @metrics-data
-      (get comp-id)
-      (get metric-name)
+  (-> (FakeMetricConsumer/getTaskIdToBuckets comp-id metric-name)
+      (clojurify-structure)
       (first) ;; pick first task in the list, ignore other tasks' metric data.
       (second)
       (or [])))
@@ -102,7 +100,7 @@
 (deftest test-custom-metric
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            "storm.zookeeper.connection.timeout" 30000
                            "storm.zookeeper.session.timeout" 60000
                            }]
@@ -133,7 +131,7 @@
 (deftest test-custom-metric-with-multi-tasks
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            "storm.zookeeper.connection.timeout" 30000
                            "storm.zookeeper.session.timeout" 60000
                            }]
@@ -169,7 +167,7 @@
 (deftest test-custom-metric-with-multilang-py
   (with-simulated-time-local-cluster 
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                       [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                       [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                        "storm.zookeeper.connection.timeout" 30000
                        "storm.zookeeper.session.timeout" 60000
                        }]
@@ -205,7 +203,7 @@
 (deftest test-custom-metric-with-spout-multilang-py
   (with-simulated-time-local-cluster 
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                       [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                       [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                        "storm.zookeeper.connection.timeout" 30000
                        "storm.zookeeper.session.timeout" 60000}]
     (let [topology (Thrift/buildTopology
@@ -224,7 +222,7 @@
 (deftest test-builtin-metrics-1
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER                    
-                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
     (let [feeder (feeder-spout ["field1"])
@@ -264,7 +262,7 @@
 (deftest test-builtin-metrics-2
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}]
     (let [feeder (feeder-spout ["field1"])
@@ -318,7 +316,7 @@
 (deftest test-builtin-metrics-3
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5
                            TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
@@ -359,7 +357,7 @@
 (deftest test-system-bolt
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
     (let [feeder (feeder-spout ["field1"])
           topology (Thrift/buildTopology

http://git-wip-us.apache.org/repos/asf/storm/blob/07e1b231/storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java b/storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java
new file mode 100644
index 0000000..a6dc391
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Table;
+
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FakeMetricConsumer implements IMetricsConsumer {
+
+    public static final Table<String, String, Multimap<Integer, Object>> buffer = HashBasedTable.create();
+
+    @Override
+    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
+        synchronized (buffer) {
+            buffer.clear();
+        }
+    }
+
+    @Override
+    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+        synchronized (buffer) {
+            for (DataPoint dp : dataPoints) {
+                for (Map.Entry<String, Object> entry : expandComplexDataPoint(dp).entrySet()) {
+                    String metricName = entry.getKey();
+                    Multimap<Integer, Object> taskIdToBucket = buffer.get(taskInfo.srcComponentId, metricName);
+                    if (null == taskIdToBucket) {
+                        taskIdToBucket = ArrayListMultimap.create();
+                        taskIdToBucket.put(taskInfo.srcTaskId, entry.getValue());
+                    } else {
+                        taskIdToBucket.get(taskInfo.srcTaskId).add(entry.getValue());
+                    }
+                    buffer.put(taskInfo.srcComponentId, metricName, taskIdToBucket);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        synchronized (buffer) {
+            buffer.clear();
+        }
+    }
+
+    private Map<String, Object> expandComplexDataPoint(DataPoint dp) {
+        Map<String, Object> expanded = new HashMap<>();
+        if (dp.value instanceof Map) {
+            for (Map.Entry entry : ((Map<Object, Object>) dp.value).entrySet()) {
+                expanded.put(dp.name + "/" + entry.getKey(), entry.getValue());
+            }
+        } else {
+            expanded.put(dp.name, dp.value);
+        }
+        return expanded;
+    }
+
+    public static Map<Integer, Collection<Object>> getTaskIdToBuckets(String componentName, String metricName) {
+        synchronized (buffer) {
+            Multimap<Integer, Object> taskIdToBuckets = buffer.get(componentName, metricName);
+            return (null != taskIdToBuckets) ? taskIdToBuckets.asMap() : null;
+        }
+    }
+}


[2/3] storm git commit: Merge branch 'metric-testing' of https://github.com/abhishekagarwal87/storm into STORM-1229

Posted by ka...@apache.org.
Merge branch 'metric-testing' of https://github.com/abhishekagarwal87/storm into STORM-1229


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ec635958
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ec635958
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ec635958

Branch: refs/heads/master
Commit: ec6359584c45fe428794fdb3f72342876850965f
Parents: ca45d46 07e1b23
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jun 30 23:27:04 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 30 23:27:04 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/metric/testing.clj | 68 ---------------
 .../test/clj/org/apache/storm/metrics_test.clj  | 32 ++++---
 .../apache/storm/metric/FakeMetricConsumer.java | 88 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 85 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: add STORM-1229 to CHANGELOG

Posted by ka...@apache.org.
add STORM-1229 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d724e8c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d724e8c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d724e8c

Branch: refs/heads/master
Commit: 7d724e8ca83b3396a5c9a6e218098912d704e9bc
Parents: ec63595
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jun 30 23:27:34 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 30 23:27:34 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7d724e8c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 619f414..93ba84a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1229: port backtype.storm.metric.testing to java
  * STORM-1228: port backtype.storm.fields-test to java
  * STORM-1233: Port AuthUtilsTest to java
  * STORM-1920: version of parent pom for storm-kafka-monitor is set 1.0.2-SNAPSHOT in master branch