You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:11 UTC

[05/38] storm git commit: implement metric filters

implement metric filters


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8de0f36
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8de0f36
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8de0f36

Branch: refs/heads/1.x-branch
Commit: b8de0f365a81a8ba3eadec466d777e47dacfcb44
Parents: 6eaa1a8
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 30 16:14:32 2017 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 30 16:14:32 2017 -0400

----------------------------------------------------------------------
 conf/defaults.yaml                              | 46 +++++-----
 .../apache/storm/metrics2/Metrics2Utils.java    | 28 +++++++
 .../storm/metrics2/StormMetricRegistry.java     | 20 ++---
 .../storm/metrics2/filters/RegexFilter.java     | 47 +++++++++++
 .../metrics2/filters/StormMetricsFilter.java    | 32 +++++++
 .../reporters/ConsoleStormReporter.java         |  9 +-
 .../metrics2/reporters/CsvStormReporter.java    |  9 +-
 .../reporters/GangliaStormReporter.java         |  8 +-
 .../reporters/GraphiteStormReporter.java        |  8 +-
 .../metrics2/reporters/JmxStormReporter.java    |  6 +-
 .../reporters/ScheduledStormReporter.java       | 88 ++++++++++++++++++++
 .../reporters/SheduledStormReporter.java        | 71 ----------------
 12 files changed, 259 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b01e0b7..e51b50c 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -295,26 +295,26 @@ storm.daemon.metrics.reporter.plugins:
 storm.cluster.metrics.consumer.publish.interval.secs: 60
 
 
