You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:15 UTC
[09/38] storm git commit: address review comments
address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cfa6cd63
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cfa6cd63
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cfa6cd63
Branch: refs/heads/1.x-branch
Commit: cfa6cd63158db66a951f804ded75575b2b4f3d4a
Parents: de20cbd
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Sep 29 13:58:41 2017 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Sep 29 13:58:41 2017 -0400
----------------------------------------------------------------------
.../apache/storm/starter/AnchoredWordCount.java | 12 +++++------
.../storm/metrics2/StormMetricRegistry.java | 21 +++++++++-----------
.../reporters/ConsoleStormReporter.java | 2 +-
.../metrics2/reporters/CsvStormReporter.java | 2 +-
.../reporters/GangliaStormReporter.java | 3 +--
.../reporters/GraphiteStormReporter.java | 4 +---
.../metrics2/reporters/JmxStormReporter.java | 2 +-
.../reporters/ScheduledStormReporter.java | 12 +++++------
.../storm/metrics2/reporters/StormReporter.java | 2 +-
9 files changed, 27 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
index 3b22c9f..c56473a 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java
@@ -43,14 +43,14 @@ public class AnchoredWordCount {
public static class RandomSentenceSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);
- SpoutOutputCollector _collector;
- Random _rand;
+ SpoutOutputCollector collector;
+ Random random;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- _rand = new Random();
+ this.collector = collector;
+ this.random = new Random();
}
@Override
@@ -58,9 +58,9 @@ public class AnchoredWordCount {
Utils.sleep(10);
String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
- final String sentence = sentences[_rand.nextInt(sentences.length)];
+ final String sentence = sentences[random.nextInt(sentences.length)];
- _collector.emit(new Values(sentence), UUID.randomUUID());
+ this.collector.emit(new Values(sentence), UUID.randomUUID());
}
protected String sentence(String input) {
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index a3b0db9..60d4191 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -23,6 +23,7 @@ import org.apache.storm.Config;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.metrics2.reporters.StormReporter;
import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +47,8 @@ public class StormMetricRegistry {
public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){
SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
String metricName = metricName(name, topologyId, componentId, port);
- if(REGISTRY.getGauges().containsKey(metricName)){
- return (SimpleGauge)REGISTRY.getGauges().get(metricName);
+ if(REGISTRY.getGauges().containsKey(metricName)){
+ return (SimpleGauge)REGISTRY.getGauges().get(metricName);
} else {
return REGISTRY.register(metricName, gauge);
}
@@ -72,16 +73,12 @@ public class StormMetricRegistry {
}
public static void start(Map<String, Object> stormConfig, DaemonType type){
- String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME);
- if(localHost != null){
- hostName = localHost;
- } else {
- try {
- hostName = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e) {
- LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" +
- " as 'localhost'.");
- }
+ String localHost = "localhost";
+ try {
+ hostName = Utils.localHostname();
+ } catch (UnknownHostException e) {
+ LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" +
+ " as 'localhost'.");
}
LOG.info("Starting metrics reporters...");
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
index abb5226..4c91f03 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
@@ -28,7 +28,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-public class ConsoleStormReporter extends ScheduledStormReporter<ConsoleReporter> {
+public class ConsoleStormReporter extends ScheduledStormReporter {
private final static Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class);
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
index 24c6eed..5d9ff4e 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java
@@ -31,7 +31,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-public class CsvStormReporter extends ScheduledStormReporter<CsvReporter> {
+public class CsvStormReporter extends ScheduledStormReporter {
private final static Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class);
public static final String CSV_LOG_DIR = "csv.log.dir";
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
index e7dc5f4..09af2e1 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java
@@ -30,7 +30,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-public class GangliaStormReporter extends ScheduledStormReporter<GangliaReporter> {
+public class GangliaStormReporter extends ScheduledStormReporter {
private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class);
public static final String GANGLIA_HOST = "ganglia.host";
@@ -99,7 +99,6 @@ public class GangliaStormReporter extends ScheduledStormReporter<GangliaReporter
GMetric sender = new GMetric(group, port, mode, ttl);
reporter = builder.build(sender);
}catch (IOException ioe){
- //TODO
LOG.error("Exception in GangliaReporter config", ioe);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
index 0f88fc4..ba3c0c5 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-public class GraphiteStormReporter extends ScheduledStormReporter<GraphiteReporter> {
+public class GraphiteStormReporter extends ScheduledStormReporter {
private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);
public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
@@ -76,11 +76,9 @@ public class GraphiteStormReporter extends ScheduledStormReporter<GraphiteReport
Integer port = getMetricsTargetPort(reporterConf);
String transport = getMetricsTargetTransport(reporterConf);
GraphiteSender sender = null;
- //TODO: error checking
if (transport.equalsIgnoreCase("udp")) {
sender = new GraphiteUDP(host, port);
} else {
- //TODO: pickled support
sender = new Graphite(host, port);
}
reporter = builder.build(sender);
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
index 5b932ea..325ab1d 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-public class JmxStormReporter implements StormReporter<JmxReporter> {
+public class JmxStormReporter implements StormReporter {
private final static Logger LOG = LoggerFactory.getLogger(JmxStormReporter.class);
public static final String JMX_DOMAIN = "jmx.domain";
JmxReporter reporter = null;
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
index 940cb19..6ef39b6 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
@@ -27,11 +27,11 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-public abstract class ScheduledStormReporter<T extends ScheduledReporter> implements StormReporter{
+public abstract class ScheduledStormReporter implements StormReporter{
private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class);
protected ScheduledReporter reporter;
- long reportingPeriod;
- TimeUnit reportingPeriodUnit;
+ protected long reportingPeriod;
+ protected TimeUnit reportingPeriodUnit;
@Override
public void start() {
@@ -54,7 +54,7 @@ public abstract class ScheduledStormReporter<T extends ScheduledReporter> implem
}
- static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
+ public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
return unit == null ? TimeUnit.SECONDS : unit;
}
@@ -67,11 +67,11 @@ public abstract class ScheduledStormReporter<T extends ScheduledReporter> implem
return null;
}
- static long getReportPeriod(Map reporterConf) {
+ public static long getReportPeriod(Map reporterConf) {
return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
}
- static StormMetricsFilter getMetricsFilter(Map reporterConf){
+ public static StormMetricsFilter getMetricsFilter(Map reporterConf){
StormMetricsFilter filter = null;
Map<String, Object> filterConf = (Map)reporterConf.get("filter");
String clazz = (String) filterConf.get("class");
http://git-wip-us.apache.org/repos/asf/storm/blob/cfa6cd63/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
index c36e44e..a5d9798 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
@@ -22,7 +22,7 @@ import com.codahale.metrics.Reporter;
import java.util.Map;
-public interface StormReporter<T extends Reporter> {
+public interface StormReporter extends Reporter {
String REPORT_PERIOD = "report.period";
String REPORT_PERIOD_UNITS = "report.period.units";