You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/02/05 21:25:21 UTC
[02/11] storm git commit: Renaming the package and config
Renaming the package and config
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2207d662
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2207d662
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2207d662
Branch: refs/heads/1.x-branch
Commit: 2207d662b0c45fd84edf0f9a9c64d5651b182aab
Parents: 9368619
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Thu Feb 4 10:25:29 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 5 19:28:32 2016 +0000
----------------------------------------------------------------------
conf/defaults.yaml | 6 +-
storm-core/src/jvm/org/apache/storm/Config.java | 4 +-
.../storm/daemon/metrics/StatisticsUtils.java | 58 ++++++++++++
.../reporters/ConsolePreparableReporter.java | 82 +++++++++++++++++
.../reporters/CsvPreparableReporter.java | 97 ++++++++++++++++++++
.../reporters/JmxPreparableReporter.java | 73 +++++++++++++++
.../metrics/reporters/PreparableReporter.java | 32 +++++++
.../storm/statistics/StatisticsUtils.java | 58 ------------
.../reporters/ConsolePreparableReporter.java | 82 -----------------
.../reporters/CsvPreparableReporter.java | 97 --------------------
.../reporters/JmxPreparableReporter.java | 73 ---------------
.../reporters/PreparableReporter.java | 32 -------
12 files changed, 347 insertions(+), 347 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 5df4a63..d381f0d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -282,6 +282,6 @@ pacemaker.childopts: "-Xmx1024m"
pacemaker.auth.method: "NONE"
pacemaker.kerberos.users: []
-#default plugin for daemon statistics reporter
-storm.statistics.preparable.reporter.plugins:
- - "org.apache.storm.statistics.reporters.JmxPreparableReporter"
+#default storm daemon metrics reporter plugins
+storm.daemon.metrics.reporter.plugins:
+ - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index adeb4d6..100a824 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -140,10 +140,10 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
/**
- * A list of daemon statistics reporter plugin class names.
+ * A list of daemon metrics reporter plugin class names.
*/
@isStringList
- public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGINS = "storm.statistics.preparable.reporter.plugins";
+ public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins";
/**
* A list of hosts of ZooKeeper servers used to manage the cluster.
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
new file mode 100644
index 0000000..d28e667
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/StatisticsUtils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class StatisticsUtils {
+ private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
+
+ public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
+ PreparableReporter reporter = new JmxPreparableReporter();
+ List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
+ List<PreparableReporter> reporterList = new ArrayList<>();
+
+ if (clazzes != null) {
+ for(String clazz: clazzes ) {
+ reporterList.add(getPreparableReporter(clazz));
+ }
+ }
+ if(reporterList.isEmpty()) {
+ reporterList.add(new JmxPreparableReporter());
+ }
+ return reporterList;
+ }
+
+ private static PreparableReporter getPreparableReporter(String clazz) {
+ PreparableReporter reporter = null;
+ LOG.info("Using statistics reporter plugin:" + clazz);
+ if(clazz != null) {
+ reporter = (PreparableReporter) Utils.newInstance(clazz);
+ }
+ return reporter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
new file mode 100644
index 0000000..1b987a8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> {
+ private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
+ ConsoleReporter reporter = null;
+
+ @Override
+ public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+ LOG.info("Preparing...");
+ ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
+ PrintStream stream = (PrintStream)stormConf.get(":stream");
+ if (stream != null) {
+ builder.outputTo(stream);
+ }
+ Locale locale = (Locale)stormConf.get(":locale");
+ if (locale != null) {
+ builder.formattedFor(locale);
+ }
+ String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+ if (rateUnit != null) {
+ builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+ }
+ String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+ if (durationUnit != null) {
+ builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+ }
+ MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+ if (filter != null) {
+ builder.filter(filter);
+ }
+ reporter = builder.build();
+ }
+
+ @Override
+ public void start() {
+ if (reporter != null ) {
+ LOG.info("Starting...");
+ reporter.start(10, TimeUnit.SECONDS);
+ } else {
+ throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (reporter !=null) {
+ LOG.info("Stopping...");
+ reporter.stop();
+ } else {
+ throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
new file mode 100644
index 0000000..77d5393
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
+ private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class);
+ CsvReporter reporter = null;
+
+ @Override
+ public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+ LOG.info("Preparing...");
+ CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
+
+ Locale locale = (Locale) stormConf.get(":locale");
+ if (locale != null) {
+ builder.formatFor(locale);
+ }
+ String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+ if (rateUnit != null) {
+ builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+ }
+ String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
+ if (durationUnit != null) {
+ builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
+ }
+ MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+ if (filter != null) {
+ builder.filter(filter);
+ }
+ String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
+ File logDir = new File(localStormDirLocation + "csvmetrics" );
+ validateCreateOutputDir(logDir);
+ reporter = builder.build(logDir);
+ }
+
+ @Override
+ public void start() {
+ if (reporter != null) {
+ LOG.info("Starting...");
+ reporter.start(10, TimeUnit.SECONDS);
+ } else {
+ throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (reporter != null) {
+ LOG.info("Stopping...");
+ reporter.stop();
+ } else {
+ throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+ }
+ }
+
+
+ private void validateCreateOutputDir(File dir) {
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
+ if (!dir.canWrite()) {
+ throw new IllegalStateException(dir.getName() + " does not have write permissions.");
+ }
+ if (!dir.isDirectory()) {
+ throw new IllegalStateException(dir.getName() + " is not a directory.");
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
new file mode 100644
index 0000000..988bb47
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
+ private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class);
+ JmxReporter reporter = null;
+
+ @Override
+ public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
+ LOG.info("Preparing...");
+ JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
+ String domain = Utils.getString(stormConf.get(":domain"), null);
+ if (domain != null) {
+ builder.inDomain(domain);
+ }
+ String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
+ if (rateUnit != null) {
+ builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
+ }
+ MetricFilter filter = (MetricFilter) stormConf.get(":filter");
+ if (filter != null) {
+ builder.filter(filter);
+ }
+ reporter = builder.build();
+
+ }
+
+ @Override
+ public void start() {
+ if (reporter != null ) {
+ LOG.info("Starting...");
+ reporter.start();
+ } else {
+ throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (reporter !=null) {
+ LOG.info("Stopping...");
+ reporter.stop();
+ } else {
+ throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
new file mode 100644
index 0000000..f19f8b1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics.reporters;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+
+import java.io.Closeable;
+import java.util.Map;
+
+
+public interface PreparableReporter<T extends Reporter & Closeable> {
+ public void prepare(MetricRegistry metricsRegistry, Map stormConf);
+ public void start();
+ public void stop();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
deleted file mode 100644
index 12d33c4..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics;
-
-import org.apache.storm.Config;
-import org.apache.storm.statistics.reporters.JmxPreparableReporter;
-import org.apache.storm.statistics.reporters.PreparableReporter;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class StatisticsUtils {
- private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class);
-
- public static List<PreparableReporter> getPreparableReporters(Map stormConf) {
- PreparableReporter reporter = new JmxPreparableReporter();
- List<String> clazzes = (List<String>) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGINS);
- List<PreparableReporter> reporterList = new ArrayList<>();
-
- if (clazzes != null) {
- for(String clazz: clazzes ) {
- reporterList.add(getPreparableReporter(clazz));
- }
- }
- if(reporterList.isEmpty()) {
- reporterList.add(new JmxPreparableReporter());
- }
- return reporterList;
- }
-
- private static PreparableReporter getPreparableReporter(String clazz) {
- PreparableReporter reporter = null;
- LOG.info("Using statistics reporter plugin:" + clazz);
- if(clazz != null) {
- reporter = (PreparableReporter) Utils.newInstance(clazz);
- }
- return reporter;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
deleted file mode 100644
index 35ae83f..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintStream;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> {
- private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
- ConsoleReporter reporter = null;
-
- @Override
- public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
- LOG.info("Preparing...");
- ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry);
- PrintStream stream = (PrintStream)stormConf.get(":stream");
- if (stream != null) {
- builder.outputTo(stream);
- }
- Locale locale = (Locale)stormConf.get(":locale");
- if (locale != null) {
- builder.formattedFor(locale);
- }
- String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
- if (rateUnit != null) {
- builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
- }
- String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
- if (durationUnit != null) {
- builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
- }
- MetricFilter filter = (MetricFilter) stormConf.get(":filter");
- if (filter != null) {
- builder.filter(filter);
- }
- reporter = builder.build();
- }
-
- @Override
- public void start() {
- if (reporter != null ) {
- LOG.info("Starting...");
- reporter.start(10, TimeUnit.SECONDS);
- } else {
- throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
- }
- }
-
- @Override
- public void stop() {
- if (reporter !=null) {
- LOG.info("Stopping...");
- reporter.stop();
- } else {
- throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
deleted file mode 100644
index 8ed0b3e..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.Config;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
- private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class);
- CsvReporter reporter = null;
-
- @Override
- public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
- LOG.info("Preparing...");
- CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
-
- Locale locale = (Locale) stormConf.get(":locale");
- if (locale != null) {
- builder.formatFor(locale);
- }
- String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
- if (rateUnit != null) {
- builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
- }
- String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null);
- if (durationUnit != null) {
- builder.convertDurationsTo(TimeUnit.valueOf(durationUnit));
- }
- MetricFilter filter = (MetricFilter) stormConf.get(":filter");
- if (filter != null) {
- builder.filter(filter);
- }
- String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), ".");
- File logDir = new File(localStormDirLocation + "csvmetrics" );
- validateCreateOutputDir(logDir);
- reporter = builder.build(logDir);
- }
-
- @Override
- public void start() {
- if (reporter != null) {
- LOG.info("Starting...");
- reporter.start(10, TimeUnit.SECONDS);
- } else {
- throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
- }
- }
-
- @Override
- public void stop() {
- if (reporter != null) {
- LOG.info("Stopping...");
- reporter.stop();
- } else {
- throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
- }
- }
-
-
- private void validateCreateOutputDir(File dir) {
- if (!dir.exists()) {
- dir.mkdirs();
- }
- if (!dir.canWrite()) {
- throw new IllegalStateException(dir.getName() + " does not have write permissions.");
- }
- if (!dir.isDirectory()) {
- throw new IllegalStateException(dir.getName() + " is not a directory.");
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
deleted file mode 100644
index 6b0cbda..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
- private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class);
- JmxReporter reporter = null;
-
- @Override
- public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
- LOG.info("Preparing...");
- JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
- String domain = Utils.getString(stormConf.get(":domain"), null);
- if (domain != null) {
- builder.inDomain(domain);
- }
- String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null);
- if (rateUnit != null) {
- builder.convertRatesTo(TimeUnit.valueOf(rateUnit));
- }
- MetricFilter filter = (MetricFilter) stormConf.get(":filter");
- if (filter != null) {
- builder.filter(filter);
- }
- reporter = builder.build();
-
- }
-
- @Override
- public void start() {
- if (reporter != null ) {
- LOG.info("Starting...");
- reporter.start();
- } else {
- throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
- }
- }
-
- @Override
- public void stop() {
- if (reporter !=null) {
- LOG.info("Stopping...");
- reporter.stop();
- } else {
- throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2207d662/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
deleted file mode 100644
index dc29a4a..0000000
--- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/PreparableReporter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.statistics.reporters;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-
-import java.io.Closeable;
-import java.util.Map;
-
-
-public interface PreparableReporter<T extends Reporter & Closeable> {
- public void prepare(MetricRegistry metricsRegistry, Map stormConf);
- public void start();
- public void stop();
-
-}