You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/02/05 20:19:00 UTC

[06/12] storm git commit: Addressing comments about reporter configs

Addressing comments about reporter configs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3337ce85
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3337ce85
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3337ce85

Branch: refs/heads/master
Commit: 3337ce8533f6613ab2b6d4690f53bbc00564b6f7
Parents: c13388b
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 13:16:26 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Thu Feb 4 13:16:26 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  6 +-
 storm-core/src/jvm/org/apache/storm/Config.java | 24 ++++++
 .../storm/daemon/metrics/MetricsUtils.java      | 83 ++++++++++++++++++++
 .../storm/daemon/metrics/StatisticsUtils.java   | 58 --------------
 .../reporters/ConsolePreparableReporter.java    | 29 ++++---
 .../reporters/CsvPreparableReporter.java        | 32 ++++----
 .../reporters/JmxPreparableReporter.java        | 17 ++--
 .../metrics/reporters/PreparableReporter.java   |  6 +-
 8 files changed, 150 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3337ce85/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index c073260..d0f8dd9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -18,10 +18,10 @@
   (:import [org.apache.storm.generated StormTopology
             InvalidTopologyException GlobalStreamId]
            [org.apache.storm.utils ThriftTopologyUtils]
-           [org.apache.storm.statistics.reporters PreparableReporter]
+           [org.apache.storm.daemon.metrics.reporters PreparableReporter]
            [com.codahale.metrics MetricRegistry])
   (:import [org.apache.storm.utils Utils ConfigUtils])
-  (:import [org.apache.storm.statistics StatisticsUtils])
+  (:import [org.apache.storm.daemon.metrics MetricsUtils])
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
   (:import [org.apache.storm.metric SystemBolt])
