You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/02/05 21:25:25 UTC
[06/11] storm git commit: Addressing comments about reporter configs
Addressing comments about reporter configs
Conflicts:
storm-core/src/clj/org/apache/storm/daemon/common.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d5198155
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d5198155
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d5198155
Branch: refs/heads/1.x-branch
Commit: d51981559e015c5b3b26e05eb07988c3a34bb737
Parents: 2207d66
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 13:16:26 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:29:52 2016 +0000
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 2 +-
storm-core/src/jvm/org/apache/storm/Config.java | 24 ++++++
.../storm/daemon/metrics/MetricsUtils.java | 83 ++++++++++++++++++++
.../storm/daemon/metrics/StatisticsUtils.java | 58 --------------
.../reporters/ConsolePreparableReporter.java | 29 ++++---
.../reporters/CsvPreparableReporter.java | 32 ++++----
.../reporters/JmxPreparableReporter.java | 17 ++--
.../metrics/reporters/PreparableReporter.java | 6 +-
8 files changed, 148 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 6b7d539..3c9eaca 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -40,7 +40,7 @@
(log-message "Started statistics report plugin..."))
(defn start-metrics-reporters [conf]
- (doseq [reporter (StatisticsUtils/getPreparableReporters conf)]
+ (doseq [reporter (MetricsUtils/getPreparableReporters conf)]
(start-metrics-reporter reporter conf)))
http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 100a824..49306eb 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -146,6 +146,30 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins";
/**
+ * Specifies the Locale used by the daemon metrics reporter plugin.
+ * Use the specified IETF BCP 47 language tag string for a Locale.
+ */
+ @isString
+ public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.local";
+
+ /**
+ * Specifies a domain for the daemon metrics reporter plugin, limiting reporting to that domain.
+ */
+ @isString
+ public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN = "storm.daemon.metrics.reporter.plugin.domain";
+
+ /**
+ * Specifies the TimeUnit name to which the daemon metrics reporter plugin converts reported rates.
+ */
+ @isString
+ public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT = "storm.daemon.metrics.reporter.plugin.rate.unit";
+
+ /**
+ * Specifies the TimeUnit name to which the daemon metrics reporter plugin converts reported durations.
+ */
+ @isString
+ public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit";
+ /**
* A list of hosts of ZooKeeper servers used to manage the cluster.
*/
@isStringList
http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
new file mode 100644
index 0000000..4425f59
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers for creating daemon metrics reporters and for reading the
+ * reporter settings (locale, rate unit, duration unit) from the Storm configuration map.
+ */
+public class MetricsUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(MetricsUtils.class);
+
+    /**
+     * Instantiates the reporter plugins listed under
+     * {@link Config#STORM_DAEMON_METRICS_REPORTER_PLUGINS}.
+     *
+     * @param stormConf the Storm configuration map
+     * @return the reporters that were created; when none were created the list
+     *         holds a single {@link JmxPreparableReporter}. The list never contains null.
+     */
+    public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
+        List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
+        List<PreparableReporter> reporterList = new ArrayList<>();
+
+        if (clazzes != null) {
+            for (String clazz : clazzes) {
+                PreparableReporter reporter = getPreparableReporter(clazz);
+                // A null class name produces no reporter; skip it so callers that
+                // iterate the result never dereference a null element.
+                if (reporter != null) {
+                    reporterList.add(reporter);
+                }
+            }
+        }
+        if (reporterList.isEmpty()) {
+            reporterList.add(new JmxPreparableReporter());
+        }
+        return reporterList;
+    }
+
+    /**
+     * Creates one reporter from its class name.
+     *
+     * @param clazz the reporter class name, possibly null
+     * @return the new reporter, or null when {@code clazz} is null
+     */
+    private static PreparableReporter getPreparableReporter(String clazz) {
+        if (clazz == null) {
+            LOG.warn("Ignoring null entry in {}", Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
+            return null;
+        }
+        LOG.info("Using statistics reporter plugin:{}", clazz);
+        return (PreparableReporter) Utils.newInstance(clazz);
+    }
+
+    /**
+     * Reads {@link Config#STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE} as a language tag.
+     *
+     * @param stormConf the Storm configuration map
+     * @return the Locale for the configured language tag, or null when the setting is absent
+     */
+    public static Locale getMetricsReporterLocale(Map stormConf) {
+        String languageTag = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null);
+        if (languageTag != null) {
+            return Locale.forLanguageTag(languageTag);
+        }
+        return null;
+    }
+
+    /**
+     * Reads {@link Config#STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT}.
+     *
+     * @param stormConf the Storm configuration map
+     * @return the configured rate unit, or null when the setting is absent
+     * @throws IllegalArgumentException if the value does not name a {@link TimeUnit}
+     */
+    public static TimeUnit getMetricsRateUnit(Map stormConf) {
+        return getTimeUnitForConfig(stormConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT);
+    }
+
+    /**
+     * Reads {@link Config#STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT}.
+     *
+     * @param stormConf the Storm configuration map
+     * @return the configured duration unit, or null when the setting is absent
+     * @throws IllegalArgumentException if the value does not name a {@link TimeUnit}
+     */
+    public static TimeUnit getMetricsDurationUnit(Map stormConf) {
+        return getTimeUnitForConfig(stormConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT);
+    }
+
+    /**
+     * Looks up {@code configName} in the configuration and parses it as a {@link TimeUnit} name.
+     *
+     * @param stormConf  the Storm configuration map
+     * @param configName the configuration key to read
+     * @return the parsed unit, or null when the setting is absent
+     * @throws IllegalArgumentException if the value does not name a {@link TimeUnit}
+     */
+    private static TimeUnit getTimeUnitForConfig(Map stormConf, String configName) {
+        String unitName = Utils.getString(stormConf.get(configName), null);
+        if (unitName == null) {
+            return null;
+        }
+        try {
+            // TimeUnit.valueOf matches the constant name exactly, so normalize case and
+            // surrounding whitespace to accept values such as "seconds".
+            return TimeUnit.valueOf(unitName.trim().toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                    "Invalid TimeUnit '" + unitName + "' for config " + configName, e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
deleted file mode 100644
index d28e667..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.Config;
-import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
-import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class StatisticsUtils {
- private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
-
- public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
- PreparableReporter reporter = new JmxPreparableReporter();
- List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
- List<PreparableReporter> reporterList = new ArrayList<>();
-
- if (clazzes != null) {
- for(String clazz: clazzes ) {
- reporterList.add(getPreparableReporter(clazz));
- }
- }
- if(reporterList.isEmpty()) {
- reporterList.add(new JmxPreparableReporter());
- }
- return reporterList;
- }
-
- private static PreparableReporter getPreparableReporter(String clazz) {
- PreparableReporter reporter = null;
- LOG.info("Using statistics reporter plugin:" + clazz);
- if(clazz != null) {
- reporter = (PreparableReporter) Utils.newInstance(clazz);
- }
- return reporter;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
index 1b987a8..2f466ce 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -18,9 +18,8 @@
package org.apache.storm.daemon.metrics.reporters;
import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.daemon.metrics.MetricsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,27 +34,27 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
@Override
public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
- LOG.info("Preparing...");
+ LOG.debug("Preparing...");
ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
- PrintStream stream = (PrintStream)stormConf.get(":stream");
+
+ PrintStream stream = System.out;
if (stream != null) {
builder.outputTo(stream);
}
- Locale locale = (Locale)stormConf.get(":locale");
+
+ Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
if (locale != null) {
builder.formattedFor(locale);
}
- String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+
+ TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
if (rateUnit != null) {
- builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+ builder.convertRatesTo(rateUnit);
}
- String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+
+ TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
if (durationUnit != null) {
- builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
- }
- MetricFilter filter = (MetricFilter) stormConf.get(":filter");
- if (filter != null) {
- builder.filter(filter);
+ builder.convertDurationsTo(durationUnit);
}
reporter = builder.build();
}
@@ -63,7 +62,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
@Override
public void start() {
if (reporter != null ) {
- LOG.info("Starting...");
+ LOG.debug("Starting...");
reporter.start(10, TimeUnit.SECONDS);
} else {
throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -73,7 +72,7 @@ public class ConsolePreparableReporter implements PreparableReporter<ConsoleRepo
@Override
public void stop() {
if (reporter !=null) {
- LOG.info("Stopping...");
+ LOG.debug("Stopping...");
reporter.stop();
} else {
throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
index 77d5393..28fd605 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -18,9 +18,9 @@
package org.apache.storm.daemon.metrics.reporters;
import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.MetricsUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,35 +36,35 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
@Override
public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
- LOG.info("Preparing...");
+ LOG.debug("Preparing...");
CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
- Locale locale = (Locale) stormConf.get(":locale");
+ Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
if (locale != null) {
builder.formatFor(locale);
}
- String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+
+ TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
if (rateUnit != null) {
- builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+ builder.convertRatesTo(rateUnit);
}
- String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+
+ TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
if (durationUnit != null) {
- builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
- }
- MetricFilter filter = (MetricFilter) stormConf.get(":filter");
- if (filter != null) {
- builder.filter(filter);
+ builder.convertDurationsTo(durationUnit);
}
+
String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
- File logDir = new File(localStormDirLocation + "csvmetrics" );
- validateCreateOutputDir(logDir);
- reporter = builder.build(logDir);
+ File csvMetricsDir = new File(localStormDirLocation + System.getProperty("file.separator") + "csvmetrics" );
+ validateCreateOutputDir(csvMetricsDir);
+
+ reporter = builder.build(csvMetricsDir);
}
@Override
public void start() {
if (reporter != null) {
- LOG.info("Starting...");
+ LOG.debug("Starting...");
reporter.start(10, TimeUnit.SECONDS);
} else {
throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -74,7 +74,7 @@ public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
@Override
public void stop() {
if (reporter != null) {
- LOG.info("Stopping...");
+ LOG.debug("Stopping...");
reporter.stop();
} else {
throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
index 988bb47..eff6e5a 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -18,8 +18,9 @@
package org.apache.storm.daemon.metrics.reporters;
import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.MetricsUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,17 +36,13 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
LOG.info("Preparing...");
JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
- String domain = Utils.getString(stormConf.get(":domain"), null);
+ String domain = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null);
if (domain != null) {
builder.inDomain(domain);
}
- String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+ TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
if (rateUnit != null) {
- builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
- }
- MetricFilter filter = (MetricFilter) stormConf.get(":filter");
- if (filter != null) {
- builder.filter(filter);
+ builder.convertRatesTo(rateUnit);
}
reporter = builder.build();
@@ -54,7 +51,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
@Override
public void start() {
if (reporter != null ) {
- LOG.info("Starting...");
+ LOG.debug("Starting...");
reporter.start();
} else {
throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
@@ -64,7 +61,7 @@ public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
@Override
public void stop() {
if (reporter !=null) {
- LOG.info("Stopping...");
+ LOG.debug("Stopping...");
reporter.stop();
} else {
throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
http://git-wip-us.apache.org/repos/asf/storm/blob/d5198155/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
index f19f8b1..2968bfb 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
@@ -25,8 +25,8 @@ import java.util.Map;
public interface PreparableReporter<T extends Reporter & Closeable> {
- public void prepare(MetricRegistry metricsRegistry, Map stormConf);
- public void start();
- public void stop();
+ void prepare(MetricRegistry metricsRegistry, Map stormConf);
+ void start();
+ void stop();
}