You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/02/05 21:25:21 UTC

[02/11] storm git commit: Renaming the package and config

Renaming the package and config


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2207d662
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2207d662
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2207d662

Branch: refs/heads/1.x-branch
Commit: 2207d662b0c45fd84edf0f9a9c64d5651b182aab
Parents: 9368619
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 10:25:29 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:28:32 2016 +0000

----------------------------------------------------------------------
 conf/defaults.yaml                              |  6 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  4 +-
 .../storm/daemon/metrics/StatisticsUtils.java   | 58 ++++++++++++
 .../reporters/ConsolePreparableReporter.java    | 82 +++++++++++++++++
 .../reporters/CsvPreparableReporter.java        | 97 ++++++++++++++++++++
 .../reporters/JmxPreparableReporter.java        | 73 +++++++++++++++
 .../metrics/reporters/PreparableReporter.java   | 32 +++++++
 .../storm/statistics/StatisticsUtils.java       | 58 ------------
 .../reporters/ConsolePreparableReporter.java    | 82 -----------------
 .../reporters/CsvPreparableReporter.java        | 97 --------------------
 .../reporters/JmxPreparableReporter.java        | 73 ---------------
 .../reporters/PreparableReporter.java           | 32 -------
 12 files changed, 347 insertions(+), 347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 5df4a63..d381f0d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -282,6 +282,6 @@ pacemaker.childopts: "-Xmx1024m"
 pacemaker.auth.method: "NONE"
 pacemaker.kerberos.users: []
 
-#default plugin for daemon statistics reporter
-storm.statistics.preparable.reporter.plugins:
-     - "org.apache.storm.statistics.reporters.JmxPreparableReporter"
+#default storm daemon metrics reporter plugins
+storm.daemon.metrics.reporter.plugins:
+     - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index adeb4d6..100a824 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -140,10 +140,10 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
 
     /**
-     * A list of daemon statistics  reporter plugin class names.
+     * A list of daemon metrics  reporter plugin class names.
      */
     @isStringList
-    public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGINS = "storm.statistics.preparable.reporter.plugins";
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins";
 
     /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
