You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/12/09 21:24:10 UTC

[4/4] storm git commit: incorporate and refactor work from abellina

incorporate and refactor work from abellina


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7e1a38b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7e1a38b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7e1a38b

Branch: refs/heads/metrics_v2
Commit: c7e1a38b43531549a4efcfcd0cbd161de7753925
Parents: de399c2
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Dec 9 16:22:46 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Dec 9 16:22:46 2016 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  25 ++++
 pom.xml                                         |   5 +
 storm-core/pom.xml                              |   4 +
 .../src/clj/org/apache/storm/daemon/worker.clj  |   4 +-
 storm-core/src/jvm/org/apache/storm/Config.java |   3 +
 .../apache/storm/metrics2/DisruptorMetrics.java |  17 +++
 .../org/apache/storm/metrics2/SimpleGauge.java  |  17 +++
 .../storm/metrics2/StormMetricRegistry.java     |  96 +++++++++----
 .../reporters/ConsoleStormReporter.java         |  63 +++++++++
 .../metrics2/reporters/CsvStormReporter.java    |  93 +++++++++++++
 .../reporters/GangliaStormReporter.java         | 133 +++++++++++++++++++
 .../reporters/GraphiteStormReporter.java        | 100 ++++++++++++++
 .../metrics2/reporters/JmxStormReporter.java    |  88 ++++++++++++
 .../reporters/SheduledStormReporter.java        |  71 ++++++++++
 .../storm/metrics2/reporters/StormReporter.java |  32 +++++
 15 files changed, 722 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 041a628..9ff665a 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -291,3 +291,28 @@ storm.daemon.metrics.reporter.plugins:
 
 # configuration of cluster metrics consumer
 storm.cluster.metrics.consumer.publish.interval.secs: 60
