You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/05/29 21:13:32 UTC
[3/5] git commit: Add test case to metrics_test.clj
Add test case to metrics_test.clj
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7346db2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7346db2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7346db2f
Branch: refs/heads/master
Commit: 7346db2f4dc10e59965d3f9dfa9a9611ccefe41a
Parents: 0d6b438
Author: troyding <di...@gmail.com>
Authored: Wed May 21 16:58:20 2014 +0800
Committer: troyding <di...@gmail.com>
Committed: Wed May 21 16:58:20 2014 +0800
----------------------------------------------------------------------
.../starter/ExclamationTopologyWithMetrics.java | 88 --------------------
.../test/clj/backtype/storm/metrics_test.clj | 24 ++++++
2 files changed, 24 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7346db2f/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
deleted file mode 100644
index 96939c3..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.metric.LoggingMetricsConsumer;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-
-/**
- * This is a basic example of a Storm topology with metrics logging to the file.
- */
-public class ExclamationTopologyWithMetrics {
-
- public static class ExclamationBolt extends BaseRichBolt {
- OutputCollector _collector;
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- _collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
-
- }
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("word", new TestWordSpout(), 10);
- builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word").setNumTasks(6);
- builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1").setNumTasks(6);
-
- Config conf = new Config();
- conf.setDebug(true);
- conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
-
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- }
- else {
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7346db2f/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index edd3a45..2f5b0e1 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -118,6 +118,30 @@
(advance-cluster-time cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+(deftest test-custom-metric-with-multi-tasks
+ (with-simulated-time-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+ (let [feeder (feeder-spout ["field1"])
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
+ (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 6)
+ (assert-buckets! "2" "my-custom-metric" [1])
+
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-metric" [1 0])
+
+ (advance-cluster-time cluster 20)
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+
+ (.feed feeder ["b"] 2)
+ (.feed feeder ["c"] 3)
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
(deftest test-builtin-metrics-1
(with-simulated-time-local-cluster