new file mode 100644
index 0000000..d28e667
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class StatisticsUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
+
+    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
+        PreparableReporter reporter = new JmxPreparableReporter();
+        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
+        List<PreparableReporter> reporterList = new ArrayList<>();
+
+        if (clazzes != null) {
+            for(String clazz: clazzes ) {
+                reporterList.add(getPreparableReporter(clazz));
+            }
+        }
+        if(reporterList.isEmpty()) {
+            reporterList.add(new JmxPreparableReporter());
+        }
+        return reporterList;
+    }
+
+    private static PreparableReporter getPreparableReporter(String clazz) {
+        PreparableReporter reporter = null;
+        LOG.info("Using statistics reporter plugin:" + clazz);
+        if(clazz != null) {
+            reporter = (PreparableReporter) Utils.newInstance(clazz);
+        }
+        return reporter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
new file mode 100644
index 0000000..1b987a8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
+    ConsoleReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
+        PrintStream stream = (PrintStream)stormConf.get(":stream");
+        if (stream != null) {
+            builder.outputTo(stream);
+        }
+        Locale locale = (Locale)stormConf.get(":locale");
+        if (locale != null) {
+            builder.formattedFor(locale);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null ) {
+            LOG.info("Starting...");
+            reporter.start(10, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter !=null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
new file mode 100644
index 0000000..77d5393
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class);
+    CsvReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+        Locale locale = (Locale) stormConf.get(":locale");
+        if (locale != null) {
+            builder.formatFor(locale);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
+        File logDir = new File(localStormDirLocation + "csvmetrics" );
+        validateCreateOutputDir(logDir);
+        reporter = builder.build(logDir);
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.info("Starting...");
+            reporter.start(10, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+
+
+    private void validateCreateOutputDir(File dir) {
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+        if (!dir.canWrite()) {
+            throw new IllegalStateException(dir.getName() + " does not have write permissions.");
+        }
+        if (!dir.isDirectory()) {
+            throw new IllegalStateException(dir.getName() + " is not a directory.");
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
new file mode 100644
index 0000000..988bb47
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class);
+    JmxReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+        String domain = Utils.getString(stormConf.get(":domain"), null);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null ) {
+            LOG.info("Starting...");
+            reporter.start();
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter !=null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
new file mode 100644
index 0000000..f19f8b1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.io.Closeable;
+import java.util.Map;
+
+
+public interface PreparableReporter<T extends Reporter & Closeable> {
+  public void prepare(MetricRegistry metricsRegistry, Map stormConf);
+  public void start();
+  public void stop();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
deleted file mode 100644
index 12d33c4..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics;
-
-import org.apache.storm.Config;
-import org.apache.storm.statistics.reporters.JmxPreparableReporter;
-import org.apache.storm.statistics.reporters.PreparableReporter;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class StatisticsUtils {
-    private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
-
-    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
-        PreparableReporter reporter = new JmxPreparableReporter();
-        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGINS);
-        List<PreparableReporter> reporterList = new ArrayList<>();
-
-        if (clazzes != null) {
-            for(String clazz: clazzes ) {
-                reporterList.add(getPreparableReporter(clazz));
-            }
-        }
-        if(reporterList.isEmpty()) {
-            reporterList.add(new JmxPreparableReporter());
-        }
-        return reporterList;
-    }
-
-    private static PreparableReporter getPreparableReporter(String clazz) {
-        PreparableReporter reporter = null;
-        LOG.info("Using statistics reporter plugin:" + clazz);
-        if(clazz != null) {
-            reporter = (PreparableReporter) Utils.newInstance(clazz);
-        }
-        return reporter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
deleted file mode 100644
index 35ae83f..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintStream;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> {
-    private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
-    ConsoleReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
-        PrintStream stream = (PrintStream)stormConf.get(":stream");
-        if (stream != null) {
-            builder.outputTo(stream);
-        }
-        Locale locale = (Locale)stormConf.get(":locale");
-        if (locale != null) {
-            builder.formattedFor(locale);
-        }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
-        if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
-        if (durationUnit != null) {
-            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
-        }
-        reporter = builder.build();
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null ) {
-            LOG.info("Starting...");
-            reporter.start(10, TimeUnit.SECONDS);
-        } else {
-            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter !=null) {
-            LOG.info("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
deleted file mode 100644
index 8ed0b3e..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.Config;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
-    private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class);
-    CsvReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
-
-        Locale locale = (Locale) stormConf.get(":locale");
-        if (locale != null) {
-            builder.formatFor(locale);
-        }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
-        if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
-        if (durationUnit != null) {
-            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
-        }
-        String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
-        File logDir = new File(localStormDirLocation + "csvmetrics" );
-        validateCreateOutputDir(logDir);
-        reporter = builder.build(logDir);
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null) {
-            LOG.info("Starting...");
-            reporter.start(10, TimeUnit.SECONDS);
-        } else {
-            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter != null) {
-            LOG.info("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
-        }
-    }
-
-
-    private void validateCreateOutputDir(File dir) {
-        if (!dir.exists()) {
-            dir.mkdirs();
-        }
-        if (!dir.canWrite()) {
-            throw new IllegalStateException(dir.getName() + " does not have write permissions.");
-        }
-        if (!dir.isDirectory()) {
-            throw new IllegalStateException(dir.getName() + " is not a directory.");
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
deleted file mode 100644
index 6b0cbda..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
-    private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class);
-    JmxReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
-        String domain = Utils.getString(stormConf.get(":domain"), null);
-        if (domain != null) {
-            builder.inDomain(domain);
-        }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
-        if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
-        }
-        reporter = builder.build();
-
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null ) {
-            LOG.info("Starting...");
-            reporter.start();
-        } else {
-            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter !=null) {
-            LOG.info("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
deleted file mode 100644
index dc29a4a..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-
-import java.io.Closeable;
-import java.util.Map;
-
-
-public interface PreparableReporter<T extends Reporter & Closeable> {
-  public void prepare(MetricRegistry metricsRegistry, Map stormConf);
-  public void start();
-  public void stop();
-
-}