You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/02/05 21:25:20 UTC

[01/11] storm git commit: Create stats plugin for JMX

Repository: storm
Updated Branches:
  refs/heads/1.x-branch ef7581b19 -> 1582c220a


Create stats plugin for JMX

Add config for stats reporter plugin and use it

Use Regular Map instead of Config in interface

Adding log entries for statistics plugin actions.

Conflicts:
	storm-core/src/clj/org/apache/storm/daemon/common.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/68682331
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/68682331
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/68682331

Branch: refs/heads/1.x-branch
Commit: 68682331a219c6d2ad706729389d6eba2585c32c
Parents: ef7581b
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Wed Feb 3 10:49:56 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:28:08 2016 +0000

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  | 12 +++--
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  2 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |  2 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  2 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  |  2 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  2 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  7 +++
 .../storm/statistics/StatisticsUtils.java       | 26 +++++++++++
 .../reporters/JMXPreparableReporter.java        | 49 ++++++++++++++++++++
 .../reporters/PreparableReporter.java           | 15 ++++++
 10 files changed, 111 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index dd761a5..44a1d43 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -19,6 +19,9 @@
             InvalidTopologyException GlobalStreamId]
            [org.apache.storm.utils ThriftTopologyUtils])
   (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.statistics.reporters PreparableReporter]
+           [com.codahale.metrics MetricRegistry]
+           [org.apache.storm.statistics StatisticsUtils])
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
   (:import [org.apache.storm.metric SystemBolt])
@@ -28,10 +31,13 @@
   (:require [clojure.set :as set])  
   (:require [org.apache.storm.daemon.acker :as acker])
   (:require [org.apache.storm.thrift :as thrift])
-  (:require [metrics.reporters.jmx :as jmx]))
+  (:require [metrics.core  :refer [default-registry]]))
 
-(defn start-metrics-reporters []
-  (jmx/start (jmx/reporter {})))
+(defn start-metrics-reporters [conf]
+  (doto (StatisticsUtils/getPreparableReporter conf)
+    (.prepare default-registry conf)
+    (.start))
+  (log-message "Started statistics report plugin..."))
 
 (def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
 (def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index d6f77c3..c2fadc6 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -265,7 +265,7 @@
                                         https-need-client-auth
                                         https-want-client-auth)
                             (config-filter server app filters-confs))})))
-      (start-metrics-reporters)
+      (start-metrics-reporters conf)
       (when handler-server
         (.serve handler-server)))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 66a1899..1fcb5d5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -1198,4 +1198,4 @@
                  STORM-VERSION
                  "'")
     (start-logviewer! conf log-root daemonlog-root)
