You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/05/29 21:13:32 UTC

[3/5] git commit: Add test case to metrics_test.clj

Add test case to metrics_test.clj


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7346db2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7346db2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7346db2f

Branch: refs/heads/master
Commit: 7346db2f4dc10e59965d3f9dfa9a9611ccefe41a
Parents: 0d6b438
Author: troyding <di...@gmail.com>
Authored: Wed May 21 16:58:20 2014 +0800
Committer: troyding <di...@gmail.com>
Committed: Wed May 21 16:58:20 2014 +0800

----------------------------------------------------------------------
 .../starter/ExclamationTopologyWithMetrics.java | 88 --------------------
 .../test/clj/backtype/storm/metrics_test.clj    | 24 ++++++
 2 files changed, 24 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7346db2f/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
deleted file mode 100644
index 96939c3..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopologyWithMetrics.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.metric.LoggingMetricsConsumer;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-
-/**
- * This is a basic example of a Storm topology with metrics logging to the file.
- */
-public class ExclamationTopologyWithMetrics {
-
-  public static class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-      _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
-
-
-  }
-
-  public static void main(String[] args) throws Exception {
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("word", new TestWordSpout(), 10);
-    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word").setNumTasks(6);
-    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1").setNumTasks(6);
-
-    Config conf = new Config();
-    conf.setDebug(true);
-    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
-
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    }
-    else {
-
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("test", conf, builder.createTopology());
-      Utils.sleep(10000);
-      cluster.killTopology("test");
-      cluster.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7346db2f/storm-core/test/clj/backtype/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/metrics_test.clj b/storm-core/test/clj/backtype/storm/metrics_test.clj
index edd3a45..2f5b0e1 100644
--- a/storm-core/test/clj/backtype/storm/metrics_test.clj
+++ b/storm-core/test/clj/backtype/storm/metrics_test.clj
@@ -118,6 +118,30 @@
       (advance-cluster-time cluster 5)
       (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
 
+(deftest test-custom-metric-with-multi-tasks
+  (with-simulated-time-local-cluster
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+    (let [feeder (feeder-spout ["field1"])
+          topology (thrift/mk-topology
+                     {"1" (thrift/mk-spout-spec feeder)}
+                     {"2" (thrift/mk-bolt-spec {"1" :all} count-acks :p 1 :conf {TOPOLOGY-TASKS 2})})]
+      (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+
+      (.feed feeder ["a"] 1)
+      (advance-cluster-time cluster 6)
+      (assert-buckets! "2" "my-custom-metric" [1])
+
+      (advance-cluster-time cluster 5)
+      (assert-buckets! "2" "my-custom-metric" [1 0])
+
+      (advance-cluster-time cluster 20)
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+
+      (.feed feeder ["b"] 2)
+      (.feed feeder ["c"] 3)
+      (advance-cluster-time cluster 5)
+      (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
 
 (deftest test-builtin-metrics-1
   (with-simulated-time-local-cluster