You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:29 UTC

[23/38] storm git commit: STORM-2153: address review comments

STORM-2153: address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/868de5b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/868de5b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/868de5b3

Branch: refs/heads/1.x-branch
Commit: 868de5b33b8145d787a9b3d08bdac6908591790d
Parents: b5ae9c3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Dec 22 14:42:35 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Dec 22 14:42:35 2017 -0500

----------------------------------------------------------------------
 .../reporters/ScheduledStormReporter.java       |  2 +-
 .../org/apache/storm/utils/DisruptorQueue.java  | 18 ++++++++-----
 .../storm/validation/ConfigValidation.java      | 28 ++++++++++++--------
 3 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
index e88b41b..b7ffa61 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
@@ -81,7 +81,7 @@ public abstract class ScheduledStormReporter implements StormReporter{
                     filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz);
                     filter.prepare(filterConf);
                 } catch (Exception e) {
-                    LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz);
+                    throw new RuntimeException("Unable to instantiate StormMetricsFilter class: " + clazz);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index ca8568c..d7cf401 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -437,12 +437,17 @@ public class DisruptorQueue implements IStatefulObject {
 
         _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
         _flusher.start();
-        METRICS_TIMER.schedule(new TimerTask(){
-            @Override
-            public void run() {
-                _disruptorMetrics.set(_metrics);
-            }
-        }, 15000, 15000);
+        try {
+            METRICS_TIMER.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    _disruptorMetrics.set(_metrics);
+                }
+            }, 15000, 15000);
+        } catch (IllegalStateException e){
+            // Ignore. IllegalStateException is thrown by Timer.schedule() if the timer
+            // has been cancelled. (This happens in unit tests)
+        }
     }
 
     public String getName() {
@@ -458,6 +463,7 @@ public class DisruptorQueue implements IStatefulObject {
             publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
             _flusher.close();
             _metrics.close();
+            METRICS_TIMER.cancel();
         } catch (InsufficientCapacityException e) {
             //This should be impossible
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
index d7ca48d..9d9db33 100644
--- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -494,6 +494,12 @@ public class ConfigValidation {
     }
 
     public static class MetricReportersValidator extends Validator {
+        private static final String NIMBUS = "nimbus";
+        private static final String SUPERVISOR = "supervisor";
+        private static final String WORKER = "worker";
+        private static final String CLASS = "class";
+        private static final String FILTER = "filter";
+        private static final String DAEMONS = "daemons";
 
         @Override
         public void validateField(String name, Object o) {
@@ -501,23 +507,23 @@ public class ConfigValidation {
                 return;
             }
             SimpleTypeValidator.validateField(name, Map.class, o);
-            if(!((Map) o).containsKey("class") ) {
+            if(!((Map) o).containsKey(CLASS) ) {
                 throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class");
             }
-            if(!((Map) o).containsKey("daemons") ) {
+            if(!((Map) o).containsKey(DAEMONS) ) {
                 throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons");
             } else {
                 // daemons can only be 'nimbus', 'supervisor', or 'worker'
-                Object list = ((Map)o).get("daemons");
-                if(list == null || !(list instanceof List)){
+                Object list = ((Map)o).get(DAEMONS);
+                if(!(list instanceof List)){
                     throw new IllegalArgumentException("Field 'daemons' must be a non-null list.");
                 }
                 List daemonList = (List)list;
                 for(Object string : daemonList){
                     if (string instanceof String &&
-                            (((String) string).equals("nimbus") ||
-                                    ((String) string).equals("supervisor") ||
-                                    ((String) string).equals("worker"))) {
+                            (string.equals(NIMBUS) ||
+                                    string.equals(SUPERVISOR) ||
+                                    string.equals(WORKER))) {
                         continue;
                     }
                     throw new IllegalArgumentException("Field 'daemons' must contain at least one of the following:" +
@@ -525,11 +531,11 @@ public class ConfigValidation {
                 }
 
             }
-            if(((Map)o).containsKey("filter")){
-                Map filterMap = (Map)((Map)o).get("filter");
-                SimpleTypeValidator.validateField("class", String.class, filterMap.get("class"));
+            if(((Map)o).containsKey(FILTER)){
+                Map filterMap = (Map)((Map)o).get(FILTER);
+                SimpleTypeValidator.validateField(CLASS, String.class, filterMap.get(CLASS));
             }
-            SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
+            SimpleTypeValidator.validateField(name, String.class, ((Map) o).get(CLASS));
 
         }
     }