You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:29 UTC
[23/38] storm git commit: STORM-2153: address review comments
STORM-2153: address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/868de5b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/868de5b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/868de5b3
Branch: refs/heads/1.x-branch
Commit: 868de5b33b8145d787a9b3d08bdac6908591790d
Parents: b5ae9c3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Dec 22 14:42:35 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Dec 22 14:42:35 2017 -0500
----------------------------------------------------------------------
.../reporters/ScheduledStormReporter.java | 2 +-
.../org/apache/storm/utils/DisruptorQueue.java | 18 ++++++++-----
.../storm/validation/ConfigValidation.java | 28 ++++++++++++--------
3 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
index e88b41b..b7ffa61 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
@@ -81,7 +81,7 @@ public abstract class ScheduledStormReporter implements StormReporter{
filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz);
filter.prepare(filterConf);
} catch (Exception e) {
- LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz);
+ throw new RuntimeException("Unable to instantiate StormMetricsFilter class: " + clazz);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index ca8568c..d7cf401 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -437,12 +437,17 @@ public class DisruptorQueue implements IStatefulObject {
_flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
_flusher.start();
- METRICS_TIMER.schedule(new TimerTask(){
- @Override
- public void run() {
- _disruptorMetrics.set(_metrics);
- }
- }, 15000, 15000);
+ try {
+ METRICS_TIMER.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ _disruptorMetrics.set(_metrics);
+ }
+ }, 15000, 15000);
+ } catch (IllegalStateException e){
+ // Ignore. IllegalStateException is thrown by Timer.schedule() if the timer
+ // has been cancelled. (This happens in unit tests)
+ }
}
public String getName() {
@@ -458,6 +463,7 @@ public class DisruptorQueue implements IStatefulObject {
publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
_flusher.close();
_metrics.close();
+ METRICS_TIMER.cancel();
} catch (InsufficientCapacityException e) {
//This should be impossible
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
index d7ca48d..9d9db33 100644
--- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -494,6 +494,12 @@ public class ConfigValidation {
}
public static class MetricReportersValidator extends Validator {
+ private static final String NIMBUS = "nimbus";
+ private static final String SUPERVISOR = "supervisor";
+ private static final String WORKER = "worker";
+ private static final String CLASS = "class";
+ private static final String FILTER = "filter";
+ private static final String DAEMONS = "daemons";
@Override
public void validateField(String name, Object o) {
@@ -501,23 +507,23 @@ public class ConfigValidation {
return;
}
SimpleTypeValidator.validateField(name, Map.class, o);
- if(!((Map) o).containsKey("class") ) {
+ if(!((Map) o).containsKey(CLASS) ) {
throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class");
}
- if(!((Map) o).containsKey("daemons") ) {
+ if(!((Map) o).containsKey(DAEMONS) ) {
throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons");
} else {
// daemons can only be 'nimbus', 'supervisor', or 'worker'
- Object list = ((Map)o).get("daemons");
- if(list == null || !(list instanceof List)){
+ Object list = ((Map)o).get(DAEMONS);
+ if(!(list instanceof List)){
throw new IllegalArgumentException("Field 'daemons' must be a non-null list.");
}
List daemonList = (List)list;
for(Object string : daemonList){
if (string instanceof String &&
- (((String) string).equals("nimbus") ||
- ((String) string).equals("supervisor") ||
- ((String) string).equals("worker"))) {
+ (string.equals(NIMBUS) ||
+ string.equals(SUPERVISOR) ||
+ string.equals(WORKER))) {
continue;
}
throw new IllegalArgumentException("Field 'daemons' must contain at least one of the following:" +
@@ -525,11 +531,11 @@ public class ConfigValidation {
}
}
- if(((Map)o).containsKey("filter")){
- Map filterMap = (Map)((Map)o).get("filter");
- SimpleTypeValidator.validateField("class", String.class, filterMap.get("class"));
+ if(((Map)o).containsKey(FILTER)){
+ Map filterMap = (Map)((Map)o).get(FILTER);
+ SimpleTypeValidator.validateField(CLASS, String.class, filterMap.get(CLASS));
}
- SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
+ SimpleTypeValidator.validateField(name, String.class, ((Map) o).get(CLASS));
}
}