+
+
+storm.metrics.reporters:
+  # Graphite Reporter
+  - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
+    daemons:
+        - "supervisor"
+        - "nimbus"
+        - "worker"
+    report.period: 60
+    report.period.units: "SECONDS"
+    graphite.host: "localhost"
+    graphite.port: 2003
+
+  # Console Reporter
+  - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
+    daemons:
+        - "worker"
+    report.period: 10
+    report.period.units: "SECONDS"
+
+    #TODO: not functional, but you get the idea
+    filters:
+        "org.apache.storm.metrics2.filters.RegexFilter":
+            expression: ".*my_component.*emitted.*"

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d040099..6463bb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -845,6 +845,11 @@
                 <version>${metrics.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-ganglia</artifactId>
+                <version>${metrics.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>metrics-clojure</groupId>
                 <artifactId>metrics-clojure</artifactId>
                 <version>${metrics-clojure.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 21b50af..4d2fd19 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -285,6 +285,10 @@
             <artifactId>metrics-graphite</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-ganglia</artifactId>
+        </dependency>
+        <dependency>
             <groupId>metrics-clojure</groupId>
             <artifactId>metrics-clojure</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 3031513..b2810db 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -599,7 +599,7 @@
       (spit (worker-artifacts-pid-path conf storm-id port) pid)))
 
   (declare establish-log-setting-callback)
-
+  (StormMetricRegistry/start conf DaemonType/WORKER)
   ;; start out with empty list of timeouts 
   (def latest-log-config (atom {}))
   (def original-log-levels (atom {}))
@@ -693,7 +693,7 @@
 
                     (close-resources worker)
 
-                    (StormMetricRegistry/shutdown)
+                    (StormMetricRegistry/stop)
 
                     (log-message "Trigger any worker shutdown hooks")
                     (run-worker-shutdown-hooks worker)

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 0854f35..a5153e4 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -139,6 +139,9 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
 
+    @isType(type=List.class)
+    public static final String STORM_METRICS_REPORTERS = "storm.metrics.reporters";
+
     /**
      * A list of daemon metrics  reporter plugin class names.
      * These plugins must implement {@link org.apache.storm.daemon.metrics.reporters.PreparableReporter} interface.

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
index 3c11f1a..994a965 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.metrics2;
 
 import org.apache.storm.utils.DisruptorQueue;

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
index b88cc7f..5240f26 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.metrics2;
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index dd430ac..a718739 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -1,15 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.metrics2;
 
-
-import com.codahale.metrics.*;
-import com.codahale.metrics.graphite.Graphite;
-import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.metrics2.reporters.StormReporter;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 
 public class StormMetricRegistry {
 
@@ -17,24 +37,7 @@ public class StormMetricRegistry {
 
     private static final MetricRegistry REGISTRY = new MetricRegistry();
 
-    private static ScheduledReporter REPORTER;
-    static {
-//        REPORTER = ConsoleReporter.forRegistry(REGISTRY)
-//                .convertRatesTo(TimeUnit.SECONDS)
-//                .convertDurationsTo(TimeUnit.MILLISECONDS)
-//                .build();
-
-
-        final Graphite graphite = new Graphite(new InetSocketAddress("graphite", 2003));
-        REPORTER = GraphiteReporter.forRegistry(REGISTRY)
-                .convertRatesTo(TimeUnit.SECONDS)
-                .convertDurationsTo(TimeUnit.MILLISECONDS)
-                .filter(MetricFilter.ALL)
-                .build(graphite);
-
-        REPORTER.start(15, TimeUnit.SECONDS);
-    }
-
+    private static final List<StormReporter> REPORTERS = new ArrayList<>();
 
     public static <T> SimpleGauge<T>  gauge(T initialValue, String name, String topologyId, Integer port){
         SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
@@ -55,7 +58,6 @@ public class StormMetricRegistry {
         );
     }
 
-
     public static Meter meter(String name, WorkerTopologyContext context, String componentId){
         // storm.worker.{topology}.{host}.{port}
         // TODO: hostname
@@ -63,7 +65,47 @@ public class StormMetricRegistry {
         return REGISTRY.meter(metricName);
     }
 
-    public static void shutdown(){
-        REPORTER.stop();
+    public static void start(Map<String, Object> stormConfig, DaemonType type){
+        LOG.info("Starting metrics reporters...");
+        List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
+        for(Map<String, Object> reporterConfig : reporterList){
+            // only start those requested; names are upper-cased with Locale.ROOT so the enum lookup does not depend on the JVM default locale
+            List<String> daemons = (List<String>)reporterConfig.get("daemons");
+            for(String daemon : daemons){
+                if(DaemonType.valueOf(daemon.toUpperCase(java.util.Locale.ROOT)) == type){
+                    startReporter(stormConfig, reporterConfig);
+                }
+            }
+        }
+    }
+
+    private static void startReporter(Map<String, Object> stormConfig, Map<String, Object> reporterConfig){
+        String clazz = (String)reporterConfig.get("class");
+        StormReporter reporter = null;
+        LOG.info("Attempting to instantiate reporter class: {}", clazz);
+        try{
+            reporter = instantiate(clazz);
+        } catch(Exception e){
+            LOG.warn("Unable to instantiate metrics reporter class: {}. Will skip this reporter.", clazz, e);
+        }
+        if(reporter != null){
+            reporter.prepare(REGISTRY, stormConfig, reporterConfig);
+            reporter.start();
+            REPORTERS.add(reporter);
+        }
+
+    }
+
+
+    private static StormReporter instantiate(String klass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        Class<?> c = Class.forName(klass);
+        return  (StormReporter) c.newInstance();
+    }
+
+
+    public static void stop(){
+        for(StormReporter sr : REPORTERS){
+            sr.stop();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
new file mode 100644
index 0000000..5322bf8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConsoleStormReporter extends SheduledStormReporter<ConsoleReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class);
+
+    @Override
+    public void prepare(MetricRegistry registry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing ConsoleReporter");
+        ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(registry);
+
+        builder.outputTo(System.out);
+        Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
+        if (locale != null) {
+            builder.formattedFor(locale);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        reporter = builder.build();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
new file mode 100644
index 0000000..4225b7c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CsvStormReporter extends SheduledStormReporter<CsvReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class);
+
+    public static final String CSV_LOG_DIR = "csv.log.dir";
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing...");
+        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+        Locale locale = MetricsUtils.getMetricsReporterLocale(reporterConf);
+        if (locale != null) {
+            builder.formatFor(locale);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        //TODO: expose some simple MetricFilters 
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        File csvMetricsDir = getCsvLogDir(stormConf, reporterConf);
+        reporter = builder.build(csvMetricsDir);
+    }
+
+
+    private static File getCsvLogDir(Map stormConf, Map reporterConf) {
+        String csvMetricsLogDirectory = Utils.getString(reporterConf.get(CSV_LOG_DIR), null);
+        if (csvMetricsLogDirectory == null) {
+            csvMetricsLogDirectory = ConfigUtils.absoluteStormLocalDir(stormConf);
+            csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics";
+        }
+        File csvMetricsDir = new File(csvMetricsLogDirectory);
+        validateCreateOutputDir(csvMetricsDir);
+        return csvMetricsDir;
+    }
+
+    private static void validateCreateOutputDir(File dir) {
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+        if (!dir.canWrite()) {
+            throw new IllegalStateException(dir.getName() + " does not have write permissions.");
+        }
+        if (!dir.isDirectory()) {
+            throw new IllegalStateException(dir.getName() + " is not a directory.");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
new file mode 100644
index 0000000..d8d0269
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ganglia.GangliaReporter;
+import com.codahale.metrics.MetricRegistry;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GangliaStormReporter extends SheduledStormReporter<GangliaReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class);
+
+    public static final String GANGLIA_HOST = "ganglia.host";
+    public static final String GANGLIA_PORT = "ganglia.port";
+    public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with";
+    public static final String GANGLIA_DMAX = "ganglia.dmax";
+    public static final String GANGLIA_TMAX = "ganglia.tmax";
+    public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode";
+    public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit";
+    public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit";
+    public static final String GANGLIA_TTL = "ganglia.ttl";
+    public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group";
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing...");
+        GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        //TODO: expose some simple MetricFilters 
+        String prefix = getMetricsPrefixedWith(reporterConf);
+        if (prefix != null) {
+            builder.prefixedWith(prefix);
+        }
+
+        Integer dmax = getGangliaDMax(reporterConf);
+        if (prefix != null) {
+            builder.withDMax(dmax);
+        }
+
+        Integer tmax = getGangliaTMax(reporterConf);
+        if (prefix != null) {
+            builder.withTMax(tmax);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        // Not exposed:
+        // * withClock(Clock)
+
+        String group = getMetricsTargetUDPGroup(reporterConf);
+        Integer port = getMetricsTargetPort(reporterConf);
+        String udpAddressingMode = getMetricsTargetUDPAddressingMode(reporterConf);
+        Integer ttl = getMetricsTargetTtl(reporterConf);
+
+        GMetric.UDPAddressingMode mode = udpAddressingMode.equalsIgnoreCase("multicast") ?
+                GMetric.UDPAddressingMode.MULTICAST : GMetric.UDPAddressingMode.UNICAST;
+
+        try {
+            GMetric sender = new GMetric(group, port, mode, ttl);
+            reporter = builder.build(sender);
+        }catch (IOException ioe){
+            //TODO
+            LOG.error("Exception in GangliaReporter config", ioe);
+        }
+    }
+
+
+    public static String getMetricsTargetUDPGroup(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_UDP_GROUP), null);
+    }
+
+    public static String getMetricsTargetUDPAddressingMode(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_UDP_ADDRESSING_MODE), null);
+    }
+
+    public static Integer getMetricsTargetTtl(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_TTL), null);
+    }
+
+    public static Integer getGangliaDMax(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_DMAX), null);
+    }
+
+    public static Integer getGangliaTMax(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_TMAX), null);
+    }
+
+
+    private static Integer getMetricsTargetPort(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GANGLIA_PORT), null);
+    }
+
+    private static String getMetricsPrefixedWith(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GANGLIA_PREFIXED_WITH), null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
new file mode 100644
index 0000000..7a2b31b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.graphite.GraphiteReporter;
+import com.codahale.metrics.graphite.GraphiteSender;
+import com.codahale.metrics.graphite.GraphiteUDP;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class GraphiteStormReporter extends SheduledStormReporter<GraphiteReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);
+
+    public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
+    public static final String GRAPHITE_HOST = "graphite.host";
+    public static final String GRAPHITE_PORT = "graphite.port";
+    public static final String GRAPHITE_TRANSPORT = "graphite.transport";
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
+        LOG.debug("Preparing...");
+        GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        //TODO: expose some simple MetricFilters 
+        String prefix = getMetricsPrefixedWith(reporterConf);
+        if (prefix != null) {
+            builder.prefixedWith(prefix);
+        }
+
+        //defaults to 10
+        reportingPeriod = getReportPeriod(reporterConf);
+
+        //defaults to seconds
+        reportingPeriodUnit = getReportPeriodUnit(reporterConf);
+
+        // Not exposed:
+        // * withClock(Clock)
+
+        String host = getMetricsTargetHost(reporterConf);
+        Integer port = getMetricsTargetPort(reporterConf);
+        String transport = getMetricsTargetTransport(reporterConf);
+        GraphiteSender sender = null;
+        //TODO: error checking
+        if (transport.equalsIgnoreCase("udp")) {
+            sender = new GraphiteUDP(host, port);
+        } else {
+            //TODO: pickled support
+            sender = new Graphite(host, port);
+        }
+        reporter = builder.build(sender);
+    }
+
+    private static String getMetricsPrefixedWith(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null);
+    }
+
+    private static String getMetricsTargetHost(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GRAPHITE_HOST), null);
+    }
+
+    private static Integer getMetricsTargetPort(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null);
+    }
+
+    private static String getMetricsTargetTransport(Map reporterConf) {
+        return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
new file mode 100644
index 0000000..7ac6cde
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link StormReporter} that exposes a {@link MetricRegistry} through a Codahale
+ * {@link JmxReporter}.
+ *
+ * <p>Lifecycle: {@code prepare} builds the underlying reporter; {@link #start()} and
+ * {@link #stop()} delegate to it and throw {@link IllegalStateException} if
+ * {@code prepare} has not been called first.
+ */
+public class JmxStormReporter implements StormReporter<JmxReporter> {
+    private static final Logger LOG = LoggerFactory.getLogger(JmxStormReporter.class);
+
+    /** Reporter-config key whose value, when present, is passed to the builder's {@code inDomain}. */
+    public static final String JMX_DOMAIN = "jmx.domain";
+
+    /** Underlying reporter; {@code null} until {@code prepare} has run. */
+    JmxReporter reporter = null;
+
+    /**
+     * Builds the underlying {@link JmxReporter} from the reporter configuration.
+     * Duration unit, rate unit and JMX domain are applied only when configured
+     * (non-null); otherwise the builder's own defaults are kept.
+     *
+     * @param metricsRegistry registry whose metrics are exposed
+     * @param stormConf       topology/cluster configuration (not read by this reporter)
+     * @param reporterConf    reporter-specific configuration
+     */
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormConf, Map<String, Object> reporterConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(durationUnit);
+        }
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
+        if (rateUnit != null) {
+            builder.convertRatesTo(rateUnit);
+        }
+
+        String domain = getMetricsJMXDomain(reporterConf);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+
+        // TODO: expose some simple MetricFilters
+        // other builder functions not exposed:
+        //  * createsObjectNamesWith(ObjectNameFactory onFactory)
+        //  * registerWith (MBeanServer)
+        //  * specificDurationUnits (Map<String,TimeUnit> specificDurationUnits)
+        //  * specificRateUnits(Map<String,TimeUnit> specificRateUnits)
+
+        reporter = builder.build();
+    }
+
+    /**
+     * Reads the optional JMX domain from the reporter configuration.
+     *
+     * @param reporterConf reporter-specific configuration; may be {@code null}
+     * @return the value stored under {@link #JMX_DOMAIN}, with {@code null} supplied as
+     *         the fallback when the key (or the whole configuration) is absent
+     */
+    public static String getMetricsJMXDomain(Map reporterConf) {
+        if (reporterConf == null) {
+            return null;
+        }
+        // Look the key up first: handing the map itself to Utils.getString would make
+        // the map the value to convert and JMX_DOMAIN merely the default.
+        return Utils.getString(reporterConf.get(JMX_DOMAIN), null);
+    }
+
+    /**
+     * Starts the underlying reporter.
+     *
+     * @throws IllegalStateException if {@code prepare} has not been called
+     */
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.debug("Starting...");
+            reporter.start();
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * Stops the underlying reporter.
+     *
+     * @throws IllegalStateException if {@code prepare} has not been called
+     */
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.debug("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
new file mode 100644
index 0000000..1b1e7a0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for {@link StormReporter}s that wrap a Codahale {@link ScheduledReporter}.
+ *
+ * <p>Subclasses assign {@link #reporter}, {@link #reportingPeriod} and
+ * {@link #reportingPeriodUnit} while preparing; {@link #start()} then starts the
+ * reporter with that period and {@link #stop()} stops it.
+ *
+ * @param <T> concrete {@link ScheduledReporter} type wrapped by the subclass
+ */
+public abstract class SheduledStormReporter<T extends ScheduledReporter> implements StormReporter<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(SheduledStormReporter.class);
+
+    /** Underlying reporter; {@code null} until a subclass has assigned it. */
+    protected ScheduledReporter reporter;
+    /** Interval between reports, expressed in {@link #reportingPeriodUnit}. */
+    long reportingPeriod;
+    /** Unit of {@link #reportingPeriod}. */
+    TimeUnit reportingPeriodUnit;
+
+    /**
+     * Starts the underlying reporter with the configured period and unit.
+     *
+     * @throws IllegalStateException if no reporter has been prepared
+     */
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.debug("Starting...");
+            reporter.start(reportingPeriod, reportingPeriodUnit);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * Stops the underlying reporter.
+     *
+     * @throws IllegalStateException if no reporter has been prepared
+     */
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.debug("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+
+
+    /**
+     * Reads the reporting-period unit stored under {@code REPORT_PERIOD_UNITS}.
+     *
+     * @param reporterConf reporter-specific configuration
+     * @return the configured unit, or {@link TimeUnit#SECONDS} when none is configured
+     * @throws IllegalArgumentException if the configured value names no {@link TimeUnit}
+     */
+    static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
+        TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
+        return unit == null ? TimeUnit.SECONDS : unit;
+    }
+
+    /**
+     * Resolves the {@link TimeUnit} named by {@code configName}, or {@code null} when
+     * the lookup yields no value.
+     */
+    private static TimeUnit getTimeUnitForConfig(Map<?, ?> reporterConf, String configName) {
+        String unitName = Utils.getString(reporterConf.get(configName), null);
+        if (unitName == null) {
+            return null;
+        }
+        // TimeUnit constants are upper-case; normalise so values such as "seconds"
+        // or " Seconds " resolve instead of failing in valueOf.
+        return TimeUnit.valueOf(unitName.trim().toUpperCase(java.util.Locale.ROOT));
+    }
+
+    /**
+     * Reads the reporting period stored under {@code REPORT_PERIOD}.
+     *
+     * @param reporterConf reporter-specific configuration
+     * @return the configured period, with 10 supplied as the fallback
+     */
+    static long getReportPeriod(Map<?, ?> reporterConf) {
+        return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e1a38b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
new file mode 100644
index 0000000..c36e44e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.util.Map;
+
+public interface StormReporter<T extends Reporter> { // Lifecycle contract for a metrics reporter: prepare once, then start/stop.
+    String REPORT_PERIOD = "report.period"; // reporter-config key for the interval between reports
+    String REPORT_PERIOD_UNITS = "report.period.units"; // reporter-config key for the unit of that interval (a TimeUnit name)
+
+    void prepare(MetricRegistry metricsRegistry, Map<String, Object> conf, Map<String, Object> reporterConf); // configure the reporter for metricsRegistry from its reporter-specific settings
+    void start(); // begin reporting; the implementations in this package throw IllegalStateException if prepare was not called
+    void stop(); // stop reporting; the implementations in this package throw IllegalStateException if prepare was not called
+}
\ No newline at end of file