You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/05/29 21:13:30 UTC
[1/5] git commit: bug fix: task metric send repeatedly
Repository: incubator-storm
Updated Branches:
refs/heads/master bfdce7aae -> cd0b11884
bug fix: task metric send repeatedly
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/d20ba2e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/d20ba2e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/d20ba2e7
Branch: refs/heads/master
Commit: d20ba2e720f66529c1b187d855c857ee6f8d78f2
Parents: f06d81b
Author: troyding <di...@gmail.com>
Authored: Mon May 19 14:18:34 2014 +0800
Committer: troyding <di...@gmail.com>
Committed: Mon May 19 14:18:34 2014 +0800
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/executor.clj | 42 ++++++++++----------
1 file changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d20ba2e7/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index f133a1b..15c375f 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -270,27 +270,27 @@
receive-queue
[[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
-(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
+(defn metrics-tick [executor-data task-data ^TupleImpl tuple]
(let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
- interval (.getInteger tuple 0)]
- (doseq [[task-id task-data] task-datas
- :let [name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
- task-info (IMetricsConsumer$TaskInfo.
- (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
- (.getThisWorkerPort worker-context)
- (:component-id executor-data)
- task-id
- (long (/ (System/currentTimeMillis) 1000))
- interval)
- data-points (->> name->imetric
- (map (fn [[name imetric]]
- (let [value (.getValueAndReset ^IMetric imetric)]
- (if value
- (IMetricsConsumer$DataPoint. name value)))))
- (filter identity)
- (into []))]]
+ interval (.getInteger tuple 0)
+ task-id (:task-id task-data)
+ name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
+ task-info (IMetricsConsumer$TaskInfo.
+ (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
+ (.getThisWorkerPort worker-context)
+ (:component-id executor-data)
+ task-id
+ (long (/ (System/currentTimeMillis) 1000))
+ interval)
+ data-points (->> name->imetric
+ (map (fn [[name imetric]]
+ (let [value (.getValueAndReset ^IMetric imetric)]
+ (if value
+ (IMetricsConsumer$DataPoint. name value)))))
+ (filter identity)
+ (into []))]
(if (seq data-points)
- (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))))
+ (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
@@ -432,7 +432,7 @@
(let [stream-id (.getSourceStreamId tuple)]
(condp = stream-id
Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
- Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
(let [id (.getValue tuple 0)
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
@@ -616,7 +616,7 @@
;; need to do it this way to avoid reflection
(let [stream-id (.getSourceStreamId tuple)]
(condp = stream-id
- Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
(let [task-data (get task-datas task-id)
^IBolt bolt-obj (:object task-data)
user-context (:user-context task-data)
[3/5] git commit: Add test case to metrics_test.clj
Posted by pt...@apache.org.
Add test case to metrics_test.clj
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7346db2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7346db2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7346db2f
Branch: refs/heads/master
Commit: 7346db2f4dc10e59965d3f9dfa9a9611ccefe41a
Parents: 0d6b438
Author: troyding <di...@gmail.com>
Authored: Wed May 21 16:58:20 2014 +0800
Committer: troyding <di...@gmail.com>
Committed: Wed May 21 16:58:20 2014 +0800
----------------------------------------------------------------------
.../starter/ExclamationTopologyWithMetrics.java | 88 --------------------
.../test/clj/backtype/storm/metrics_test.clj | 24 ++++++
2 files changed, 24 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7346db2f/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
deleted file mode 100644
index 96939c3..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.metric.LoggingMetricsConsumer;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-
-/**
- * This is a basic example of a Storm topology with metrics logging to the file.
- */
-public class ExclamationTopologyWithMetrics {
-
- public static class ExclamationBolt extends BaseRichBolt {
- OutputCollector _collector;
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- _collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
-
- }
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("word", new TestWordSpout(), 10);
- builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word").setNumTasks(6);
- builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1").setNumTasks(6);
-
- Config conf = new Config();
- conf.setDebug(true);
- conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
-
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- }
- else {
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7346db2f/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index edd3a45..2f5b0e1 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -118,6 +118,30 @@
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+(deftest test-custom-metric-with-multi-tasks
+ (with-simulated-time-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+ (let [feeder (feeder-spout ["field1"])
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
+ (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 6)
+ (assert-buckets! "2" "my-custom-metric" [1])
+
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-metric" [1 0])
+
+ (advance-cluster-time cluster 20)
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+
+ (.feed feeder ["b"] 2)
+ (.feed feeder ["c"] 3)
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
(deftest test-builtin-metrics-1
(with-simulated-time-local-cluster
[2/5] git commit: Add ExclamationTopologyWithMetrics to storm-starter
Posted by pt...@apache.org.
Add ExclamationTopologyWithMetrics to storm-starter
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0d6b438f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0d6b438f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0d6b438f
Branch: refs/heads/master
Commit: 0d6b438fff0ddeb254efe3d858ae7a29251ce289
Parents: d20ba2e
Author: troyding <di...@gmail.com>
Authored: Tue May 20 11:37:52 2014 +0800
Committer: troyding <di...@gmail.com>
Committed: Tue May 20 11:37:52 2014 +0800
----------------------------------------------------------------------
.../starter/ExclamationTopologyWithMetrics.java | 88 ++++++++++++++++++++
1 file changed, 88 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0d6b438f/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
new file mode 100644
index 0000000..96939c3
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.metric.LoggingMetricsConsumer;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.testing.TestWordSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * This is a basic example of a Storm topology with metrics logging to the file.
+ */
+public class ExclamationTopologyWithMetrics {
+
+ public static class ExclamationBolt extends BaseRichBolt {
+ OutputCollector _collector;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ _collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+ _collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("word", new TestWordSpout(), 10);
+ builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word").setNumTasks(6);
+ builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1").setNumTasks(6);
+
+ Config conf = new Config();
+ conf.setDebug(true);
+ conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ }
+ else {
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", conf, builder.createTopology());
+ Utils.sleep(10000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ }
+ }
+}
[5/5] git commit: STORM-326: update changelog
Posted by pt...@apache.org.
STORM-326: update changelog
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/cd0b1188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/cd0b1188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/cd0b1188
Branch: refs/heads/master
Commit: cd0b118843fb8627810debd78664f0aa53cd6727
Parents: 2cd8ee3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu May 29 15:13:19 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu May 29 15:13:19 2014 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/cd0b1188/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 960473c..134d009 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.9.2-incubating (unreleased)
+ * STORM-326: tasks send duplicate metrics
* STORM-331: Update the Kafka dependency of storm-kafka to 0.8.1.1
* STORM-308: Add support for config_value to {supervisor,nimbus,ui,drpc,logviewer} childopts
* STORM-309: storm-starter Readme: windows documentation update
[4/5] git commit: Merge branch 'master' of
github.com:troyding/incubator-storm
Posted by pt...@apache.org.
Merge branch 'master' of github.com:troyding/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2cd8ee37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2cd8ee37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2cd8ee37
Branch: refs/heads/master
Commit: 2cd8ee37e60c180fe46f03d2bdbe3042db8bc517
Parents: bfdce7a 7346db2
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu May 29 15:08:49 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu May 29 15:08:49 2014 -0400
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/executor.clj | 42 ++++++++++----------
.../test/clj/backtype/storm/metrics_test.clj | 24 +++++++++++
2 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------