@@ -40,7 +40,7 @@
   (log-message "Started statistics report plugin..."))
 
 (defn start-metrics-reporters [conf]
-  (doseq [reporter (StatisticsUtils/getPreparableReporters conf)]
+  (doseq [reporter (MetricsUtils/getPreparableReporters conf)]
     (start-metrics-reporter reporter conf)))
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3337ce85/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 100a824..49306eb 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -146,6 +146,30 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins";
 
     /**
+     * The Locale that daemon metrics reporter plugins use when formatting their output.
+     * The value is an IETF BCP 47 language tag string (for example "en-US").
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.locale";
+
+    /**
+     * The domain handed to the JMX daemon metrics reporter plugin (see JmxPreparableReporter).
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN = "storm.daemon.metrics.reporter.plugin.domain";
+
+    /**
+     * The name of the java.util.concurrent.TimeUnit that daemon metrics reporter plugins convert rates to.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT = "storm.daemon.metrics.reporter.plugin.rate.unit";
+
+    /**
+     * The name of the java.util.concurrent.TimeUnit that daemon metrics reporter plugins convert durations to.
+     */
+    @isString
+    public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit";
+    /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
     @isStringList

http://git-wip-us.apache.org/repos/asf/storm/blob/3337ce85/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
new file mode 100644
index 0000000..4425f59
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helpers that turn the daemon metrics settings of a Storm configuration into
+ * reporter plugins and the locale / time units those plugins should use.
+ */
+public class MetricsUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricsUtils.class);
+
+    /**
+     * Instantiates every reporter plugin listed under
+     * {@link Config#STORM_DAEMON_METRICS_REPORTER_PLUGINS}.
+     *
+     * @param stormConf the Storm configuration to read the plugin class names from
+     * @return the created reporters with null entries of the configured list skipped,
+     *         or a single {@link JmxPreparableReporter} when that leaves none
+     */
+    @SuppressWarnings("unchecked") // raw config Map: the value is only known to be an Object here
+    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
+        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
+        List<PreparableReporter> reporterList = new ArrayList<>();
+
+        if (clazzes != null) {
+            for (String clazz : clazzes) {
+                PreparableReporter reporter = getPreparableReporter(clazz);
+                // A null class name yields no reporter; keep null out of the list so
+                // callers can use every element without a null check.
+                if (reporter != null) {
+                    reporterList.add(reporter);
+                }
+            }
+        }
+        if (reporterList.isEmpty()) {
+            // Nothing usable was configured: fall back to the JMX reporter.
+            reporterList.add(new JmxPreparableReporter());
+        }
+        return reporterList;
+    }
+
+    /**
+     * Creates one reporter plugin via {@code Utils.newInstance}.
+     *
+     * @param clazz class name of the plugin; may be null
+     * @return the new reporter, or null when {@code clazz} is null
+     */
+    private static PreparableReporter getPreparableReporter(String clazz) {
+        if (clazz == null) {
+            LOG.warn("Ignoring null entry in {}", Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
+            return null;
+        }
+        LOG.info("Using statistics reporter plugin:{}", clazz);
+        return (PreparableReporter) Utils.newInstance(clazz);
+    }
+
+    /**
+     * Reads the reporter locale from
+     * {@link Config#STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE}.
+     *
+     * @param stormConf the Storm configuration
+     * @return the locale for the configured IETF BCP 47 language tag, or null when
+     *         no language tag is configured
+     */
+    public static Locale getMetricsReporterLocale(Map stormConf) {
+        String languageTag = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null);
+        return languageTag == null ? null : Locale.forLanguageTag(languageTag);
+    }
+
+    /**
+     * Reads the rate unit from {@link Config#STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT}.
+     *
+     * @param stormConf the Storm configuration
+     * @return the configured unit, or null when none is configured
+     * @throws IllegalArgumentException if the value is not the name of a {@link TimeUnit}
+     */
+    public static TimeUnit getMetricsRateUnit(Map stormConf) {
+        return getTimeUnitForConfig(stormConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT);
+    }
+
+    /**
+     * Reads the duration unit from {@link Config#STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT}.
+     *
+     * @param stormConf the Storm configuration
+     * @return the configured unit, or null when none is configured
+     * @throws IllegalArgumentException if the value is not the name of a {@link TimeUnit}
+     */
+    public static TimeUnit getMetricsDurationUnit(Map stormConf) {
+        return getTimeUnitForConfig(stormConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT);
+    }
+
+    /**
+     * Looks up {@code configName} and converts its value to a {@link TimeUnit}.
+     * The name is trimmed and upper-cased first, so "seconds" is accepted like "SECONDS".
+     *
+     * @param stormConf the Storm configuration
+     * @param configName key of the setting holding the unit name
+     * @return the unit, or null when the setting has no value
+     * @throws IllegalArgumentException if the value does not name a {@link TimeUnit}
+     */
+    private static TimeUnit getTimeUnitForConfig(Map stormConf, String configName) {
+        String unitName = Utils.getString(stormConf.get(configName), null);
+        if (unitName == null) {
+            return null;
+        }
+        try {
+            return TimeUnit.valueOf(unitName.trim().toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            // Re-throw with the offending key so a bad setting is easy to locate.
+            throw new IllegalArgumentException(
+                    "Invalid TimeUnit '" + unitName + "' for config " + configName, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3337ce85/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
deleted file mode 100644
index d28e667..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.Config;
-import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
-import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class StatisticsUtils {
-    private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
-
-    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
-        PreparableReporter reporter = new JmxPreparableReporter();
-        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
-        List<PreparableReporter> reporterList = new ArrayList<>();
-
-        if (clazzes != null) {
-            for(String clazz: clazzes ) {
-                reporterList.add(getPreparableReporter(clazz));
-            }
-        }
-        if(reporterList.isEmpty()) {
-            reporterList.add(new JmxPreparableReporter());
-        }
-        return reporterList;
-    }
-
-    private static PreparableReporter getPreparableReporter(String clazz) {
-        PreparableReporter reporter = null;
-        LOG.info("Using statistics reporter plugin:" + clazz);
-        if(clazz != null) {
-            reporter = (PreparableReporter) Utils.newInstance(clazz);
-        }
-        return reporter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/3337ce85/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
index 1b987a8..2f466ce 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -18,9 +18,8 @@
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,27 +34,27 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
 
     @Override
     public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
+        LOG.debug("Preparing...");
         ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
-        PrintStream stream = (PrintStream)stormConf.get(":stream");
+
+        PrintStream stream = System.out;
         if (stream != null) {
             builder.outputTo(stream);
         }
-        Locale locale = (Locale)stormConf.get(":locale");
+
+        Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
         if (locale != null) {
             builder.formattedFor(locale);
         }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
         if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+            builder.convertRatesTo(rateUnit);
         }
-        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
         if (durationUnit != null) {
-            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
+            builder.convertDurationsTo(durationUnit);
         }
         reporter = builder.build();
     }
@@ -63,7 +62,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
     @Override
     public void start() {
         if (reporter != null ) {
-            LOG.info("Starting...");
+            LOG.debug("Starting...");
             reporter.start(10, TimeUnit.SECONDS);
         } else {
             throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -73,7 +72,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
     @Override
     public void stop() {
         if (reporter !=null) {
-            LOG.info("Stopping...");
+            LOG.debug("Stopping...");
             reporter.stop();
         } else {
             throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());

http://git-wip-us.apache.org/repos/asf/storm/blob/3337ce85/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
index 77d5393..28fd605 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -18,9 +18,9 @@
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,35 +36,35 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
 
     @Override
     public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
+        LOG.debug("Preparing...");
         CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
 
-        Locale locale = (Locale) stormConf.get(":locale");
+        Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
         if (locale != null) {
             builder.formatFor(locale);
         }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
         if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+            builder.convertRatesTo(rateUnit);
         }
-        String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+
+        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
         if (durationUnit != null) {
-            builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
+            builder.convertDurationsTo(durationUnit);
         }
+
         String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
-        File logDir = new File(localStormDirLocation + "csvmetrics" );
-        validateCreateOutputDir(logDir);
-        reporter = builder.build(logDir);
+        File csvMetricsDir = new File(localStormDirLocation + System.getProperty("file.separator") + "csvmetrics" );
+        validateCreateOutputDir(csvMetricsDir);
+
+        reporter = builder.build(csvMetricsDir);
     }
 
     @Override
     public void start() {
         if (reporter != null) {
-            LOG.info("Starting...");
+            LOG.debug("Starting...");
             reporter.start(10, TimeUnit.SECONDS);
         } else {
             throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -74,7 +74,7 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
     @Override
     public void stop() {
         if (reporter != null) {
-            LOG.info("Stopping...");
+            LOG.debug("Stopping...");
             reporter.stop();
         } else {
             throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());

http://git-wip-us.apache.org/repos/asf/storm/blob/3337ce85/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
index 988bb47..eff6e5a 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -18,8 +18,9 @@
 package org.apache.storm.daemon.metrics.reporters;
 
 import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,17 +36,13 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
     public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
         LOG.info("Preparing...");
         JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
-        String domain = Utils.getString(stormConf.get(":domain"), null);
+        String domain = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null);
         if (domain != null) {
             builder.inDomain(domain);
         }
-        String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
         if (rateUnit != null) {
-            builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
-        }
-        MetricFilter filter = (MetricFilter) stormConf.get(":filter");
-        if (filter != null) {
-            builder.filter(filter);
+            builder.convertRatesTo(rateUnit);
         }
         reporter = builder.build();
 
@@ -54,7 +51,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
     @Override
     public void start() {
         if (reporter != null ) {
-            LOG.info("Starting...");
+            LOG.debug("Starting...");
             reporter.start();
         } else {
             throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -64,7 +61,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
     @Override
     public void stop() {
         if (reporter !=null) {
-            LOG.info("Stopping...");
+            LOG.debug("Stopping...");
             reporter.stop();
         } else {
             throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());

http://git-wip-us.apache.org/repos/asf/storm/blob/3337ce85/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
index f19f8b1..2968bfb 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
@@ -25,8 +25,8 @@ import java.util.Map;
 
 
 public interface PreparableReporter<T extends Reporter & Closeable> {
-  public void prepare(MetricRegistry metricsRegistry, Map stormConf);
-  public void start();
-  public void stop();
+  void prepare(MetricRegistry metricsRegistry, Map stormConf);
+  void start();
+  void stop();
 
 }