You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/12/09 21:24:07 UTC
[1/4] storm git commit: add preliminary metrics for Codahale metrics
Repository: storm
Updated Branches:
refs/heads/metrics_v2 08b0074c4 -> c7e1a38b4
add preliminary metrics for Codahale metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6c93a0db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6c93a0db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6c93a0db
Branch: refs/heads/metrics_v2
Commit: 6c93a0db67ea0fef83d048129934e6e5adf74e02
Parents: 08b0074
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Nov 14 15:02:37 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Nov 14 15:02:37 2016 -0500
----------------------------------------------------------------------
storm-core/pom.xml | 9 ++---
.../clj/org/apache/storm/daemon/executor.clj | 16 ++++++--
.../src/clj/org/apache/storm/daemon/task.clj | 7 +++-
.../src/clj/org/apache/storm/daemon/worker.clj | 4 ++
.../storm/metrics2/StormMetricRegistry.java | 40 ++++++++++++++++++++
5 files changed, 67 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6c93a0db/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index b05fd35..38a0811 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -526,7 +526,6 @@
<include>org.clojure:tools.namespace</include>
<include>cheshire:cheshire</include>
<include>org.clojure:core.incubator</include>
- <include>io.dropwizard.metrics:*</include>
<include>metrics-clojure:*</include>
</includes>
</artifactSet>
@@ -700,10 +699,10 @@
<pattern>org.eclipse.jetty</pattern>
<shadedPattern>org.apache.storm.shade.org.eclipse.jetty</shadedPattern>
</relocation>
- <relocation>
- <pattern>com.codahale.metrics</pattern>
- <shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern>
- </relocation>
+ <!--<relocation>-->
+ <!--<pattern>com.codahale.metrics</pattern>-->
+ <!--<shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern>-->
+ <!--</relocation>-->
<relocation>
<pattern>metrics.core</pattern>
<shadedPattern>org.apache.storm.shade.metrics.core</shadedPattern>
http://git-wip-us.apache.org/repos/asf/storm/blob/6c93a0db/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 3b20b84..f1af8e7 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -35,6 +35,8 @@
(:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
(:import [org.apache.storm Config Constants])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
+ (:import [org.apache.storm.metrics2 StormMetricRegistry])
+ (:import [com.codahale.metrics Meter])
(:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
(:import [java.util.concurrent ConcurrentLinkedQueue])
(:require [org.apache.storm [thrift :as thrift]
@@ -274,7 +276,9 @@
(log-message "Got interrupted excpetion shutting thread down...")
((:suicide-fn <>))))
:sampler (mk-stats-sampler storm-conf)
- :spout-throttling-metrics (if (= executor-type :spout)
+ :failed-meter (StormMetricRegistry/meter "failed" worker-context component-id)
+ :acked-meter (StormMetricRegistry/meter "acked" worker-context component-id)
+ :spout-throttling-metrics (if (= executor-type :spout)
(builtin-metrics/make-spout-throttling-data)
nil)
;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
@@ -428,10 +432,12 @@
(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id debug?]
(let [^ISpout spout (:object task-data)
storm-conf (:storm-conf executor-data)
- task-id (:task-id task-data)]
+ task-id (:task-id task-data)
+ failed-meter (:failed-meter executor-data)]
;;TODO: need to throttle these when there's lots of failures
(when debug?
(log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
+ (.mark failed-meter)
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
@@ -439,8 +445,10 @@
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
(let [^ISpout spout (:object task-data)
- task-id (:task-id task-data)]
+ task-id (:task-id task-data)
+ acked-meter (:acked-meter executor-data)]
(when debug? (log-message "SPOUT Acking message " id " " msg-id))
+ (.mark acked-meter)
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
@@ -808,6 +816,7 @@
(let [delta (tuple-time-delta! tuple)]
(when debug?
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
+ (.mark ^Meter (:acked-meter (:executor-data task-data)))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta
(stats/bolt-acked-tuple! executor-stats
@@ -823,6 +832,7 @@
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
(when debug?
(log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
+ (.mark ^Meter (:failed-meter (:executor-data task-data)))
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when delta
(stats/bolt-failed-tuple! executor-stats
http://git-wip-us.apache.org/repos/asf/storm/blob/6c93a0db/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 1ae9b22..2e4df75 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -23,10 +23,12 @@
(:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo])
(:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext])
+ (:import [org.apache.storm.metrics2 StormMetricRegistry])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.generated ShellComponent JavaObject])
(:import [org.apache.storm.spout ShellSpout])
(:import [java.util Collection List ArrayList])
+ (:import [com.codahale.metrics Meter])
(:require [org.apache.storm
[thrift :as thrift]
[stats :as stats]])
@@ -128,9 +130,11 @@
stream->component->grouper (:stream->component->grouper executor-data)
user-context (:user-context task-data)
executor-stats (:stats executor-data)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))
+ ^Meter emitted-meter (StormMetricRegistry/meter "emitted" worker-context component-id)]
(fn ([^Integer out-task-id ^String stream ^List values]
+ (.mark emitted-meter)
(when debug?
(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
(let [target-component (.getComponentId worker-context out-task-id)
@@ -147,6 +151,7 @@
(if out-task-id [out-task-id])
))
([^String stream ^List values]
+ (.mark emitted-meter)
(when debug?
(log-message "Emitting: " component-id " " stream " " values))
(let [out-tasks (ArrayList.)]
http://git-wip-us.apache.org/repos/asf/storm/blob/6c93a0db/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 6626272..cca0011 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -43,6 +43,7 @@
(:import [org.apache.logging.log4j Level])
(:import [org.apache.logging.log4j.core.config LoggerConfig])
(:import [org.apache.storm.generated LogConfig LogLevelAction])
+ (:import [org.apache.storm.metrics2 StormMetricRegistry])
(:gen-class))
(defmulti mk-suicide-fn cluster-mode)
@@ -215,6 +216,7 @@
(into {})
))
+
(defn- stream->fields [^StormTopology topology component]
(->> (ThriftTopologyUtils/getComponentCommon topology component)
.get_streams
@@ -689,6 +691,8 @@
(close-resources worker)
+ (StormMetricRegistry/shutdown)
+
(log-message "Trigger any worker shutdown hooks")
(run-worker-shutdown-hooks worker)
http://git-wip-us.apache.org/repos/asf/storm/blob/6c93a0db/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
new file mode 100644
index 0000000..02dfac3
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -0,0 +1,40 @@
+package org.apache.storm.metrics2;
+
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class StormMetricRegistry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
+
+ private static final MetricRegistry REGISTRY = new MetricRegistry();
+
+ private static ConsoleReporter REPORTER;
+ static {
+ REPORTER = ConsoleReporter.forRegistry(REGISTRY)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ REPORTER.start(15, TimeUnit.SECONDS);
+ }
+
+
+ public static Meter meter(String name, WorkerTopologyContext context, String componentId){
+ // storm.worker.{topology}.{host}.{port}
+ // TODO: hostname
+ String metricName = String.format("storm.worker.%s.%s.%s-%s", context.getStormId(), componentId, context.getThisWorkerPort(), name);
+
+ return REGISTRY.meter(metricName);
+ }
+
+ public static void shutdown(){
+ REPORTER.stop();
+ }
+}
[4/4] storm git commit: incorporate and refactor work from abellina
Posted by pt...@apache.org.
incorporate and refactor work from abellina
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7e1a38b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7e1a38b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7e1a38b
Branch: refs/heads/metrics_v2
Commit: c7e1a38b43531549a4efcfcd0cbd161de7753925
Parents: de399c2
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Dec 9 16:22:46 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Dec 9 16:22:46 2016 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 25 ++++
pom.xml | 5 +
storm-core/pom.xml | 4 +
.../src/clj/org/apache/storm/daemon/worker.clj | 4 +-
storm-core/src/jvm/org/apache/storm/Config.java | 3 +
.../apache/storm/metrics2/DisruptorMetrics.java | 17 +++
.../org/apache/storm/metrics2/SimpleGauge.java | 17 +++
.../storm/metrics2/StormMetricRegistry.java | 96 +++++++++----
.../reporters/ConsoleStormReporter.java | 63 +++++++++
.../metrics2/reporters/CsvStormReporter.java | 93 +++++++++++++
.../reporters/GangliaStormReporter.java | 133 +++++++++++++++++++
.../reporters/GraphiteStormReporter.java | 100 ++++++++++++++
.../metrics2/reporters/JmxStormReporter.java | 88 ++++++++++++
.../reporters/SheduledStormReporter.java | 71 ++++++++++
.../storm/metrics2/reporters/StormReporter.java | 32 +++++
15 files changed, 722 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 041a628..9ff665a 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -291,3 +291,28 @@ storm.daemon.metrics.reporter.plugins:
# configuration of cluster metrics consumer
storm.cluster.metrics.consumer.publish.interval.secs: 60
+
+
+storm.metrics.reporters:
+ # Graphite Reporter
+ - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
+ daemons:
+ - "supervisor"
+ - "nimbus"
+ - "worker"
+ report.period: 60
+ report.period.units: "SECONDS"
+ graphite.host: "localhost"
+ graphite.port: 2003
+
+ # Console Reporter
+ - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
+ daemons:
+ - "worker"
+ report.period: 10
+ report.period.units: "SECONDS"
+
+ #TODO: not functional, but you get the idea
+ filters:
+ "org.apache.storm.metrics2.filters.RegexFilter":
+ expression: ".*my_component.*emitted.*"
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d040099..6463bb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -845,6 +845,11 @@
<version>${metrics.version}</version>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-ganglia</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
<groupId>metrics-clojure</groupId>
<artifactId>metrics-clojure</artifactId>
<version>${metrics-clojure.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 21b50af..4d2fd19 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -285,6 +285,10 @@
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-ganglia</artifactId>
+ </dependency>
+ <dependency>
<groupId>metrics-clojure</groupId>
<artifactId>metrics-clojure</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 3031513..b2810db 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -599,7 +599,7 @@
(spit (worker-artifacts-pid-path conf storm-id port) pid)))
(declare establish-log-setting-callback)
-
+ (StormMetricRegistry/start conf DaemonType/WORKER)
;; start out with empty list of timeouts
(def latest-log-config (atom {}))
(def original-log-levels (atom {}))
@@ -693,7 +693,7 @@
(close-resources worker)
- (StormMetricRegistry/shutdown)
+ (StormMetricRegistry/stop)
(log-message "Trigger any worker shutdown hooks")
(run-worker-shutdown-hooks worker)
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 0854f35..a5153e4 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -139,6 +139,9 @@ public class Config extends HashMap<String, Object> {
@isString
public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
+ @isType(type=List.class)
+ public static final String STORM_METRICS_REPORTERS = "storm.metrics.reporters";
+
/**
* A list of daemon metrics reporter plugin class names.
* These plugins must implement {@link org.apache.storm.daemon.metrics.reporters.PreparableReporter} interface.
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
index 3c11f1a..994a965 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.metrics2;
import org.apache.storm.utils.DisruptorQueue;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
index b88cc7f..5240f26 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.metrics2;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index dd430ac..a718739 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -1,15 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.metrics2;
-
-import com.codahale.metrics.*;
-import com.codahale.metrics.graphite.Graphite;
-import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
import org.apache.storm.task.WorkerTopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
public class StormMetricRegistry {
@@ -17,24 +37,7 @@ public class StormMetricRegistry {
private static final MetricRegistry REGISTRY = new MetricRegistry();
- private static ScheduledReporter REPORTER;
- static {
-// REPORTER = ConsoleReporter.forRegistry(REGISTRY)
-// .convertRatesTo(TimeUnit.SECONDS)
-// .convertDurationsTo(TimeUnit.MILLISECONDS)
-// .build();
-
-
- final Graphite graphite = new Graphite(new InetSocketAddress("graphite", 2003));
- REPORTER = GraphiteReporter.forRegistry(REGISTRY)
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .filter(MetricFilter.ALL)
- .build(graphite);
-
- REPORTER.start(15, TimeUnit.SECONDS);
- }
-
+ private static final List<StormReporter> REPORTERS = new ArrayList<>();
public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, Integer port){
SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
@@ -55,7 +58,6 @@ public class StormMetricRegistry {
);
}
-
public static Meter meter(String name, WorkerTopologyContext context, String componentId){
// storm.worker.{topology}.{host}.{port}
// TODO: hostname
@@ -63,7 +65,47 @@ public class StormMetricRegistry {
return REGISTRY.meter(metricName);
}
- public static void shutdown(){
- REPORTER.stop();
+ public static void start(Map<String, Object> stormConfig, DaemonType type){
+ LOG.info("Starting metrics reporters...");
+ List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+ for(Map<String, Object> reporterConfig : reporterList){
+ // only start those requested
+ List<String> daemons = (List<String>)reporterConfig.get("daemons");
+ for(String daemon : daemons){
+ if(DaemonType.valueOf(daemon.toUpperCase()) == type){
+ startReporter(stormConfig, reporterConfig);
+ }
+ }
+ }
+ }
+
+ private static void startReporter(Map<String, Object> stormConfig, Map<String, Object> reporterConfig){
+ String clazz = (String)reporterConfig.get("class");
+ StormReporter reporter = null;
+ LOG.info("Attempting to instantiate reporter class: {}", clazz);
+ try{
+ reporter = instantiate(clazz);
+ } catch(Exception e){
+ LOG.warn("Unable to instantiate metrics reporter class: {}. Will skip this reporter.", clazz, e);
+ }
+ if(reporter != null){
+ reporter.prepare(REGISTRY, stormConfig, reporterConfig);
+ reporter.start();
+ REPORTERS.add(reporter);
+ }
+
+ }
+
+
+ private static StormReporter instantiate(String klass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ Class<?> c = Class.forName(klass);
+ return (StormReporter) c.newInstance();
+ }
+
+
+ public static void stop(){
+ for(StormReporter sr : REPORTERS){
+ sr.stop();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
new file mode 100644
index 0000000..5322bf8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConsoleStormReporter extends SheduledStormReporter<ConsoleReporter> {
+ private final static Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class);
+
+ @Override
+ public void prepare(MetricRegistry registry, Map stormConf, Map reporterConf) {
+ LOG.debug("Preparing ConsoleReporter");
+ ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(registry);
+
+ builder.outputTo(System.out);
+ Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
+ if (locale != null) {
+ builder.formattedFor(locale);
+ }
+
+ TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
+ if (rateUnit != null) {
+ builder.convertRatesTo(rateUnit);
+ }
+
+ TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
+ if (durationUnit != null) {
+ builder.convertDurationsTo(durationUnit);
+ }
+
+ //defaults to 10
+ reportingPeriod = getReportPeriod(reporterConf);
+
+ //defaults to seconds
+ reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+ reporter = builder.build();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
new file mode 100644
index 0000000..4225b7c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CsvStormReporter extends SheduledStormReporter<CsvReporter> {
+ private final static Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class);
+
+ public static final String CSV_LOG_DIR = "csv.log.dir";
+
+ @Override
+ public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+ LOG.debug("Preparing...");
+ CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+ Locale locale = MetricsUtils.getMetricsReporterLocale(reporterConf);
+ if (locale != null) {
+ builder.formatFor(locale);
+ }
+
+ TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+ if (rateUnit != null) {
+ builder.convertRatesTo(rateUnit);
+ }
+
+ TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
+ if (durationUnit != null) {
+ builder.convertDurationsTo(durationUnit);
+ }
+
+ //TODO: expose some simple MetricFilters
+
+ //defaults to 10
+ reportingPeriod = getReportPeriod(reporterConf);
+
+ //defaults to seconds
+ reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+ File csvMetricsDir = getCsvLogDir(stormConf, reporterConf);
+ reporter = builder.build(csvMetricsDir);
+ }
+
+
+ private static File getCsvLogDir(Map stormConf, Map reporterConf) {
+ String csvMetricsLogDirectory = Utils.getString(reporterConf.get(CSV_LOG_DIR), null);
+ if (csvMetricsLogDirectory == null) {
+ csvMetricsLogDirectory = ConfigUtils.absoluteStormLocalDir(stormConf);
+ csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics";
+ }
+ File csvMetricsDir = new File(csvMetricsLogDirectory);
+ validateCreateOutputDir(csvMetricsDir);
+ return csvMetricsDir;
+ }
+
+ private static void validateCreateOutputDir(File dir) {
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
+ if (!dir.canWrite()) {
+ throw new IllegalStateException(dir.getName() + " does not have write permissions.");
+ }
+ if (!dir.isDirectory()) {
+ throw new IllegalStateException(dir.getName() + " is not a directory.");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
new file mode 100644
index 0000000..d8d0269
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ganglia.GangliaReporter;
+import com.codahale.metrics.MetricRegistry;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Storm reporter that publishes the metrics of a {@link MetricRegistry} to Ganglia through a
+ * Dropwizard {@link GangliaReporter}.
+ * <p/>
+ * All settings are read from the reporter configuration map passed to
+ * {@link #prepare(MetricRegistry, Map, Map)}. Optional settings that are absent leave the
+ * corresponding builder default untouched.
+ */
+public class GangliaStormReporter extends SheduledStormReporter<GangliaReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class);
+
+    // Reporter configuration keys. GANGLIA_HOST, GANGLIA_RATE_UNIT and GANGLIA_DURATION_UNIT
+    // are declared here but not read by this class.
+    public static final String GANGLIA_HOST = "ganglia.host";
+    public static final String GANGLIA_PORT = "ganglia.port";
+    public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with";
+    public static final String GANGLIA_DMAX = "ganglia.dmax";
+    public static final String GANGLIA_TMAX = "ganglia.tmax";
+    public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode";
+    public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit";
+    public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit";
+    public static final String GANGLIA_TTL = "ganglia.ttl";
+    public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group";
+
+    /**
+     * Builds the underlying {@link GangliaReporter} from the reporter configuration.
+     * <p/>
+     * If the Ganglia sender cannot be created the error is logged and {@code reporter} is left
+     * unset, so a later {@code start()} fails with an {@link IllegalStateException}.
+     *
+     * @param metricsRegistry registry whose metrics are reported
+     * @param stormConf       Storm configuration (not used by this reporter)
+     * @param reporterConf    configuration of this reporter
+     */
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing...");
+        GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        //TODO: expose some simple MetricFilters
+        String prefix = getMetricsPrefixedWith(reporterConf);
+        if (prefix != null) {
+            builder.prefixedWith(prefix);
+        }
+
+        // dmax and tmax are each guarded by their own value. Guarding them by the prefix
+        // would drop a configured value when no prefix is set and would pass null to the
+        // builder when only a prefix is set.
+        Integer dmax = getGangliaDMax(reporterConf);
+        if (dmax != null) {
+            builder.withDMax(dmax);
+        }
+
+        Integer tmax = getGangliaTMax(reporterConf);
+        if (tmax != null) {
+            builder.withTMax(tmax);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        // Not exposed:
+        // * withClock(Clock)
+
+        String group = getMetricsTargetUDPGroup(reporterConf);
+        Integer port = getMetricsTargetPort(reporterConf);
+        String udpAddressingMode = getMetricsTargetUDPAddressingMode(reporterConf);
+        Integer ttl = getMetricsTargetTtl(reporterConf);
+
+        // Constant-first comparison: an unset addressing mode falls back to unicast instead of
+        // failing with a NullPointerException.
+        GMetric.UDPAddressingMode mode = "multicast".equalsIgnoreCase(udpAddressingMode) ?
+                GMetric.UDPAddressingMode.MULTICAST : GMetric.UDPAddressingMode.UNICAST;
+
+        try {
+            // NOTE(review): port and ttl are nullable Integers; if GMetric takes primitives a
+            // missing setting surfaces here as a NullPointerException - confirm and validate.
+            GMetric sender = new GMetric(group, port, mode, ttl);
+            reporter = builder.build(sender);
+        } catch (IOException ioe) {
+            //TODO
+            LOG.error("Exception in GangliaReporter config", ioe);
+        }
+    }
+
+
+    /** Returns the value configured under {@link #GANGLIA_UDP_GROUP}, or {@code null} if unset. */
+    public static String getMetricsTargetUDPGroup(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_UDP_GROUP), null);
+    }
+
+    /** Returns the value configured under {@link #GANGLIA_UDP_ADDRESSING_MODE}, or {@code null} if unset. */
+    public static String getMetricsTargetUDPAddressingMode(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_UDP_ADDRESSING_MODE), null);
+    }
+
+    /** Returns the value configured under {@link #GANGLIA_TTL}, or {@code null} if unset. */
+    public static Integer getMetricsTargetTtl(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_TTL), null);
+    }
+
+    /** Returns the value configured under {@link #GANGLIA_DMAX}, or {@code null} if unset. */
+    public static Integer getGangliaDMax(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_DMAX), null);
+    }
+
+    /** Returns the value configured under {@link #GANGLIA_TMAX}, or {@code null} if unset. */
+    public static Integer getGangliaTMax(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_TMAX), null);
+    }
+
+
+    private static Integer getMetricsTargetPort(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_PORT), null);
+    }
+
+    private static String getMetricsPrefixedWith(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_PREFIXED_WITH), null);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
new file mode 100644
index 0000000..7a2b31b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.graphite.GraphiteSender;
+import com.codahale.metrics.graphite.GraphiteUDP;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Storm reporter that publishes the metrics of a {@link MetricRegistry} to Graphite through a
+ * Dropwizard {@link GraphiteReporter}, using either a UDP or a plain {@link Graphite} sender.
+ */
+public class GraphiteStormReporter extends SheduledStormReporter<GraphiteReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);
+
+    public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
+    public static final String GRAPHITE_HOST = "graphite.host";
+    public static final String GRAPHITE_PORT = "graphite.port";
+    public static final String GRAPHITE_TRANSPORT = "graphite.transport";
+
+    /**
+     * Configures the reporter builder from {@code reporterConf}, records the reporting period
+     * and builds the reporter around the sender selected by the configured transport.
+     *
+     * @param metricsRegistry registry whose metrics are reported
+     * @param stormConf       Storm configuration (not used by this reporter)
+     * @param reporterConf    configuration of this reporter
+     */
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing...");
+        GraphiteReporter.Builder reporterBuilder = GraphiteReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durations = MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durations != null) {
+            reporterBuilder.convertDurationsTo(durations);
+        }
+
+        TimeUnit rates = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rates != null) {
+            reporterBuilder.convertRatesTo(rates);
+        }
+
+        //TODO: expose some simple MetricFilters
+        String metricPrefix = getMetricsPrefixedWith(reporterConf);
+        if (metricPrefix != null) {
+            reporterBuilder.prefixedWith(metricPrefix);
+        }
+
+        // Period defaults to 10, its unit to seconds.
+        reportingPeriod = getReportPeriod(reporterConf);
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        // Not exposed:
+        // * withClock(Clock)
+
+        reporter = reporterBuilder.build(createSender(reporterConf));
+    }
+
+    /**
+     * Creates the sender for the configured host and port: a {@link GraphiteUDP} when the
+     * transport is {@code udp} (case-insensitive), a {@link Graphite} sender otherwise.
+     */
+    private static GraphiteSender createSender(Map reporterConf) {
+        String targetHost = getMetricsTargetHost(reporterConf);
+        Integer targetPort = getMetricsTargetPort(reporterConf);
+        String transportName = getMetricsTargetTransport(reporterConf);
+        //TODO: error checking
+        if (transportName.equalsIgnoreCase("udp")) {
+            return new GraphiteUDP(targetHost, targetPort);
+        }
+        //TODO: pickled support
+        return new Graphite(targetHost, targetPort);
+    }
+
+    private static String getMetricsPrefixedWith(Map reporterConf) {
+        Object configured = reporterConf.get(GRAPHITE_PREFIXED_WITH);
+        return Utils.getString(configured, null);
+    }
+
+    private static String getMetricsTargetHost(Map reporterConf) {
+        Object configured = reporterConf.get(GRAPHITE_HOST);
+        return Utils.getString(configured, null);
+    }
+
+    private static Integer getMetricsTargetPort(Map reporterConf) {
+        Object configured = reporterConf.get(GRAPHITE_PORT);
+        return Utils.getInt(configured, null);
+    }
+
+    /** Transport defaults to {@code tcp} when not configured. */
+    private static String getMetricsTargetTransport(Map reporterConf) {
+        Object configured = reporterConf.get(GRAPHITE_TRANSPORT);
+        return Utils.getString(configured, "tcp");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
new file mode 100644
index 0000000..7ac6cde
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Storm reporter that exposes the metrics of a {@link MetricRegistry} through a Dropwizard
+ * {@link JmxReporter}.
+ */
+public class JmxStormReporter implements StormReporter<JmxReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(JmxStormReporter.class);
+    public static final String JMX_DOMAIN = "jmx.domain";
+    JmxReporter reporter = null;
+
+    /**
+     * Builds the underlying {@link JmxReporter}, applying the duration unit, rate unit and JMX
+     * domain from {@code reporterConf} when they are configured.
+     *
+     * @param metricsRegistry registry whose metrics are exposed
+     * @param stormConf       Storm configuration (not used by this reporter)
+     * @param reporterConf    configuration of this reporter
+     */
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormConf, Map<String, Object> reporterConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        String domain = getMetricsJMXDomain(reporterConf);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+
+        // TODO: expose some simple MetricFilters
+        // other builder functions not exposed:
+        // * createsObjectNamesWith(ObjectNameFactory onFactory)
+        // * registerWith (MBeanServer)
+        // * specificDurationUnits (Map<String,TimeUnit> specificDurationUnits)
+        // * specificRateUnits(Map<String,TimeUnit> specificRateUnits)
+
+        reporter = builder.build();
+    }
+
+    /**
+     * Returns the JMX domain configured under {@link #JMX_DOMAIN}.
+     *
+     * @param reporterConf configuration of this reporter
+     * @return the configured domain, or {@code null} when none is set
+     */
+    public static String getMetricsJMXDomain(Map reporterConf) {
+        // Look the key up in the map and default to null. Passing the map itself as the value
+        // and the key name as the default would never yield the configured domain.
+        return Utils.getString(reporterConf.get(JMX_DOMAIN), null);
+    }
+
+    /**
+     * Starts the JMX reporter.
+     *
+     * @throws IllegalStateException if {@code prepare} has not set up a reporter
+     */
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.debug("Starting...");
+            reporter.start();
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * Stops the JMX reporter.
+     *
+     * @throws IllegalStateException if {@code prepare} has not set up a reporter
+     */
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.debug("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
new file mode 100644
index 0000000..1b1e7a0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for Storm reporters backed by a Dropwizard {@link ScheduledReporter}.
+ * <p>
+ * {@link #start()} and {@link #stop()} operate on {@link #reporter}, which together with
+ * {@code reportingPeriod} and {@code reportingPeriodUnit} is expected to be assigned while the
+ * concrete reporter is prepared.
+ */
+public abstract class SheduledStormReporter<T extends ScheduledReporter> implements StormReporter{
+    private static final Logger LOG = LoggerFactory.getLogger(SheduledStormReporter.class);
+    protected ScheduledReporter reporter;
+    long reportingPeriod;
+    TimeUnit reportingPeriodUnit;
+
+    /**
+     * Starts reporting every {@code reportingPeriod} {@code reportingPeriodUnit}.
+     *
+     * @throws IllegalStateException if no reporter has been set up yet
+     */
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.debug("Starting...");
+            reporter.start(reportingPeriod, reportingPeriodUnit);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * Stops the underlying reporter.
+     *
+     * @throws IllegalStateException if no reporter has been set up yet
+     */
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.debug("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+
+
+    /**
+     * Returns the unit configured under {@code REPORT_PERIOD_UNITS}, or
+     * {@link TimeUnit#SECONDS} when none is configured.
+     *
+     * @throws IllegalArgumentException if the configured value does not name a {@link TimeUnit}
+     */
+    static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
+        TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
+        return unit == null ? TimeUnit.SECONDS : unit;
+    }
+
+    /**
+     * Parses the value stored under {@code configName} as a {@link TimeUnit}.
+     *
+     * @return the parsed unit, or {@code null} when the key is not configured
+     * @throws IllegalArgumentException if the configured value does not name a {@link TimeUnit}
+     */
+    private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
+        String unitName = Utils.getString(reporterConf.get(configName), null);
+        if (unitName == null) {
+            return null;
+        }
+        // TimeUnit constants are upper-case; normalise so that values such as "seconds" or
+        // " SECONDS" are accepted as well. Locale.ROOT keeps the conversion locale-independent.
+        return TimeUnit.valueOf(unitName.trim().toUpperCase(java.util.Locale.ROOT));
+    }
+
+    /** Returns the period configured under {@code REPORT_PERIOD}, or 10 when none is configured. */
+    static long getReportPeriod(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
new file mode 100644
index 0000000..c36e44e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+/**
+ * Lifecycle contract for a metrics reporter: it is prepared with a registry and
+ * configuration, then started and stopped.
+ *
+ * @param <T> type of the wrapped reporter
+ */
+public interface StormReporter<T extends Reporter> {
+ /** Reporter configuration key for the reporting period. */
+ String REPORT_PERIOD = "report.period";
+ /** Reporter configuration key for the time unit of the reporting period. */
+ String REPORT_PERIOD_UNITS = "report.period.units";
+
+ /**
+  * Sets the reporter up before it is started.
+  *
+  * @param metricsRegistry registry whose metrics are to be reported
+  * @param conf            general configuration map
+  * @param reporterConf    configuration specific to this reporter
+  */
+ void prepare(MetricRegistry metricsRegistry, Map<String, Object> conf, Map<String, Object> reporterConf);
+ /** Starts reporting. */
+ void start();
+ /** Stops reporting. */
+ void stop();
+}
\ No newline at end of file
[2/4] storm git commit: add disruptor queue metrics
Posted by pt...@apache.org.
add disruptor queue metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9f632b64
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9f632b64
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9f632b64
Branch: refs/heads/metrics_v2
Commit: 9f632b64a1583b3953753cc47756dc801457e65c
Parents: 6c93a0d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Nov 18 15:18:57 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Nov 18 15:18:57 2016 -0500
----------------------------------------------------------------------
.../storm/starter/ExclamationTopology.java | 2 +-
.../apache/storm/starter/WordCountTopology.java | 2 +-
.../hdfs/avro/ConfluentAvroSerializer.java | 2 +-
pom.xml | 5 ++
storm-core/pom.xml | 4 ++
.../clj/org/apache/storm/daemon/executor.clj | 2 +
.../src/clj/org/apache/storm/daemon/worker.clj | 6 +-
.../src/clj/org/apache/storm/disruptor.clj | 5 +-
.../apache/storm/metrics2/DisruptorMetrics.java | 76 ++++++++++++++++++++
.../org/apache/storm/metrics2/SimpleGauge.java | 21 ++++++
.../storm/metrics2/StormMetricRegistry.java | 43 +++++++++--
.../org/apache/storm/utils/DisruptorQueue.java | 43 +++++++----
.../utils/DisruptorQueueBackpressureTest.java | 2 +-
.../apache/storm/utils/DisruptorQueueTest.java | 4 +-
14 files changed, 187 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
index 26e0430..9284b52 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
@@ -79,7 +79,7 @@ public class ExclamationTopology {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
+ Utils.sleep(100000);
cluster.killTopology("test");
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index e4a5711..0611894 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -98,7 +98,7 @@ public class WordCountTopology {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
- Thread.sleep(10000);
+ Thread.sleep(60000);
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
index 2008a3e..087aec5 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -27,7 +27,7 @@ import java.io.IOException;
import java.util.Map;
/**
- * This class provides a mechanism to utilize the Confluent Schema Registry (https://github.com/confluentinc/schema-registry)
+ * This class provides a mechanism to utilize the Confluent Schema Registry (https://github.com/confluentinc/schema-registry)
* for Storm to (de)serialize Avro generic records across a topology. It assumes the schema registry is up and running
* completely independent of Storm.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a2aec4d..d040099 100644
--- a/pom.xml
+++ b/pom.xml
@@ -840,6 +840,11 @@
<version>${metrics.version}</version>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
<groupId>metrics-clojure</groupId>
<artifactId>metrics-clojure</artifactId>
<version>${metrics-clojure.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 38a0811..21b50af 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -281,6 +281,10 @@
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ </dependency>
+ <dependency>
<groupId>metrics-clojure</groupId>
<artifactId>metrics-clojure</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index f1af8e7..cdb9c7f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -234,6 +234,8 @@
(str "executor" executor-id "-send-queue")
(storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+ (.getStormId worker-context)
+ (.getThisWorkerPort worker-context)
:producer-type :single-threaded
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index cca0011..3031513 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -205,12 +205,13 @@
(transfer-fn serializer tuple-batch)))
transfer-fn)))
-(defn- mk-receive-queue-map [storm-conf executors]
+(defn- mk-receive-queue-map [storm-conf executors storm-id port]
(->> executors
;; TODO: this depends on the type of executor
(map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
(storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+ storm-id port
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
(into {})
@@ -255,9 +256,10 @@
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+ storm-id port
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
- executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
+ executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id port)
receive-queue-map (->> executor-receive-queue-map
(mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj
index 1546b3f..73a9d84 100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -16,6 +16,7 @@
(ns org.apache.storm.disruptor
(:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
+ (:import [org.apache.storm.task WorkerTopologyContext])
(:import [com.lmax.disruptor.dsl ProducerType])
(:require [clojure [string :as str]])
(:require [clojure [set :as set]])
@@ -27,10 +28,10 @@
:single-threaded ProducerType/SINGLE})
(defnk disruptor-queue
- [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+ [^String queue-name buffer-size timeout ^String storm-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
(DisruptorQueue. queue-name
(PRODUCER-TYPE producer-type) buffer-size
- timeout batch-size batch-timeout))
+ timeout batch-size batch-timeout storm-id worker-port))
(defn clojure-handler
[afn]
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
new file mode 100644
index 0000000..3c11f1a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
@@ -0,0 +1,76 @@
+package org.apache.storm.metrics2;
+
+import org.apache.storm.utils.DisruptorQueue;
+
+/**
+ * Bundles the gauges published for one disruptor queue and offers setters that forward new
+ * values to them, either one at a time or all at once from a
+ * {@link DisruptorQueue.QueueMetrics} snapshot.
+ */
+public class DisruptorMetrics {
+    private final SimpleGauge<Long> capacity;
+    private final SimpleGauge<Long> population;
+    private final SimpleGauge<Long> writePosition;
+    private final SimpleGauge<Long> readPosition;
+    private final SimpleGauge<Double> arrivalRate; // TODO: Change to meter
+    private final SimpleGauge<Double> sojournTime;
+    private final SimpleGauge<Long> overflow;
+    private final SimpleGauge<Float> pctFull;
+
+
+    /** Wraps the given gauges; each setter below writes to the gauge of the same name. */
+    DisruptorMetrics(SimpleGauge<Long> capacity,
+                     SimpleGauge<Long> population,
+                     SimpleGauge<Long> writePosition,
+                     SimpleGauge<Long> readPosition,
+                     SimpleGauge<Double> arrivalRate,
+                     SimpleGauge<Double> sojournTime,
+                     SimpleGauge<Long> overflow,
+                     SimpleGauge<Float> pctFull) {
+        this.capacity = capacity;
+        this.population = population;
+        this.writePosition = writePosition;
+        this.readPosition = readPosition;
+        this.arrivalRate = arrivalRate;
+        this.sojournTime = sojournTime;
+        this.overflow = overflow;
+        this.pctFull = pctFull;
+    }
+
+    /** Stores {@code capacity} in the capacity gauge. */
+    public void setCapacity(Long capacity) {
+        SimpleGauge<Long> target = this.capacity;
+        target.set(capacity);
+    }
+
+    /** Stores {@code population} in the population gauge. */
+    public void setPopulation(Long population) {
+        SimpleGauge<Long> target = this.population;
+        target.set(population);
+    }
+
+    /** Stores {@code writePosition} in the write-position gauge. */
+    public void setWritePosition(Long writePosition) {
+        SimpleGauge<Long> target = this.writePosition;
+        target.set(writePosition);
+    }
+
+    /** Stores {@code readPosition} in the read-position gauge. */
+    public void setReadPosition(Long readPosition) {
+        SimpleGauge<Long> target = this.readPosition;
+        target.set(readPosition);
+    }
+
+    /** Stores {@code arrivalRate} in the arrival-rate gauge. */
+    public void setArrivalRate(Double arrivalRate) {
+        SimpleGauge<Double> target = this.arrivalRate;
+        target.set(arrivalRate);
+    }
+
+    /** Stores {@code soujournTime} in the sojourn-time gauge. */
+    public void setSojournTime(Double soujournTime) {
+        sojournTime.set(soujournTime);
+    }
+
+    /** Stores {@code overflow} in the overflow gauge. */
+    public void setOverflow(Long overflow) {
+        SimpleGauge<Long> target = this.overflow;
+        target.set(overflow);
+    }
+
+    /** Stores {@code pctFull} in the percent-full gauge. */
+    public void setPercentFull(Float pctFull) {
+        SimpleGauge<Float> target = this.pctFull;
+        target.set(pctFull);
+    }
+
+    /** Copies every value of the given queue metrics snapshot into the matching gauge. */
+    public void set(DisruptorQueue.QueueMetrics metrics) {
+        capacity.set(metrics.capacity());
+        population.set(metrics.population());
+        writePosition.set(metrics.writePos());
+        readPosition.set(metrics.readPos());
+        arrivalRate.set(metrics.arrivalRate());
+        sojournTime.set(metrics.sojournTime());
+        overflow.set(metrics.overflow());
+        pctFull.set(metrics.pctFull());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
new file mode 100644
index 0000000..b88cc7f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
@@ -0,0 +1,21 @@
+package org.apache.storm.metrics2;
+
+
+import com.codahale.metrics.Gauge;
+
+/**
+ * A {@link Gauge} that simply reports the last value handed to {@link #set(Object)}.
+ *
+ * @param <T> type of the reported value
+ */
+public class SimpleGauge<T> implements Gauge<T> {
+    // volatile: the value is typically stored by one thread and read by another (for example
+    // a scheduled reporter polling the registry), so the write must be visible across threads.
+    private volatile T value;
+
+    /**
+     * @param value initial value reported until {@link #set(Object)} is called
+     */
+    public SimpleGauge(T value){
+        this.value = value;
+    }
+
+    /**
+     * @return the most recently stored value
+     */
+    @Override
+    public T getValue() {
+        return this.value;
+    }
+
+    /**
+     * Replaces the reported value.
+     *
+     * @param value new value returned by subsequent {@link #getValue()} calls
+     */
+    public void set(T value){
+        this.value = value;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 02dfac3..dd430ac 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -1,13 +1,14 @@
package org.apache.storm.metrics2;
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.*;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
import org.apache.storm.task.WorkerTopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
public class StormMetricRegistry {
@@ -16,21 +17,49 @@ public class StormMetricRegistry {
private static final MetricRegistry REGISTRY = new MetricRegistry();
- private static ConsoleReporter REPORTER;
+ private static ScheduledReporter REPORTER;
static {
- REPORTER = ConsoleReporter.forRegistry(REGISTRY)
+// REPORTER = ConsoleReporter.forRegistry(REGISTRY)
+// .convertRatesTo(TimeUnit.SECONDS)
+// .convertDurationsTo(TimeUnit.MILLISECONDS)
+// .build();
+
+
+ final Graphite graphite = new Graphite(new InetSocketAddress("graphite", 2003));
+ REPORTER = GraphiteReporter.forRegistry(REGISTRY)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
- .build();
+ .filter(MetricFilter.ALL)
+ .build(graphite);
+
REPORTER.start(15, TimeUnit.SECONDS);
}
+ public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, Integer port){
+ // Registered in REGISTRY under storm.worker.{topologyId}.{port}-{name}; the registered gauge is returned.
+ final String key = String.format("storm.worker.%s.%s-%s", topologyId, port, name);
+ return REGISTRY.register(key, new SimpleGauge<>(initialValue));
+ }
+
+ public static DisruptorMetrics disruptorMetrics(String name, String topologyId, Integer port){
+ // One zero-seeded gauge per queue statistic, each registered with the suffix "-<statistic>" appended to name.
+ return new DisruptorMetrics(
+ gauge(0L, name + "-capacity", topologyId, port),
+ gauge(0L, name + "-population", topologyId, port),
+ gauge(0L, name + "-write-position", topologyId, port),
+ gauge(0L, name + "-read-position", topologyId, port),
+ gauge(0.0, name + "-arrival-rate", topologyId, port),
+ gauge(0.0, name + "-sojourn-time-ms", topologyId, port),
+ gauge(0L, name + "-overflow", topologyId, port),
+ gauge(0.0F, name + "-percent-full", topologyId, port));
+ }
+
+
public static Meter meter(String name, WorkerTopologyContext context, String componentId){
// storm.worker.{topology}.{host}.{port}
// TODO: hostname
String metricName = String.format("storm.worker.%s.%s.%s-%s", context.getStormId(), componentId, context.getThisWorkerPort(), name);
-
return REGISTRY.meter(metricName);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index fbae1d1..09c0e0b 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -33,6 +33,9 @@ import com.lmax.disruptor.dsl.ProducerType;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.internal.RateTracker;
+import org.apache.storm.metrics2.DisruptorMetrics;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.task.WorkerTopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +65,7 @@ public class DisruptorQueue implements IStatefulObject {
private static final Object INTERRUPT = new Object();
private static final String PREFIX = "disruptor-";
private static final FlusherPool FLUSHER = new FlusherPool();
+ private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true);
private static class FlusherPool {
private static final String THREAD_PREFIX = "disruptor-flush";
@@ -326,27 +330,31 @@ public class DisruptorQueue implements IStatefulObject {
return (1.0F * population() / capacity());
}
- public Object getState() {
- Map state = new HashMap<String, Object>();
+ public double arrivalRate(){
+ return _rateTracker.reportRate(); // delegates to the rate tracker; sojournTime() treats the result as a per-second rate
+ }
+ public double sojournTime(){
// get readPos then writePos so it's never an under-estimate
long rp = readPos();
long wp = writePos();
-
- final double arrivalRateInSecs = _rateTracker.reportRate();
+ final double arrivalRateInSecs = arrivalRate();
//Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
// If this assumption does not hold, the calculation of sojourn time should also consider
// departure rate according to Queuing Theory.
- final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+ return (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+ }
+ public Object getState() {
+ Map state = new HashMap<String, Object>();
state.put("capacity", capacity());
- state.put("population", wp - rp);
- state.put("write_pos", wp);
- state.put("read_pos", rp);
- state.put("arrival_rate_secs", arrivalRateInSecs);
- state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
- state.put("overflow", _overflowCount.get());
+ state.put("population", population());
+ state.put("write_pos", writePos());
+ state.put("read_pos", readPos());
+ state.put("arrival_rate_secs", arrivalRate());
+ state.put("sojourn_time_ms", sojournTime()); //element sojourn time in milliseconds
+ state.put("overflow", overflow());
return state;
}
@@ -366,7 +374,8 @@ public class DisruptorQueue implements IStatefulObject {
private final int _inputBatchSize;
private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>();
private final Flusher _flusher;
- private final QueueMetrics _metrics;
+ private final QueueMetrics _metrics; // old metrics API
+ private final DisruptorMetrics _disruptorMetrics;
private String _queueName = "";
private DisruptorBackpressureCallback _cb = null;
@@ -376,7 +385,7 @@ public class DisruptorQueue implements IStatefulObject {
private final AtomicLong _overflowCount = new AtomicLong(0);
private volatile boolean _throttleOn = false;
- public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
+ public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, int port) {
this._queueName = PREFIX + queueName;
WaitStrategy wait;
if (readTimeout <= 0) {
@@ -390,12 +399,20 @@ public class DisruptorQueue implements IStatefulObject {
_barrier = _buffer.newBarrier();
_buffer.addGatingSequences(_consumer);
_metrics = new QueueMetrics();
+ _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, port);
//The batch size can be no larger than half the full queue size.
//This is mostly to avoid contention issues.
_inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
_flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
_flusher.start();
+
+ METRICS_TIMER.schedule(new TimerTask(){
+ @Override
+ public void run() {
+ _disruptorMetrics.set(_metrics);
+ }
+ }, 15000, 15000); // TODO: Configurable interval
}
public String getName() {
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
index 7072e55..110fe88 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
@@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends TestCase {
}
private static DisruptorQueue createQueue(String name, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
index e7ac54e..c834cbb 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
@@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase {
}
private static DisruptorQueue createQueue(String name, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000);
}
private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", 1000);
}
}
[3/4] storm git commit: add simple word count example that uses acking
Posted by pt...@apache.org.
add simple word count example that uses acking
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de399c2c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de399c2c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de399c2c
Branch: refs/heads/metrics_v2
Commit: de399c2c3bbc9ae3e693b252847ea482e12ae1c3
Parents: 9f632b6
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Dec 7 15:48:41 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Dec 7 15:48:41 2016 -0500
----------------------------------------------------------------------
.../apache/storm/starter/ReliableWordCount.java | 123 +++++++++++++++++++
1 file changed, 123 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/de399c2c/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
new file mode 100644
index 0000000..4cbae07
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java
@@ -0,0 +1,123 @@
+package org.apache.storm.starter;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Created by ptgoetz on 11/11/16.
+ */
+public class ReliableWordCount {
+ // Spout: each nextTuple() call sleeps 10 ms, then emits one randomly chosen sentence with a UUID message id.
+ public static class RandomSentenceSpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);
+
+ SpoutOutputCollector _collector;
+ Random _rand;
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _rand = new Random();
+ _collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(10);
+ // Every candidate is passed through sentence(), which is protected and overridable.
+ final String[] candidates = {sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
+ sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
+ final int pick = _rand.nextInt(candidates.length);
+ _collector.emit(new Values(candidates[pick]), UUID.randomUUID());
+ }
+
+ // Returns its input unchanged.
+ protected String sentence(String input) {
+ return input;
+ }
+
+ // No-op: nothing is done when a tuple is acked.
+ @Override
+ public void ack(Object id) {
+ }
+
+ // No-op: a failed tuple is not re-emitted.
+ @Override
+ public void fail(Object id) {
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+ }
+
+ // Bolt: splits the first field of each tuple on whitespace and emits (token, 1) for every token.
+ public static class SplitSentence extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ for (String token : tuple.getString(0).split("\\s+")) {
+ collector.emit(new Values(token, 1));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
+
+ // Bolt: keeps an in-memory running count per word and emits (word, count) after every update.
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ final String word = tuple.getString(0);
+ final Integer seen = counts.get(word);
+ final Integer updated = (seen == null) ? 1 : seen + 1;
+ counts.put(word, updated);
+ collector.emit(new Values(word, updated));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
+
+ // Submits the topology to an in-process LocalCluster, waits 600000 ms (ten minutes), then shuts the cluster down.
+ public static void main(String[] args) throws Exception {
+ final TopologyBuilder topology = new TopologyBuilder();
+ topology.setSpout("spout", new RandomSentenceSpout(), 4);
+ topology.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
+ topology.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
+
+ final Config conf = new Config();
+ conf.setMaxTaskParallelism(3);
+
+ final LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, topology.createTopology());
+
+ Thread.sleep(600000);
+
+ cluster.shutdown();
+ }
+}