-storm.metrics.reporters:
-  # Graphite Reporter
-  - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
-    daemons:
-        - "supervisor"
-        - "nimbus"
-        - "worker"
-    report.period: 60
-    report.period.units: "SECONDS"
-    graphite.host: "localhost"
-    graphite.port: 2003
-
-  # Console Reporter
-  - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
-    daemons:
-        - "worker"
-    report.period: 10
-    report.period.units: "SECONDS"
-
-    #TODO: not funtional, but you get the idea
-    filters:
-        "org.apache.storm.metrics2.filters.RegexFilter":
-            expression: ".*my_component.*emitted.*"
+# Metrics v2 configuration (optional)
+#storm.metrics.reporters:
+#  # Graphite Reporter
+#  - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
+#    daemons:
+#        - "supervisor"
+#        - "nimbus"
+#        - "worker"
+#    report.period: 60
+#    report.period.units: "SECONDS"
+#    graphite.host: "localhost"
+#    graphite.port: 2003
+#
+#  # Console Reporter
+#  - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
+#    daemons:
+#        - "worker"
+#    report.period: 10
+#    report.period.units: "SECONDS"
+#
+#    filter:
+#        class: "org.apache.storm.metrics2.filters.RegexFilter"
+#        expression: ".*my_component.*emitted.*"

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java b/storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java
new file mode 100644
index 0000000..716b8b7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/Metrics2Utils.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2;
+
+public class Metrics2Utils {
+    private Metrics2Utils(){}
+
+    public static Object instantiate(String klass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        Class<?> c = Class.forName(klass);
+        return  c.newInstance();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 4c975a3..a3b0db9 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -86,12 +86,14 @@ public class StormMetricRegistry {
 
         LOG.info("Starting metrics reporters...");
         List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS);
-        for(Map<String, Object> reporterConfig : reporterList){
-            // only start those requested
-            List<String> daemons = (List<String>)reporterConfig.get("daemons");
-            for(String daemon : daemons){
-                if(DaemonType.valueOf(daemon.toUpperCase()) == type){
-                    startReporter(stormConfig, reporterConfig);
+        if(reporterList != null && reporterList.size() > 0) {
+            for (Map<String, Object> reporterConfig : reporterList) {
+                // only start those requested
+                List<String> daemons = (List<String>) reporterConfig.get("daemons");
+                for (String daemon : daemons) {
+                    if (DaemonType.valueOf(daemon.toUpperCase()) == type) {
+                        startReporter(stormConfig, reporterConfig);
+                    }
                 }
             }
         }
@@ -106,7 +108,7 @@ public class StormMetricRegistry {
         StormReporter reporter = null;
         LOG.info("Attempting to instantiate reporter class: {}", clazz);
         try{
-            reporter = instantiate(clazz);
+            reporter = (StormReporter)Metrics2Utils.instantiate(clazz);
         } catch(Exception e){
             LOG.warn("Unable to instantiate metrics reporter class: {}. Will skip this reporter.", clazz, e);
         }
@@ -118,10 +120,6 @@ public class StormMetricRegistry {
 
     }
 
-    private static StormReporter instantiate(String klass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-        Class<?> c = Class.forName(klass);
-        return  (StormReporter) c.newInstance();
-    }
 
     public static void stop(){
         for(StormReporter sr : REPORTERS){

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java b/storm-core/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java
new file mode 100644
index 0000000..e6997c6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.filters;
+
+import com.codahale.metrics.Metric;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RegexFilter implements StormMetricsFilter {
+
+    private Pattern pattern;
+
+
+    @Override
+    public void prepare(Map<String, Object> config) {
+        String expression = (String) config.get("expression");
+        if(expression != null){
+            this.pattern = Pattern.compile(expression);
+        } else {
+            throw new IllegalStateException("RegexFilter requires an 'expression' parameter.");
+        }
+    }
+
+    @Override
+    public boolean matches(String name, Metric metric) {
+        Matcher matcher = this.pattern.matcher(name);
+        return matcher.matches();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java b/storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java
new file mode 100644
index 0000000..57f7255
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.filters;
+
+import com.codahale.metrics.MetricFilter;
+
+import java.util.Map;
+
+public interface StormMetricsFilter extends MetricFilter {
+
+    /**
+     * Called after the filter is instantiated.
+     * @param config an arbitrary configuration map pulled from the yaml configuration.
+     */
+    void prepare(Map<String, Object> config);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
index 5322bf8..abb5226 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
@@ -20,6 +20,7 @@ package org.apache.storm.metrics2.reporters;
 import com.codahale.metrics.ConsoleReporter;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,7 +28,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public class ConsoleStormReporter extends SheduledStormReporter<ConsoleReporter> {
+public class ConsoleStormReporter extends ScheduledStormReporter<ConsoleReporter> {
     private final static Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class);
 
     @Override
@@ -51,6 +52,12 @@ public class ConsoleStormReporter extends SheduledStormReporter<ConsoleReporter>
             builder.convertDurationsTo(durationUnit);
         }
 
+        StormMetricsFilter filter = getMetricsFilter(reporterConf);
+        if(filter != null){
+            builder.filter(filter);
+        }
+
+
         //defaults to 10
         reportingPeriod = getReportPeriod(reporterConf);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
index 4225b7c..24c6eed 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
@@ -20,6 +20,7 @@ package org.apache.storm.metrics2.reporters;
 import com.codahale.metrics.CsvReporter;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -30,7 +31,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public class CsvStormReporter extends SheduledStormReporter<CsvReporter> {
+public class CsvStormReporter extends ScheduledStormReporter<CsvReporter> {
     private final static Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class);
 
     public static final String CSV_LOG_DIR = "csv.log.dir";
@@ -55,7 +56,11 @@ public class CsvStormReporter extends SheduledStormReporter<CsvReporter> {
             builder.convertDurationsTo(durationUnit);
         }
 
-        //TODO: expose some simple MetricFilters 
+        StormMetricsFilter filter = getMetricsFilter(reporterConf);
+        if(filter != null){
+            builder.filter(filter);
+        }
+
 
         //defaults to 10
         reportingPeriod = getReportPeriod(reporterConf);

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
index d8d0269..e7dc5f4 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
@@ -21,6 +21,7 @@ import com.codahale.metrics.ganglia.GangliaReporter;
 import com.codahale.metrics.MetricRegistry;
 import info.ganglia.gmetric4j.gmetric.GMetric;
 import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,7 +30,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public class GangliaStormReporter extends SheduledStormReporter<GangliaReporter> {
+public class GangliaStormReporter extends ScheduledStormReporter<GangliaReporter> {
     private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class);
 
     public static final String GANGLIA_HOST = "ganglia.host";
@@ -58,7 +59,10 @@ public class GangliaStormReporter extends SheduledStormReporter<GangliaReporter>
             builder.convertRatesTo(rateUnit);
         }
 
-        //TODO: expose some simple MetricFilters 
+        StormMetricsFilter filter = getMetricsFilter(reporterConf);
+        if(filter != null){
+            builder.filter(filter);
+        }
         String prefix = getMetricsPrefixedWith(reporterConf);
         if (prefix != null) {
             builder.prefixedWith(prefix);

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
index 7a2b31b..0f88fc4 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
@@ -23,6 +23,7 @@ import com.codahale.metrics.graphite.GraphiteUDP;
 import com.codahale.metrics.graphite.Graphite;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +31,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public class GraphiteStormReporter extends SheduledStormReporter<GraphiteReporter> {
+public class GraphiteStormReporter extends ScheduledStormReporter<GraphiteReporter> {
     private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);
 
     public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
@@ -53,7 +54,10 @@ public class GraphiteStormReporter extends SheduledStormReporter<GraphiteReporte
             builder.convertRatesTo(rateUnit);
         }
 
-        //TODO: expose some simple MetricFilters 
+        StormMetricsFilter filter = getMetricsFilter(reporterConf);
+        if(filter != null){
+            builder.filter(filter);
+        }
         String prefix = getMetricsPrefixedWith(reporterConf);
         if (prefix != null) {
             builder.prefixedWith(prefix);

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
index 7ac6cde..5b932ea 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
@@ -20,6 +20,7 @@ package org.apache.storm.metrics2.reporters;
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +53,10 @@ public class JmxStormReporter implements StormReporter<JmxReporter> {
             builder.inDomain(domain);
         }
 
-        // TODO: expose some simple MetricFilters
+        StormMetricsFilter filter = ScheduledStormReporter.getMetricsFilter(reporterConf);
+        if(filter != null){
+            builder.filter(filter);
+        }
         // other builder functions not exposed:
         //  * createsObjectNamesWith(ObjectNameFactory onFactory) 
         //  * registerWith (MBeanServer)

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
new file mode 100644
index 0000000..940cb19
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics2.reporters;
+
+import com.codahale.metrics.ScheduledReporter;
+import org.apache.storm.metrics2.Metrics2Utils;
+import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ScheduledStormReporter<T extends ScheduledReporter> implements StormReporter{
+    private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class);
+    protected ScheduledReporter reporter;
+    long reportingPeriod;
+    TimeUnit reportingPeriodUnit;
+
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.debug("Starting...");
+            reporter.start(reportingPeriod, reportingPeriodUnit);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.debug("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+
+
+    static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
+        TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
+        return unit == null ? TimeUnit.SECONDS : unit;
+    }
+
+    private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
+        String rateUnitString = Utils.getString(reporterConf.get(configName), null);
+        if (rateUnitString != null) {
+            return TimeUnit.valueOf(rateUnitString);
+        }
+        return null;
+    }
+
+    static long getReportPeriod(Map reporterConf) {
+        return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
+    }
+
+    static StormMetricsFilter getMetricsFilter(Map reporterConf){
+        StormMetricsFilter filter = null;
+        Map<String, Object> filterConf = (Map)reporterConf.get("filter");
+        String clazz = (String) filterConf.get("class");
+        if(filterConf != null && clazz != null){
+            try {
+                filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz);
+                filter.prepare(filterConf);
+            } catch (Exception e) {
+                LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz);
+            }
+        }
+        return filter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b8de0f36/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
deleted file mode 100644
index 1b1e7a0..0000000
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.metrics2.reporters;
-
-import com.codahale.metrics.ScheduledReporter;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public abstract class SheduledStormReporter<T extends ScheduledReporter> implements StormReporter{
-    private static final Logger LOG = LoggerFactory.getLogger(SheduledStormReporter.class);
-    protected ScheduledReporter reporter;
-    long reportingPeriod;
-    TimeUnit reportingPeriodUnit;
-
-    @Override
-    public void start() {
-        if (reporter != null) {
-            LOG.debug("Starting...");
-            reporter.start(reportingPeriod, reportingPeriodUnit);
-        } else {
-            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter != null) {
-            LOG.debug("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
-        }
-    }
-
-
-    static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
-        TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
-        return unit == null ? TimeUnit.SECONDS : unit;
-    }
-
-    private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
-        String rateUnitString = Utils.getString(reporterConf.get(configName), null);
-        if (rateUnitString != null) {
-            return TimeUnit.valueOf(rateUnitString);
-        }
-        return null;
-    }
-
-    static long getReportPeriod(Map reporterConf) {
-        return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
-    }
-}