-    (start-metrics-reporters)))
+    (start-metrics-reporters conf)))

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 587da65..9376d6e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1450,7 +1450,7 @@
     (defgauge nimbus:num-supervisors
       (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
 
-    (start-metrics-reporters)
+    (start-metrics-reporters conf)
 
     (reify Nimbus$Iface
       (^void submitTopologyWithOpts

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 4bca23e..d27a609 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -1196,7 +1196,7 @@
     (let [supervisor (mk-supervisor conf nil supervisor)]
       (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))
     (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
-    (start-metrics-reporters)))
+    (start-metrics-reporters conf)))
 
 (defn standalone-supervisor []
   (let [conf-atom (atom nil)

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index cbea15a..bd1dba5 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -1260,7 +1260,7 @@
           https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
           https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
           https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
-      (start-metrics-reporters)
+      (start-metrics-reporters conf)
       (storm-run-jetty {:port (conf UI-PORT)
                         :host (conf UI-HOST)
                         :https-port https-port

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index f7f5169..bf50223 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -140,6 +140,13 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
 
     /**
+     * A list of statistics  preparable reporter class names.
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = org.apache.storm.statistics.reporters.PreparableReporter.class)
+    public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN = "storm.statistics.preparable.reporter.plugin";
+
+    /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
     @isStringList

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
new file mode 100644
index 0000000..19f7690
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
@@ -0,0 +1,26 @@
+package org.apache.storm.statistics;
+
+import org.apache.storm.Config;
+import org.apache.storm.statistics.reporters.JMXPreparableReporter;
+import org.apache.storm.statistics.reporters.PreparableReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class StatisticsUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
+
+    public static PreparableReporter getPreparableReporter(Map stormConf) {
+        PreparableReporter reporter = new JMXPreparableReporter();
+        String clazz = (String) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN);
+        LOG.info("Using statistics reporter plugin:" + clazz);
+        if(clazz != null) {
+            reporter = (PreparableReporter) Utils.newInstance(clazz);
+        } else {
+            reporter = new JMXPreparableReporter();
+        }
+        return reporter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
new file mode 100644
index 0000000..5d94ffc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
@@ -0,0 +1,49 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JMXPreparableReporter implements PreparableReporter<JmxReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(JMXPreparableReporter.class);
+
+    JmxReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+        String domain = Utils.getString(stormConf.get(":domain"), null);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Starting...");
+        reporter.start();
+    }
+
+    @Override
+    public void stop() {
+        LOG.info("Stopping...");
+        reporter.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/68682331/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
new file mode 100644
index 0000000..f6e8b2b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
@@ -0,0 +1,15 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.io.Closeable;
+import java.util.Map;
+
+
+public interface PreparableReporter<T extends Reporter & Closeable> {
+  public abstract void prepare(MetricRegistry metricsRegistry, Map stormConf);
+  public abstract void start();
+  public abstract void stop();
+
+}


[07/11] storm git commit: Removing unnecessary null check on STDOUT stream

Posted by ki...@apache.org.
Removing unnecessary null check on STDOUT stream


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/137efb10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/137efb10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/137efb10

Branch: refs/heads/1.x-branch
Commit: 137efb108c2084d1cf371c899d6cd454b9166882
Parents: 257f1d3
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 15:29:17 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:30:06 2016 +0000

----------------------------------------------------------------------
 .../daemon/metrics/reporters/ConsolePreparableReporter.java   | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/137efb10/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
index 3ef4237..1eacb63 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -23,7 +23,6 @@ import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.PrintStream;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -37,11 +36,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
         LOG.debug("Preparing...");
         ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
 
-        PrintStream stream = System.out;
-        if (stream != null) {
-            builder.outputTo(stream);
-        }
-
+        builder.outputTo(System.out);
         Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
         if (locale != null) {
             builder.formattedFor(locale);


[08/11] storm git commit: Addressing comments about reporter configs

Posted by ki...@apache.org.
Addressing comments about reporter configs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6811c9b8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6811c9b8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6811c9b8

Branch: refs/heads/1.x-branch
Commit: 6811c9b82873ca48efbc337e5493c7aba85d5731
Parents: d519815
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 14:19:02 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:30:06 2016 +0000

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/Config.java | 10 ++++-
 .../storm/daemon/metrics/MetricsUtils.java      | 39 ++++++++++++++++----
 .../reporters/ConsolePreparableReporter.java    |  8 ++--
 .../reporters/CsvPreparableReporter.java        | 23 ++----------
 .../reporters/JmxPreparableReporter.java        |  8 ++--
 5 files changed, 52 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6811c9b8/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 49306eb..a456bb2 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -150,7 +150,7 @@ public class Config extends HashMap<String, Object> {
      * Use the specified IETF BCP 47 language tag string for a Locale.
      */
     @isString
-    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.local";
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.locale";
 
     /**
      * A specify domain for daemon metrics reporter plugin to limit reporting to specific domain.
@@ -169,6 +169,14 @@ public class Config extends HashMap<String, Object> {
      */
     @isString
     public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit";
+
+
+    /**
+     * A specify csv reporter directory for CvsPreparableReporter daemon metrics reporter.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR = "storm.daemon.metrics.reporter.csv.log.dir";
+
     /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/6811c9b8/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
index 4425f59..aa5ce28 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +20,12 @@ package org.apache.storm.daemon.metrics;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -38,11 +40,11 @@ public class MetricsUtils {
         List<PreparableReporter> reporterList = new ArrayList<>();
 
         if (clazzes != null) {
-            for(String clazz: clazzes ) {
+            for (String clazz : clazzes) {
                 reporterList.add(getPreparableReporter(clazz));
             }
         }
-        if(reporterList.isEmpty()) {
+        if (reporterList.isEmpty()) {
             reporterList.add(new JmxPreparableReporter());
         }
         return reporterList;
@@ -51,7 +53,7 @@ public class MetricsUtils {
     private static PreparableReporter getPreparableReporter(String clazz) {
         PreparableReporter reporter = null;
         LOG.info("Using statistics reporter plugin:" + clazz);
-        if(clazz != null) {
+        if (clazz != null) {
             reporter = (PreparableReporter) Utils.newInstance(clazz);
         }
         return reporter;
@@ -59,7 +61,7 @@ public class MetricsUtils {
 
     public static Locale getMetricsReporterLocale(Map stormConf) {
         String languageTag = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null);
-        if(languageTag != null) {
+        if (languageTag != null) {
             return Locale.forLanguageTag(languageTag);
         }
         return null;
@@ -75,9 +77,32 @@ public class MetricsUtils {
 
     private static TimeUnit getTimeUnitForCofig(Map stormConf, String configName) {
         String rateUnitString = Utils.getString(stormConf.get(configName), null);
-        if(rateUnitString != null) {
+        if (rateUnitString != null) {
             return TimeUnit.valueOf(rateUnitString);
         }
         return null;
     }
+
+    public static File getCsvLogDir(Map stormConf) {
+        String csvMetricsLogDirectory = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR), null);
+        if (csvMetricsLogDirectory == null) {
+            csvMetricsLogDirectory = ConfigUtils.absoluteHealthCheckDir(stormConf);
+            csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics";
+        }
+        File csvMetricsDir = new File(csvMetricsLogDirectory);
+        validateCreateOutputDir(csvMetricsDir);
+        return csvMetricsDir;
+    }
+
+    private static void validateCreateOutputDir(File dir) {
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+        if (!dir.canWrite()) {
+            throw new IllegalStateException(dir.getName() + " does not have write permissions.");
+        }
+        if (!dir.isDirectory()) {
+            throw new IllegalStateException(dir.getName() + " is not a directory.");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6811c9b8/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
index 2f466ce..3ef4237 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -61,7 +61,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
 
     @Override
     public void start() {
-        if (reporter != null ) {
+        if (reporter != null) {
             LOG.debug("Starting...");
             reporter.start(10, TimeUnit.SECONDS);
         } else {
@@ -71,7 +71,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
 
     @Override
     public void stop() {
-        if (reporter !=null) {
+        if (reporter != null) {
             LOG.debug("Stopping...");
             reporter.stop();
         } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/6811c9b8/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
index 28fd605..605f389 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,9 +19,7 @@ package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.CsvReporter;
 import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.Config;
 import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,10 +52,7 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
             builder.convertDurationsTo(durationUnit);
         }
 
-        String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
-        File csvMetricsDir = new File(localStormDirLocation + System.getProperty("file.separator") + "csvmetrics" );
-        validateCreateOutputDir(csvMetricsDir);
-
+        File csvMetricsDir = MetricsUtils.getCsvLogDir(stormConf);
         reporter = builder.build(csvMetricsDir);
     }
 
@@ -81,17 +76,5 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
         }
     }
 
-
-    private void validateCreateOutputDir(File dir) {
-        if (!dir.exists()) {
-            dir.mkdirs();
-        }
-        if (!dir.canWrite()) {
-            throw new IllegalStateException(dir.getName() + " does not have write permissions.");
-        }
-        if (!dir.isDirectory()) {
-            throw new IllegalStateException(dir.getName() + " is not a directory.");
-        }
-    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6811c9b8/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
index eff6e5a..cf4aa1c 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -50,7 +50,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
 
     @Override
     public void start() {
-        if (reporter != null ) {
+        if (reporter != null) {
             LOG.debug("Starting...");
             reporter.start();
         } else {
@@ -60,7 +60,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
 
     @Override
     public void stop() {
-        if (reporter !=null) {
+        if (reporter != null) {
             LOG.debug("Stopping...");
             reporter.stop();
         } else {


[03/11] storm git commit: Addressing Code review comments

Posted by ki...@apache.org.
Addressing Code review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9368619f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9368619f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9368619f

Branch: refs/heads/1.x-branch
Commit: 9368619fb745da2e370881264f4c8278cb3b5bbf
Parents: e84528b
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Wed Feb 3 16:35:36 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:28:32 2016 +0000

----------------------------------------------------------------------
 conf/defaults.yaml                                             | 2 +-
 storm-core/src/jvm/org/apache/storm/Config.java                | 5 ++---
 .../src/jvm/org/apache/storm/statistics/StatisticsUtils.java   | 2 +-
 .../apache/storm/statistics/reporters/PreparableReporter.java  | 6 +++---
 4 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9368619f/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b468290..5df4a63 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -283,5 +283,5 @@ pacemaker.auth.method: "NONE"
 pacemaker.kerberos.users: []
 
 #default plugin for daemon statistics reporter
-storm.statistics.preparable.reporter.plugin:
+storm.statistics.preparable.reporter.plugins:
      - "org.apache.storm.statistics.reporters.JmxPreparableReporter"

http://git-wip-us.apache.org/repos/asf/storm/blob/9368619f/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 9d18667..adeb4d6 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -140,11 +140,10 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
 
     /**
-     * A list of statistics  preparable reporter class names.
+     * A list of daemon statistics  reporter plugin class names.
      */
-    @NotNull
     @isStringList
-    public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN = "storm.statistics.preparable.reporter.plugin";
+    public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGINS = "storm.statistics.preparable.reporter.plugins";
 
     /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.

http://git-wip-us.apache.org/repos/asf/storm/blob/9368619f/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
index ba7edc4..12d33c4 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
@@ -33,7 +33,7 @@ public class StatisticsUtils {
 
     public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
         PreparableReporter reporter = new JmxPreparableReporter();
-        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN);
+        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGINS);
         List<PreparableReporter> reporterList = new ArrayList<>();
 
         if (clazzes != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9368619f/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
index ce3e8fe..dc29a4a 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
@@ -25,8 +25,8 @@ import java.util.Map;
 
 
 public interface PreparableReporter<T extends Reporter & Closeable> {
-  public abstract void prepare(MetricRegistry metricsRegistry, Map stormConf);
-  public abstract void start();
-  public abstract void stop();
+  public void prepare(MetricRegistry metricsRegistry, Map stormConf);
+  public void start();
+  public void stop();
 
 }


[10/11] storm git commit: Fixing java docs

Posted by ki...@apache.org.
Fixing java docs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b8c8424
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b8c8424
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b8c8424

Branch: refs/heads/1.x-branch
Commit: 2b8c8424ec80b75c37bea46f75b8c44a8c095198
Parents: 137efb1
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 22:12:59 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 20:24:44 2016 +0000

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj   |  4 ++--
 storm-core/src/jvm/org/apache/storm/Config.java  |  2 +-
 .../storm/daemon/metrics/MetricsUtils.java       | 19 ++++++++++++++++---
 3 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2b8c8424/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 3c9eaca..c1e261f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -19,9 +19,9 @@
             InvalidTopologyException GlobalStreamId]
            [org.apache.storm.utils ThriftTopologyUtils])
   (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.statistics.reporters PreparableReporter]
+  (:import [org.apache.storm.daemon.metrics.reporters PreparableReporter]
            [com.codahale.metrics MetricRegistry]
-           [org.apache.storm.statistics StatisticsUtils])
+           [org.apache.storm.daemon.metrics MetricsUtils])
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
   (:import [org.apache.storm.metric SystemBolt])

http://git-wip-us.apache.org/repos/asf/storm/blob/2b8c8424/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index df0e64c..74231a0 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -140,7 +140,7 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
 
     /**
-     * A list of daemon metrics  reporter plugin class names. The classes should implement
+     * A list of daemon metrics reporter plugin class names.
      * These plugins must implement {@link org.apache.storm.daemon.metrics.reporters.PreparableReporter} interface.
      */
     @isStringList

http://git-wip-us.apache.org/repos/asf/storm/blob/2b8c8424/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
index aa5ce28..56b920b 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
@@ -20,7 +20,6 @@ package org.apache.storm.daemon.metrics;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
-import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,8 +85,8 @@ public class MetricsUtils {
     public static File getCsvLogDir(Map stormConf) {
         String csvMetricsLogDirectory = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR), null);
         if (csvMetricsLogDirectory == null) {
-            csvMetricsLogDirectory = ConfigUtils.absoluteHealthCheckDir(stormConf);
-            csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics";
+            csvMetricsLogDirectory = absoluteStormLocalDir(stormConf);
+            csvMetricsLogDirectory = csvMetricsLogDirectory + File.separator + "csvmetrics";
         }
         File csvMetricsDir = new File(csvMetricsLogDirectory);
         validateCreateOutputDir(csvMetricsDir);
@@ -105,4 +104,18 @@ public class MetricsUtils {
             throw new IllegalStateException(dir.getName() + " is not a directory.");
         }
     }
+
+    public static String absoluteStormLocalDir(Map conf) {
+        String stormHome = System.getProperty("storm.home");
+        String localDir = (String) conf.get(Config.STORM_LOCAL_DIR);
+        if (localDir == null) {
+            return (stormHome + File.separator + "storm-local");
+        } else {
+            if (new File(localDir).isAbsolute()) {
+                return localDir;
+            } else {
+                return (stormHome + File.separator + localDir);
+            }
+        }
+    }
 }


[02/11] storm git commit: Renaming the package and config

Posted by ki...@apache.org.
Renaming the package and config


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2207d662
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2207d662
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2207d662

Branch: refs/heads/1.x-branch
Commit: 2207d662b0c45fd84edf0f9a9c64d5651b182aab
Parents: 9368619
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 10:25:29 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:28:32 2016 +0000

----------------------------------------------------------------------
 conf/defaults.yaml                              |  6 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  4 +-
 .../storm/daemon/metrics/StatisticsUtils.java   | 58 ++++++++++++
 .../reporters/ConsolePreparableReporter.java    | 82 +++++++++++++++++
 .../reporters/CsvPreparableReporter.java        | 97 ++++++++++++++++++++
 .../reporters/JmxPreparableReporter.java        | 73 +++++++++++++++
 .../metrics/reporters/PreparableReporter.java   | 32 +++++++
 .../storm/statistics/StatisticsUtils.java       | 58 ------------
 .../reporters/ConsolePreparableReporter.java    | 82 -----------------
 .../reporters/CsvPreparableReporter.java        | 97 --------------------
 .../reporters/JmxPreparableReporter.java        | 73 ---------------
 .../reporters/PreparableReporter.java           | 32 -------
 12 files changed, 347 insertions(+), 347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 5df4a63..d381f0d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -282,6 +282,6 @@ pacemaker.childopts: "-Xmx1024m"
 pacemaker.auth.method: "NONE"
 pacemaker.kerberos.users: []
 
-#default plugin for daemon statistics reporter
-storm.statistics.preparable.reporter.plugins:
-     - "org.apache.storm.statistics.reporters.JmxPreparableReporter"
+#default storm daemon metrics reporter plugins
+storm.daemon.metrics.reporter.plugins:
+     - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index adeb4d6..100a824 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -140,10 +140,10 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
 
     /**
-     * A list of daemon statistics  reporter plugin class names.
+     * A list of daemon metrics  reporter plugin class names.
      */
     @isStringList
-    public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGINS = "storm.statistics.preparable.reporter.plugins";
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins";
 
     /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
new file mode 100644
index 0000000..d28e667
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class StatisticsUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
+
+    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
+        PreparableReporter reporter = new JmxPreparableReporter();
+        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
+        List<PreparableReporter> reporterList = new ArrayList<>();
+
+        if (clazzes != null) {
+            for(String clazz: clazzes ) {
+                reporterList.add(getPreparableReporter(clazz));
+            }
+        }
+        if(reporterList.isEmpty()) {
+            reporterList.add(new JmxPreparableReporter());
+        }
+        return reporterList;
+    }
+
+    private static PreparableReporter getPreparableReporter(String clazz) {
+        PreparableReporter reporter = null;
+        LOG.info("Using statistics reporter plugin:" + clazz);
+        if(clazz != null) {
+            reporter = (PreparableReporter) Utils.newInstance(clazz);
+        }
+        return reporter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
new file mode 100644
index 0000000..1b987a8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
+    ConsoleReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
+        PrintStream stream = (PrintStream)stormConf.get(":stream");
+        if (stream != null) {
+            builder.outputTo(stream);
+        }
+        Locale locale = (Locale)stormConf.get(":locale");
+        if (locale != null) {
+            builder.formattedFor(locale);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null ) {
+            LOG.info("Starting...");
+            reporter.start(10, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter !=null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
new file mode 100644
index 0000000..77d5393
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class);
+    CsvReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+        Locale locale = (Locale) stormConf.get(":locale");
+        if (locale != null) {
+            builder.formatFor(locale);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
+        File logDir = new File(localStormDirLocation + "csvmetrics" );
+        validateCreateOutputDir(logDir);
+        reporter = builder.build(logDir);
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.info("Starting...");
+            reporter.start(10, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+
+
+    private void validateCreateOutputDir(File dir) {
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+        if (!dir.canWrite()) {
+            throw new IllegalStateException(dir.getName() + " does not have write permissions.");
+        }
+        if (!dir.isDirectory()) {
+            throw new IllegalStateException(dir.getName() + " is not a directory.");
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
new file mode 100644
index 0000000..988bb47
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class);
+    JmxReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+        String domain = Utils.getString(stormConf.get(":domain"), null);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null ) {
+            LOG.info("Starting...");
+            reporter.start();
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter !=null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
new file mode 100644
index 0000000..f19f8b1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.io.Closeable;
+import java.util.Map;
+
+
+public interface PreparableReporter<T extends Reporter & Closeable> {
+  public void prepare(MetricRegistry metricsRegistry, Map stormConf);
+  public void start();
+  public void stop();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
deleted file mode 100644
index 12d33c4..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics;
-
-import org.apache.storm.Config;
-import org.apache.storm.statistics.reporters.JmxPreparableReporter;
-import org.apache.storm.statistics.reporters.PreparableReporter;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class StatisticsUtils {
-    private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
-
-    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
-        PreparableReporter reporter = new JmxPreparableReporter();
-        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGINS);
-        List<PreparableReporter> reporterList = new ArrayList<>();
-
-        if (clazzes != null) {
-            for(String clazz: clazzes ) {
-                reporterList.add(getPreparableReporter(clazz));
-            }
-        }
-        if(reporterList.isEmpty()) {
-            reporterList.add(new JmxPreparableReporter());
-        }
-        return reporterList;
-    }
-
-    private static PreparableReporter getPreparableReporter(String clazz) {
-        PreparableReporter reporter = null;
-        LOG.info("Using statistics reporter plugin:" + clazz);
-        if(clazz != null) {
-            reporter = (PreparableReporter) Utils.newInstance(clazz);
-        }
-        return reporter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
deleted file mode 100644
index 35ae83f..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintStream;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> {
-    private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
-    ConsoleReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
-        PrintStream stream = (PrintStream)stormConf.get(":stream");
-        if (stream != null) {
-            builder.outputTo(stream);
-        }
-        Locale locale = (Locale)stormConf.get(":locale");
-        if (locale != null) {
-            builder.formattedFor(locale);
-        }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
-        if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
-        if (durationUnit != null) {
-            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
-        }
-        reporter = builder.build();
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null ) {
-            LOG.info("Starting...");
-            reporter.start(10, TimeUnit.SECONDS);
-        } else {
-            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter !=null) {
-            LOG.info("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
deleted file mode 100644
index 8ed0b3e..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.Config;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
-    private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class);
-    CsvReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
-
-        Locale locale = (Locale) stormConf.get(":locale");
-        if (locale != null) {
-            builder.formatFor(locale);
-        }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
-        if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
-        if (durationUnit != null) {
-            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
-        }
-        String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
-        File logDir = new File(localStormDirLocation + "csvmetrics" );
-        validateCreateOutputDir(logDir);
-        reporter = builder.build(logDir);
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null) {
-            LOG.info("Starting...");
-            reporter.start(10, TimeUnit.SECONDS);
-        } else {
-            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter != null) {
-            LOG.info("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
-        }
-    }
-
-
-    private void validateCreateOutputDir(File dir) {
-        if (!dir.exists()) {
-            dir.mkdirs();
-        }
-        if (!dir.canWrite()) {
-            throw new IllegalStateException(dir.getName() + " does not have write permissions.");
-        }
-        if (!dir.isDirectory()) {
-            throw new IllegalStateException(dir.getName() + " is not a directory.");
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
deleted file mode 100644
index 6b0cbda..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
-    private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class);
-    JmxReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
-        String domain = Utils.getString(stormConf.get(":domain"), null);
-        if (domain != null) {
-            builder.inDomain(domain);
-        }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
-        if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
-        }
-        reporter = builder.build();
-
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null ) {
-            LOG.info("Starting...");
-            reporter.start();
-        } else {
-            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter !=null) {
-            LOG.info("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
deleted file mode 100644
index dc29a4a..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-
-import java.io.Closeable;
-import java.util.Map;
-
-
-public interface PreparableReporter<T extends Reporter & Closeable> {
-  public void prepare(MetricRegistry metricsRegistry, Map stormConf);
-  public void start();
-  public void stop();
-
-}


[09/11] storm git commit: Modify config variable documentation

Posted by ki...@apache.org.
Modify config variable documentation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/257f1d35
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/257f1d35
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/257f1d35

Branch: refs/heads/1.x-branch
Commit: 257f1d355367a3ea24e099f65a3b11c3907e037a
Parents: 6811c9b
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 14:47:32 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:30:06 2016 +0000

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/Config.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/257f1d35/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index a456bb2..df0e64c 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -140,7 +140,8 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
 
     /**
-     * A list of daemon metrics  reporter plugin class names.
+     * A list of daemon metrics reporter plugin class names.
+     * These plugins must implement the {@link org.apache.storm.daemon.metrics.reporters.PreparableReporter} interface.
      */
     @isStringList
     public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins";


[06/11] storm git commit: Addressing comments about reporter configs

Posted by ki...@apache.org.
Addressing comments about reporter configs

Conflicts:
	storm-core/src/clj/org/apache/storm/daemon/common.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d5198155
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d5198155
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d5198155

Branch: refs/heads/1.x-branch
Commit: d51981559e015c5b3b26e05eb07988c3a34bb737
Parents: 2207d66
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 13:16:26 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:29:52 2016 +0000

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  2 +-
 storm-core/src/jvm/org/apache/storm/Config.java | 24 ++++++
 .../storm/daemon/metrics/MetricsUtils.java      | 83 ++++++++++++++++++++
 .../storm/daemon/metrics/StatisticsUtils.java   | 58 --------------
 .../reporters/ConsolePreparableReporter.java    | 29 ++++---
 .../reporters/CsvPreparableReporter.java        | 32 ++++----
 .../reporters/JmxPreparableReporter.java        | 17 ++--
 .../metrics/reporters/PreparableReporter.java   |  6 +-
 8 files changed, 148 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 6b7d539..3c9eaca 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -40,7 +40,7 @@
   (log-message "Started statistics report plugin..."))
 
 (defn start-metrics-reporters [conf]
-  (doseq [reporter (StatisticsUtils/getPreparableReporters conf)]
+  (doseq [reporter (MetricsUtils/getPreparableReporters conf)]
     (start-metrics-reporter reporter conf)))
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 100a824..49306eb 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -146,6 +146,30 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins";
 
     /**
+     * Specifies the Locale for the daemon metrics reporter plugin,
+     * given as an IETF BCP 47 language tag string.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.local";
+
+    /**
+     * Specifies a domain for the daemon metrics reporter plugin, limiting reporting to that domain.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN = "storm.daemon.metrics.reporter.plugin.domain";
+
+    /**
+     * Specifies the rate unit, as a TimeUnit name (e.g. SECONDS), in which the daemon metrics reporter plugin reports rates.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT = "storm.daemon.metrics.reporter.plugin.rate.unit";
+
+    /**
+     * Specifies the duration unit, as a TimeUnit name (e.g. MILLISECONDS), in which the daemon metrics reporter plugin reports durations.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit";
+    /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
     @isStringList

http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
new file mode 100644
index 0000000..4425f59
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class MetricsUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(MetricsUtils.class);
+
+    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
+        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
+        List<PreparableReporter> reporterList = new ArrayList<>();
+
+        if (clazzes != null) {
+            for(String clazz: clazzes ) {
+                reporterList.add(getPreparableReporter(clazz));
+            }
+        }
+        if(reporterList.isEmpty()) {
+            reporterList.add(new JmxPreparableReporter());
+        }
+        return reporterList;
+    }
+
+    private static PreparableReporter getPreparableReporter(String clazz) {
+        PreparableReporter reporter = null;
+        LOG.info("Using statistics reporter plugin:" + clazz);
+        if(clazz != null) {
+            reporter = (PreparableReporter) Utils.newInstance(clazz);
+        }
+        return reporter;
+    }
+
+    public static Locale getMetricsReporterLocale(Map stormConf) {
+        String languageTag = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null);
+        if(languageTag != null) {
+            return Locale.forLanguageTag(languageTag);
+        }
+        return null;
+    }
+
+    public static TimeUnit getMetricsRateUnit(Map stormConf) {
+        return getTimeUnitForCofig(stormConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT);
+    }
+
+    public static TimeUnit getMetricsDurationUnit(Map stormConf) {
+        return getTimeUnitForCofig(stormConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT);
+    }
+
+    private static TimeUnit getTimeUnitForCofig(Map stormConf, String configName) {
+        String rateUnitString = Utils.getString(stormConf.get(configName), null);
+        if(rateUnitString != null) {
+            return TimeUnit.valueOf(rateUnitString);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
deleted file mode 100644
index d28e667..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.Config;
-import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
-import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class StatisticsUtils {
-    private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
-
-    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
-        PreparableReporter reporter = new JmxPreparableReporter();
-        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
-        List<PreparableReporter> reporterList = new ArrayList<>();
-
-        if (clazzes != null) {
-            for(String clazz: clazzes ) {
-                reporterList.add(getPreparableReporter(clazz));
-            }
-        }
-        if(reporterList.isEmpty()) {
-            reporterList.add(new JmxPreparableReporter());
-        }
-        return reporterList;
-    }
-
-    private static PreparableReporter getPreparableReporter(String clazz) {
-        PreparableReporter reporter = null;
-        LOG.info("Using statistics reporter plugin:" + clazz);
-        if(clazz != null) {
-            reporter = (PreparableReporter) Utils.newInstance(clazz);
-        }
-        return reporter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
index 1b987a8..2f466ce 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -18,9 +18,8 @@
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,27 +34,27 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
 
     @Override
     public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
+        LOG.debug("Preparing...");
         ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
-        PrintStream stream = (PrintStream)stormConf.get(":stream");
+
+        PrintStream stream = System.out;
         if (stream != null) {
             builder.outputTo(stream);
         }
-        Locale locale = (Locale)stormConf.get(":locale");
+
+        Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
         if (locale != null) {
             builder.formattedFor(locale);
         }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
         if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+            builder.convertRatesTo(rateUnit);
         }
-        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
         if (durationUnit != null) {
-            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
+            builder.convertDurationsTo(durationUnit);
         }
         reporter = builder.build();
     }
@@ -63,7 +62,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
     @Override
     public void start() {
         if (reporter != null ) {
-            LOG.info("Starting...");
+            LOG.debug("Starting...");
             reporter.start(10, TimeUnit.SECONDS);
         } else {
             throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -73,7 +72,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
     @Override
     public void stop() {
         if (reporter !=null) {
-            LOG.info("Stopping...");
+            LOG.debug("Stopping...");
             reporter.stop();
         } else {
             throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());

http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
index 77d5393..28fd605 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -18,9 +18,9 @@
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,35 +36,35 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
 
     @Override
     public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
+        LOG.debug("Preparing...");
         CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
 
-        Locale locale = (Locale) stormConf.get(":locale");
+        Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
         if (locale != null) {
             builder.formatFor(locale);
         }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
         if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+            builder.convertRatesTo(rateUnit);
         }
-        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
         if (durationUnit != null) {
-            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
+            builder.convertDurationsTo(durationUnit);
         }
+
         String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
-        File logDir = new File(localStormDirLocation + "csvmetrics" );
-        validateCreateOutputDir(logDir);
-        reporter = builder.build(logDir);
+        File csvMetricsDir = new File(localStormDirLocation + System.getProperty("file.separator") + "csvmetrics" );
+        validateCreateOutputDir(csvMetricsDir);
+
+        reporter = builder.build(csvMetricsDir);
     }
 
     @Override
     public void start() {
         if (reporter != null) {
-            LOG.info("Starting...");
+            LOG.debug("Starting...");
             reporter.start(10, TimeUnit.SECONDS);
         } else {
             throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -74,7 +74,7 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
     @Override
     public void stop() {
         if (reporter != null) {
-            LOG.info("Stopping...");
+            LOG.debug("Stopping...");
             reporter.stop();
         } else {
             throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());

http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
index 988bb47..eff6e5a 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -18,8 +18,9 @@
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,17 +36,13 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
     public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
         LOG.info("Preparing...");
         JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
-        String domain = Utils.getString(stormConf.get(":domain"), null);
+        String domain = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null);
         if (domain != null) {
             builder.inDomain(domain);
         }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
         if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
+            builder.convertRatesTo(rateUnit);
         }
         reporter = builder.build();
 
@@ -54,7 +51,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
     @Override
     public void start() {
         if (reporter != null ) {
-            LOG.info("Starting...");
+            LOG.debug("Starting...");
             reporter.start();
         } else {
             throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -64,7 +61,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
     @Override
     public void stop() {
         if (reporter !=null) {
-            LOG.info("Stopping...");
+            LOG.debug("Stopping...");
             reporter.stop();
         } else {
             throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());

http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
index f19f8b1..2968bfb 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
@@ -25,8 +25,8 @@ import java.util.Map;
 
 
 public interface PreparableReporter<T extends Reporter & Closeable> {
-  public void prepare(MetricRegistry metricsRegistry, Map stormConf);
-  public void start();
-  public void stop();
+  void prepare(MetricRegistry metricsRegistry, Map stormConf);
+  void start();
+  void stop();
 
 }


[11/11] storm git commit: add STORM-1524 to CHANGELOG.md

Posted by ki...@apache.org.
add STORM-1524 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1582c220
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1582c220
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1582c220

Branch: refs/heads/1.x-branch
Commit: 1582c220a92170590a58450457ac5007c757a396
Parents: 2b8c842
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Feb 5 19:33:05 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 20:24:46 2016 +0000

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1582c220/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 56529ce..2aa6882 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1524: Add Pluggable daemon metrics Reporters
  * STORM-1517: Add peek api in trident stream
  * STORM-1455: kafka spout should not reset to the beginning of partition when offsetoutofrange exception occurs
  * STORM-1518: Backport of STORM-1504


[04/11] storm git commit: Adding Csv and Console statistics reporter plugins

Posted by ki...@apache.org.
Adding Csv and Console statistics reporter plugins

Make statistics reporter plugins a list.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea6cccfc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea6cccfc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea6cccfc

Branch: refs/heads/1.x-branch
Commit: ea6cccfcab257fe6854867c2b54029378953080a
Parents: 6868233
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Wed Feb 3 13:47:09 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:28:32 2016 +0000

----------------------------------------------------------------------
 conf/defaults.yaml                              |  4 +
 .../src/clj/org/apache/storm/daemon/common.clj  |  9 ++-
 storm-core/src/jvm/org/apache/storm/Config.java |  2 +-
 .../storm/statistics/StatisticsUtils.java       | 27 +++++--
 .../reporters/ConsolePreparableReporter.java    | 65 ++++++++++++++++
 .../reporters/CsvPreparableReporter.java        | 80 ++++++++++++++++++++
 .../reporters/JMXPreparableReporter.java        | 49 ------------
 .../reporters/JmxPreparableReporter.java        | 56 ++++++++++++++
 8 files changed, 234 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8873d12..b468290 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -281,3 +281,7 @@ pacemaker.thread.timeout: 10
 pacemaker.childopts: "-Xmx1024m"
 pacemaker.auth.method: "NONE"
 pacemaker.kerberos.users: []
+
+#default plugin for daemon statistics reporter
+storm.statistics.preparable.reporter.plugin:
+     - "org.apache.storm.statistics.reporters.JmxPreparableReporter"

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 44a1d43..6b7d539 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -33,12 +33,17 @@
   (:require [org.apache.storm.thrift :as thrift])
   (:require [metrics.core  :refer [default-registry]]))
 
-(defn start-metrics-reporters [conf]
-  (doto (StatisticsUtils/getPreparableReporter conf)
+(defn start-metrics-reporter [reporter conf]
+  (doto reporter
     (.prepare default-registry conf)
     (.start))
   (log-message "Started statistics report plugin..."))
 
+(defn start-metrics-reporters [conf]
+  (doseq [reporter (StatisticsUtils/getPreparableReporters conf)]
+    (start-metrics-reporter reporter conf)))
+
+
 (def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
 (def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
 (def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index bf50223..9d18667 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -143,7 +143,7 @@ public class Config extends HashMap<String, Object> {
      * A list of statistics  preparable reporter class names.
      */
     @NotNull
-    @isImplementationOfClass(implementsClass = org.apache.storm.statistics.reporters.PreparableReporter.class)
+    @isStringList
     public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN = "storm.statistics.preparable.reporter.plugin";
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
index 19f7690..666e44d 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
@@ -1,25 +1,40 @@
 package org.apache.storm.statistics;
 
 import org.apache.storm.Config;
-import org.apache.storm.statistics.reporters.JMXPreparableReporter;
+import org.apache.storm.statistics.reporters.JmxPreparableReporter;
 import org.apache.storm.statistics.reporters.PreparableReporter;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 public class StatisticsUtils {
     private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
 
-    public static PreparableReporter getPreparableReporter(Map stormConf) {
-        PreparableReporter reporter = new JMXPreparableReporter();
-        String clazz = (String) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN);
+    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
+        PreparableReporter reporter = new JmxPreparableReporter();
+        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN);
+        List<PreparableReporter> reporterList = new ArrayList<>();
+
+        if (clazzes != null) {
+            for(String clazz: clazzes ) {
+                reporterList.add(getPreparableReporter(clazz));
+            }
+        }
+        if(reporterList.isEmpty()) {
+            reporterList.add(new JmxPreparableReporter());
+        }
+        return reporterList;
+    }
+
+    private static PreparableReporter getPreparableReporter(String clazz) {
+        PreparableReporter reporter = null;
         LOG.info("Using statistics reporter plugin:" + clazz);
         if(clazz != null) {
             reporter = (PreparableReporter) Utils.newInstance(clazz);
-        } else {
-            reporter = new JMXPreparableReporter();
         }
         return reporter;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
new file mode 100644
index 0000000..f545b5b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
@@ -0,0 +1,65 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
+    ConsoleReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
+        PrintStream stream = (PrintStream)stormConf.get(":stream");
+        if (stream != null) {
+            builder.outputTo(stream);
+        }
+        Locale locale = (Locale)stormConf.get(":locale");
+        if (locale != null) {
+            builder.formattedFor(locale);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null ) {
+            LOG.info("Starting...");
+            reporter.start(10, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter !=null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
new file mode 100644
index 0000000..610df33
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
@@ -0,0 +1,80 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class);
+    CsvReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+        Locale locale = (Locale) stormConf.get(":locale");
+        if (locale != null) {
+            builder.formatFor(locale);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+        if (durationUnit != null) {
+            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
+        File logDir = new File(localStormDirLocation + "csvmetrics" );
+        validateCreateOutputDir(logDir);
+        reporter = builder.build(logDir);
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null) {
+            LOG.info("Starting...");
+            reporter.start(10, TimeUnit.SECONDS);
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter != null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+
+
+    private void validateCreateOutputDir(File dir) {
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+        if (!dir.canWrite()) {
+            throw new IllegalStateException(dir.getName() + " does not have write permissions.");
+        }
+        if (!dir.isDirectory()) {
+            throw new IllegalStateException(dir.getName() + " is not a directory.");
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
deleted file mode 100644
index 5d94ffc..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class JMXPreparableReporter implements PreparableReporter<JmxReporter> {
-    private final static Logger LOG = LoggerFactory.getLogger(JMXPreparableReporter.class);
-
-    JmxReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
-        String domain = Utils.getString(stormConf.get(":domain"), null);
-        if (domain != null) {
-            builder.inDomain(domain);
-        }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
-        if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
-        }
-        reporter = builder.build();
-
-    }
-
-    @Override
-    public void start() {
-        LOG.info("Starting...");
-        reporter.start();
-    }
-
-    @Override
-    public void stop() {
-        LOG.info("Stopping...");
-        reporter.stop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
new file mode 100644
index 0000000..ba59611
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
@@ -0,0 +1,56 @@
+package org.apache.storm.statistics.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
+    private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class);
+    JmxReporter reporter = null;
+
+    @Override
+    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+        LOG.info("Preparing...");
+        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+        String domain = Utils.getString(stormConf.get(":domain"), null);
+        if (domain != null) {
+            builder.inDomain(domain);
+        }
+        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        if (rateUnit != null) {
+            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+        }
+        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+        if (filter != null) {
+            builder.filter(filter);
+        }
+        reporter = builder.build();
+
+    }
+
+    @Override
+    public void start() {
+        if (reporter != null ) {
+            LOG.info("Starting...");
+            reporter.start();
+        } else {
+            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (reporter !=null) {
+            LOG.info("Stopping...");
+            reporter.stop();
+        } else {
+            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+        }
+    }
+}


[05/11] storm git commit: Adding Apache license header to new files.

Posted by ki...@apache.org.
Adding Apache license header to new files.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e84528bb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e84528bb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e84528bb

Branch: refs/heads/1.x-branch
Commit: e84528bb10196e6441d8784d5b8125b690027129
Parents: ea6cccf
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Wed Feb 3 16:21:06 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:28:32 2016 +0000

----------------------------------------------------------------------
 .../apache/storm/statistics/StatisticsUtils.java   | 17 +++++++++++++++++
 .../reporters/ConsolePreparableReporter.java       | 17 +++++++++++++++++
 .../reporters/CsvPreparableReporter.java           | 17 +++++++++++++++++
 .../reporters/JmxPreparableReporter.java           | 17 +++++++++++++++++
 .../statistics/reporters/PreparableReporter.java   | 17 +++++++++++++++++
 5 files changed, 85 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e84528bb/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
index 666e44d..ba7edc4 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.statistics;
 
 import org.apache.storm.Config;

http://git-wip-us.apache.org/repos/asf/storm/blob/e84528bb/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
index f545b5b..35ae83f 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.statistics.reporters;
 
 import com.codahale.metrics.ConsoleReporter;

http://git-wip-us.apache.org/repos/asf/storm/blob/e84528bb/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
index 610df33..8ed0b3e 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.statistics.reporters;
 
 import com.codahale.metrics.CsvReporter;

http://git-wip-us.apache.org/repos/asf/storm/blob/e84528bb/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
index ba59611..6b0cbda 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.statistics.reporters;
 
 import com.codahale.metrics.JmxReporter;

http://git-wip-us.apache.org/repos/asf/storm/blob/e84528bb/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
index f6e8b2b..ce3e8fe 100644
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.statistics.reporters;
 
 import com.codahale.metrics.MetricRegistry;