You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/05/29 21:13:30 UTC

[1/5] git commit: bug fix: task metric send repeatedly

Repository: incubator-storm
Updated Branches:
  refs/heads/master bfdce7aae -> cd0b11884


Bug fix: task metrics were sent repeatedly


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/d20ba2e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/d20ba2e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/d20ba2e7

Branch: refs/heads/master
Commit: d20ba2e720f66529c1b187d855c857ee6f8d78f2
Parents: f06d81b
Author: troyding <di...@gmail.com>
Authored: Mon May 19 14:18:34 2014 +0800
Committer: troyding <di...@gmail.com>
Committed: Mon May 19 14:18:34 2014 +0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  | 42 ++++++++++----------
 1 file changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d20ba2e7/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index f133a1b..15c375f 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -270,27 +270,27 @@
           receive-queue
           [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
 
-(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
+(defn metrics-tick [executor-data task-data ^TupleImpl tuple]
   (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
-        interval (.getInteger tuple 0)]
-    (doseq [[task-id task-data] task-datas
-            :let [name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
-                  task-info (IMetricsConsumer$TaskInfo.
-                             (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
-                             (.getThisWorkerPort worker-context)
-                             (:component-id executor-data)
-                             task-id
-                             (long (/ (System/currentTimeMillis) 1000))
-                             interval)
-                  data-points (->> name->imetric
-                                   (map (fn [[name imetric]]
-                                          (let [value (.getValueAndReset ^IMetric imetric)]
-                                            (if value
-                                              (IMetricsConsumer$DataPoint. name value)))))
-                                   (filter identity)
-                                   (into []))]]
+        interval (.getInteger tuple 0)
+        task-id (:task-id task-data)
+        name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
+        task-info (IMetricsConsumer$TaskInfo.
+                    (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
+                    (.getThisWorkerPort worker-context)
+                    (:component-id executor-data)
+                    task-id
+                    (long (/ (System/currentTimeMillis) 1000))
+                    interval)
+        data-points (->> name->imetric
+                      (map (fn [[name imetric]]
+                             (let [value (.getValueAndReset ^IMetric imetric)]
+                               (if value
+                                 (IMetricsConsumer$DataPoint. name value)))))
+                      (filter identity)
+                      (into []))]
       (if (seq data-points)
-        (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))))
+        (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
 
 (defn setup-ticks! [worker executor-data]
   (let [storm-conf (:storm-conf executor-data)
@@ -432,7 +432,7 @@
                           (let [stream-id (.getSourceStreamId tuple)]
                             (condp = stream-id
                               Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
                               (let [id (.getValue tuple 0)
                                     [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
                                 (when spout-id
@@ -616,7 +616,7 @@
                           ;; need to do it this way to avoid reflection
                           (let [stream-id (.getSourceStreamId tuple)]
                             (condp = stream-id
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
                               (let [task-data (get task-datas task-id)
                                     ^IBolt bolt-obj (:object task-data)
                                     user-context (:user-context task-data)


[3/5] git commit: Add test case to metrics_test.clj

Posted by pt...@apache.org.
Add test case to metrics_test.clj


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7346db2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7346db2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7346db2f

Branch: refs/heads/master
Commit: 7346db2f4dc10e59965d3f9dfa9a9611ccefe41a
Parents: 0d6b438
Author: troyding <di...@gmail.com>
Authored: Wed May 21 16:58:20 2014 +0800
Committer: troyding <di...@gmail.com>
Committed: Wed May 21 16:58:20 2014 +0800

----------------------------------------------------------------------
 .../starter/ExclamationTopologyWithMetrics.java | 88 --------------------
 .../test/clj/backtype/storm/metrics_test.clj    | 24 ++++++
 2 files changed, 24 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7346db2f/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
deleted file mode 100644
index 96939c3..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.metric.LoggingMetricsConsumer;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-
-/**
- * This is a basic example of a Storm topology with metrics logging to the file.
- */
-public class ExclamationTopologyWithMetrics {
-
-  public static class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-      _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
-
-
-  }
-
-  public static void main(String[] args) throws Exception {
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("word", new TestWordSpout(), 10);
-    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word").setNumTasks(6);
-    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1").setNumTasks(6);
-
-    Config conf = new Config();
-    conf.setDebug(true);
-    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
-
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    }
-    else {
-
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("test", conf, builder.createTopology());
-      Utils.sleep(10000);
-      cluster.killTopology("test");
-      cluster.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7346db2f/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index edd3a45..2f5b0e1 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -118,6 +118,30 @@
       (advance-cluster-time cluster 5)
       (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
 
+(deftest test-custom-metric-with-multi-tasks
+  (with-simulated-time-local-cluster
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+    (let [feeder (feeder-spout ["field1"])
+          topology (thrift/mk-topology
+                     {"1" (thrift/mk-spout-spec feeder)}
+                     {"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
+      (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+
+      (.feed feeder ["a"] 1)
+      (advance-cluster-time cluster 6)
+      (assert-buckets! "2" "my-custom-metric" [1])
+
+      (advance-cluster-time cluster 5)
+      (assert-buckets! "2" "my-custom-metric" [1 0])
+
+      (advance-cluster-time cluster 20)
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+
+      (.feed feeder ["b"] 2)
+      (.feed feeder ["c"] 3)
+      (advance-cluster-time cluster 5)
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
 
 (deftest test-builtin-metrics-1
   (with-simulated-time-local-cluster


[2/5] git commit: Add ExclamationTopologyWithMetrics to storm-starter

Posted by pt...@apache.org.
Add ExclamationTopologyWithMetrics to storm-starter


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0d6b438f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0d6b438f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0d6b438f

Branch: refs/heads/master
Commit: 0d6b438fff0ddeb254efe3d858ae7a29251ce289
Parents: d20ba2e
Author: troyding <di...@gmail.com>
Authored: Tue May 20 11:37:52 2014 +0800
Committer: troyding <di...@gmail.com>
Committed: Tue May 20 11:37:52 2014 +0800

----------------------------------------------------------------------
 .../starter/ExclamationTopologyWithMetrics.java | 88 ++++++++++++++++++++
 1 file changed, 88 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0d6b438f/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
new file mode 100644
index 0000000..96939c3
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.metric.LoggingMetricsConsumer;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.testing.TestWordSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * This is a basic example of a Storm topology with metrics logging to the file.
+ */
+public class ExclamationTopologyWithMetrics {
+
+  public static class ExclamationBolt extends BaseRichBolt {
+    OutputCollector _collector;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+      _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+      _collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
+
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.setSpout("word", new TestWordSpout(), 10);
+    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word").setNumTasks(6);
+    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1").setNumTasks(6);
+
+    Config conf = new Config();
+    conf.setDebug(true);
+    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+
+    if (args != null && args.length > 0) {
+      conf.setNumWorkers(3);
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+    }
+    else {
+
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("test", conf, builder.createTopology());
+      Utils.sleep(10000);
+      cluster.killTopology("test");
+      cluster.shutdown();
+    }
+  }
+}


[5/5] git commit: STORM-326: update changelog

Posted by pt...@apache.org.
STORM-326: update changelog


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/cd0b1188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/cd0b1188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/cd0b1188

Branch: refs/heads/master
Commit: cd0b118843fb8627810debd78664f0aa53cd6727
Parents: 2cd8ee3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu May 29 15:13:19 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu May 29 15:13:19 2014 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/cd0b1188/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 960473c..134d009 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.9.2-incubating (unreleased)
+ * STORM-326: tasks send duplicate metrics
  * STORM-331: Update the Kafka dependency of storm-kafka to 0.8.1.1
  * STORM-308: Add support for config_value to {supervisor,nimbus,ui,drpc,logviewer} childopts
  * STORM-309: storm-starter Readme: windows documentation update


[4/5] git commit: Merge branch 'master' of github.com:troyding/incubator-storm

Posted by pt...@apache.org.
Merge branch 'master' of github.com:troyding/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2cd8ee37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2cd8ee37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2cd8ee37

Branch: refs/heads/master
Commit: 2cd8ee37e60c180fe46f03d2bdbe3042db8bc517
Parents: bfdce7a 7346db2
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu May 29 15:08:49 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu May 29 15:08:49 2014 -0400

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  | 42 ++++++++++----------
 .../test/clj/backtype/storm/metrics_test.clj    | 24 +++++++++++
 2 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------