You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/24 03:30:46 UTC

[1/5] storm git commit: [STORM-1111] - Fix Validation for lots of different configs

Repository: storm
Updated Branches:
  refs/heads/master b3bc585a5 -> ff3b8affa


[STORM-1111] - Fix Validation for lots of different configs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/80df44ba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/80df44ba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/80df44ba

Branch: refs/heads/master
Commit: 80df44ba5fcfa63dc52bb919c8b0366350e35e58
Parents: 26f966c
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Mon Oct 19 11:31:59 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Mon Oct 19 11:32:48 2015 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java   | 94 +++++++++++++++-----
 .../storm/validation/ConfigValidation.java      | 74 ++++++++++++---
 .../validation/ConfigValidationAnnotations.java |  2 +
 .../jvm/backtype/storm/TestConfigValidate.java  | 78 ++++++++++++++++
 4 files changed, 213 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/80df44ba/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index a521b10..ea58538 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -66,12 +66,14 @@ public class Config extends HashMap<String, Object> {
      * Netty based messaging: The buffer size for send/recv buffer
      */
     @isInteger
+    @isPositiveNumber
     public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
 
     /**
      * Netty based messaging: Sets the backlog value to specify when the channel binds to a local address
      */
     @isInteger
+    @isPositiveNumber
     public static final String STORM_MESSAGING_NETTY_SOCKET_BACKLOG = "storm.messaging.netty.socket.backlog";
 
     /**
@@ -86,18 +88,21 @@ public class Config extends HashMap<String, Object> {
      * Netty based messaging: The min # of milliseconds that a peer will wait.
      */
     @isInteger
+    @isPositiveNumber(includeZero = true)
     public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
 
     /**
      * Netty based messaging: The max # of milliseconds that a peer will wait.
      */
     @isInteger
+    @isPositiveNumber(includeZero = true)
     public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
 
     /**
      * Netty based messaging: The # of worker threads for the server.
      */
     @isInteger
+    @isPositiveNumber(includeZero = true)
     public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
 
     /**
@@ -141,6 +146,7 @@ public class Config extends HashMap<String, Object> {
      * The port Storm will use to connect to each of the ZooKeeper servers.
      */
     @isInteger
+    @isPositiveNumber
     public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
 
     /**
@@ -208,7 +214,7 @@ public class Config extends HashMap<String, Object> {
     /**
      * Max no.of seconds group mapping service will cache user groups
      */
-    @isNumber
+    @isInteger
     public static final String STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS = "storm.group.mapping.service.cache.duration.secs";
 
     /**
@@ -247,6 +253,7 @@ public class Config extends HashMap<String, Object> {
      *
      * Defaults to false.
      */
+    @Deprecated
     @isBoolean
     public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq";
 
@@ -365,12 +372,14 @@ public class Config extends HashMap<String, Object> {
      * connect to this port to upload jars and submit topologies.
      */
     @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
 
     /**
      * The number of threads that should be used by the nimbus thrift server.
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_THRIFT_THREADS = "nimbus.thrift.threads";
 
     /**
@@ -406,6 +415,7 @@ public class Config extends HashMap<String, Object> {
      * The maximum buffer size thrift should use when reading messages.
      */
     @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_THRIFT_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
 
     /**
@@ -421,6 +431,7 @@ public class Config extends HashMap<String, Object> {
      * task dead and reassign it to another location.
      */
     @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
 
 
@@ -431,6 +442,7 @@ public class Config extends HashMap<String, Object> {
      * occuring.
      */
     @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
 
     /**
@@ -438,6 +450,7 @@ public class Config extends HashMap<String, Object> {
      * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
      */
     @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
 
     /**
@@ -457,6 +470,7 @@ public class Config extends HashMap<String, Object> {
      * and stops assigning new work to it.
      */
     @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs";
 
     /**
@@ -467,6 +481,7 @@ public class Config extends HashMap<String, Object> {
      * to launching new JVM's and configuring them.</p>
      */
     @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
 
     /**
@@ -506,13 +521,14 @@ public class Config extends HashMap<String, Object> {
     /**
      * Impersonation user ACL config entries.
      */
-    @isImpersonationAcl
+    @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {ImpersonationAclUserEntryValidator.class})
     public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl";
 
     /**
      * How often nimbus should wake up to renew credentials if needed.
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber
     public static final String NIMBUS_CREDENTIAL_RENEW_FREQ_SECS = "nimbus.credential.renewers.freq.secs";
 
     /**
@@ -538,12 +554,14 @@ public class Config extends HashMap<String, Object> {
      * Storm UI binds to this port.
      */
     @isInteger
+    @isPositiveNumber
     public static final String UI_PORT = "ui.port";
 
     /**
      * HTTP UI port for log viewer
      */
     @isInteger
+    @isPositiveNumber
     public static final String LOGVIEWER_PORT = "logviewer.port";
 
     /**
@@ -569,7 +587,8 @@ public class Config extends HashMap<String, Object> {
     /**
      * Storm Logviewer HTTPS port
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber
     public static final String LOGVIEWER_HTTPS_PORT = "logviewer.https.port";
 
     /**
@@ -658,19 +677,21 @@ public class Config extends HashMap<String, Object> {
     /**
      * Initialization parameters for the javax.servlet.Filter
      */
-    @isType(type=Map.class)
+    @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String UI_FILTER_PARAMS = "ui.filter.params";
 
     /**
      * The size of the header buffer for the UI in bytes
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber
     public static final String UI_HEADER_BUFFER_BYTES = "ui.header.buffer.bytes";
 
     /**
      * This port is used by Storm DRPC for receiving HTTPS (SSL) DPRC requests from clients.
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber
     public static final String UI_HTTPS_PORT = "ui.https.port";
 
     /**
@@ -735,13 +756,13 @@ public class Config extends HashMap<String, Object> {
     /**
      * This port is used by Storm DRPC for receiving HTTP DPRC requests from clients.
      */
-    @isNumber
+    @isInteger
     public static final String DRPC_HTTP_PORT = "drpc.http.port";
 
     /**
      * This port is used by Storm DRPC for receiving HTTPS (SSL) DPRC requests from clients.
      */
-    @isNumber
+    @isInteger
     public static final String DRPC_HTTPS_PORT = "drpc.https.port";
 
     /**
@@ -807,6 +828,7 @@ public class Config extends HashMap<String, Object> {
      * This port is used by Storm DRPC for receiving DPRC requests from clients.
      */
     @isInteger
+    @isPositiveNumber
     public static final String DRPC_PORT = "drpc.port";
 
     /**
@@ -845,18 +867,21 @@ public class Config extends HashMap<String, Object> {
      * DRPC thrift server worker threads
      */
     @isInteger
+    @isPositiveNumber
     public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
 
     /**
      * The maximum buffer size thrift should use when reading messages for DRPC.
      */
     @isNumber
+    @isPositiveNumber
     public static final String DRPC_MAX_BUFFER_SIZE = "drpc.max_buffer_size";
 
     /**
      * DRPC thrift server queue size
      */
     @isInteger
+    @isPositiveNumber
     public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
 
     /**
@@ -869,12 +894,14 @@ public class Config extends HashMap<String, Object> {
      * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
      */
     @isInteger
+    @isPositiveNumber
     public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
 
     /**
      * DRPC invocations thrift server worker threads
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber
     public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
 
     /**
@@ -925,7 +952,8 @@ public class Config extends HashMap<String, Object> {
     /**
      * A number representing the maximum number of workers any single topology can acquire.
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber(includeZero = true)
     public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology";
 
     /**
@@ -938,13 +966,14 @@ public class Config extends HashMap<String, Object> {
      * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
      * service
      */
-    @isType(type=Map.class)
+    @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params";
 
     /**
      * A number representing the maximum number of executors any single topology can acquire.
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber(includeZero = true)
     public static final String NIMBUS_EXECUTORS_PER_TOPOLOGY = "nimbus.executors.perTopology";
 
     /**
@@ -967,6 +996,7 @@ public class Config extends HashMap<String, Object> {
      * How many seconds to sleep for before shutting down threads on worker
      */
     @isInteger
+    @isPositiveNumber
     public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = "supervisor.worker.shutdown.sleep.secs";
 
     /**
@@ -1000,6 +1030,7 @@ public class Config extends HashMap<String, Object> {
      * need to be restarted.
      */
     @isInteger
+    @isPositiveNumber
     public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
 
     /**
@@ -1054,18 +1085,21 @@ public class Config extends HashMap<String, Object> {
      * control how many worker receiver threads we need per worker
      */
     @isInteger
+    @isPositiveNumber
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
 
     /**
      * How often this worker should heartbeat to the supervisor.
      */
     @isInteger
+    @isPositiveNumber
     public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
 
     /**
      * How often a task should heartbeat its status to the master.
      */
     @isInteger
+    @isPositiveNumber
     public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
 
     /**
@@ -1076,6 +1110,7 @@ public class Config extends HashMap<String, Object> {
      * come through.
      */
     @isInteger
+    @isPositiveNumber
     public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
 
     /**
@@ -1090,7 +1125,8 @@ public class Config extends HashMap<String, Object> {
     /**
      * How often a task should sync credentials, worst case.
      */
-    @isNumber
+    @isInteger
+    @isPositiveNumber
     public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
 
     /**
@@ -1157,6 +1193,7 @@ public class Config extends HashMap<String, Object> {
      * on each component in the topology to tune the performance of a topology.
      */
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_WORKERS = "topology.workers";
 
     /**
@@ -1168,6 +1205,7 @@ public class Config extends HashMap<String, Object> {
      * guaranteeing that the same value goes to the same task).
      */
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_TASKS = "topology.tasks";
 
     /**
@@ -1206,6 +1244,7 @@ public class Config extends HashMap<String, Object> {
      * then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.</p>
      */
     @isInteger
+    @isPositiveNumber(includeZero = true) // 0 must stay legal: it is the setting that switches acking off
     public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
 
     /**
@@ -1215,6 +1254,7 @@ public class Config extends HashMap<String, Object> {
      * to be equal to the number of workers configured for this topology. If this variable is set to 0,
      * event logging will be disabled.</p>
      */
+    @isPositiveNumber(includeZero = true) // 0 is a valid value: per the Javadoc it disables event logging
     @isInteger
     public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
 
@@ -1276,19 +1316,19 @@ public class Config extends HashMap<String, Object> {
      * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
      */
 
-    @isListEntryType(type=Map.class)
+    @isListEntryCustom(entryValidatorClasses={MetricRegistryValidator.class})
     public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
 
     /**
      * A map of metric name to class name implementing IMetric that will be created once per worker JVM
      */
-    @isType(type=Map.class)
+    @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String TOPOLOGY_WORKER_METRICS = "topology.worker.metrics";
 
     /**
      * A map of metric name to class name implementing IMetric that will be created once per worker JVM
      */
-    @isType(type=Map.class)
+    @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String WORKER_METRICS = "worker.metrics";
 
     /**
@@ -1296,6 +1336,7 @@ public class Config extends HashMap<String, Object> {
      * typically used in testing to limit the number of threads spawned in local mode.
      */
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
 
     /**
@@ -1307,6 +1348,7 @@ public class Config extends HashMap<String, Object> {
      * their tuples with a message id.
      */
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
 
     /**
@@ -1323,6 +1365,7 @@ public class Config extends HashMap<String, Object> {
      * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
      */
     @isInteger
+    @isPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
 
     /**
@@ -1380,7 +1423,7 @@ public class Config extends HashMap<String, Object> {
      * Topology-specific environment variables for the worker child process.
      * This is added to the existing environment (that of the supervisor)
      */
-    @isType(type=Map.class)
+    @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
 
     /*
@@ -1425,6 +1468,7 @@ public class Config extends HashMap<String, Object> {
      * The size of the Disruptor transfer queue for each worker.
      */
     @isInteger
+    @isPowerOf2
     public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
 
     /**
@@ -1460,12 +1504,14 @@ public class Config extends HashMap<String, Object> {
      * See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
      */
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";
 
     /**
      * How often a batch can be emitted in a Trident topology.
      */
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
 
     /**
@@ -1502,6 +1548,7 @@ public class Config extends HashMap<String, Object> {
      * Max pending tuples in one ShellBolt
      */
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
 
     /**
@@ -1512,7 +1559,7 @@ public class Config extends HashMap<String, Object> {
      *   S2 - Confidential
      *   S3 - Secret (default.)
      */
-    @isString
+    @isString(acceptedValues = {"S0", "S1", "S2", "S3"})
     public static final String TOPOLOGY_LOGGING_SENSITIVITY="topology.logging.sensitivity";
 
     /**
@@ -1532,6 +1579,7 @@ public class Config extends HashMap<String, Object> {
      * The port to use to connect to the transactional zookeeper servers. If null (which is default),
      * will use storm.zookeeper.port
      */
+    @isPositiveNumber
     @isInteger
     public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
 
@@ -1544,6 +1592,7 @@ public class Config extends HashMap<String, Object> {
     /**
      * The number of threads that should be used by the zeromq context in each worker process.
      */
+    @Deprecated
     @isInteger
     public static final String ZMQ_THREADS = "zmq.threads";
 
@@ -1552,6 +1601,7 @@ public class Config extends HashMap<String, Object> {
      * the connection is closed. This is an advanced configuration and can almost
      * certainly be ignored.
      */
+    @Deprecated
     @isInteger
     public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis";
 
@@ -1559,6 +1609,7 @@ public class Config extends HashMap<String, Object> {
      * The high water for the ZeroMQ push sockets used for networking. Use this config to prevent buffer explosion
      * on the networking layer.
      */
+    @Deprecated
     @isInteger
     public static final String ZMQ_HWM = "zmq.hwm";
 
@@ -1597,7 +1648,8 @@ public class Config extends HashMap<String, Object> {
      * The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
      */
-    @isNumber
+    @isPositiveNumber
+    @isInteger
     public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/80df44ba/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index b9244ef..406362f 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -25,8 +25,10 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -41,7 +43,7 @@ public class ConfigValidation {
     public static abstract class Validator {
         public Validator(Map<String, Object> params) {}
         public Validator() {}
-        public abstract void validateField(String name, Object o) throws InstantiationException, IllegalAccessException;
+        public abstract void validateField(String name, Object o) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException;
     }
 
     /**
@@ -91,9 +93,27 @@ public class ConfigValidation {
 
     public static class StringValidator extends Validator {
 
+        private HashSet<String> acceptedValues = null; // null means every String is accepted
+
+        public StringValidator(){}
+
+        public StringValidator(Map<String, Object> params) {
+            // NOTE(review): params is presumably built from the isString annotation attributes - confirm against the caller.
+            this.acceptedValues = new HashSet<String>(Arrays.asList((String[])params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES)));
+            // An empty set, or the annotation default of a single "", means no restriction was requested.
+            if(this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) {
+                this.acceptedValues = null;
+            }
+        }
+
         @Override
         public void validateField(String name, Object o) {
             SimpleTypeValidator.validateField(name, String.class, o);
+            // A null value is treated as "not set" and skipped, matching the null guards of the sibling validators.
+            boolean restricted = o != null && this.acceptedValues != null;
+            if (restricted && !this.acceptedValues.contains(o)) {
+                throw new IllegalArgumentException("Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues);
+            }
         }
     }
 
@@ -147,20 +167,26 @@ public class ConfigValidation {
     }
 
     /**
-     * Validates a map of Strings to a map of Strings to a list.
-     * {str -> {str -> [str,str]}
+     * Validates an entry for ImpersonationAclUser
      */
-    public static class ImpersonationAclValidator extends Validator {
+    public static class ImpersonationAclUserEntryValidator extends Validator {
 
         @Override
-        public void validateField(String name, Object o) {
+        public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
             }
+            LOG.debug("Validating impersonation ACL entry {}: {}", name, o); // DEBUG only: keeps ACL contents out of INFO logs
             ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
-                    ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
-                            ConfigValidationUtils.listFv(String.class, false), false), true);
+                    ConfigValidationUtils.listFv(String.class, false), false);
             validator.validateField(name, o);
+            Map<String, List<String>> mapObject = (Map<String, List<String>>) o; // shape was checked by the validator call above
+            if (!mapObject.containsKey("hosts")) {
+                throw new IllegalArgumentException(name + " should contain Map entry with key: hosts");
+            }
+            if (!mapObject.containsKey("groups")) {
+                throw new IllegalArgumentException(name + " should contain Map entry with key: groups");
+            }
         }
     }
 
@@ -293,12 +319,12 @@ public class ConfigValidation {
         }
 
         @Override
-        public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException {
+        public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
 
             validateField(name, this.entryValidators, o);
         }
 
-        public static void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException {
+        public static void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
             }
@@ -306,7 +332,7 @@ public class ConfigValidation {
             SimpleTypeValidator.validateField(name, Iterable.class, o);
             for (Object entry : (Iterable) o) {
                 for (Class validator : validators) {
-                    Object v = validator.newInstance();
+                    Object v = validator.getConstructor().newInstance();
                     if (v instanceof Validator) {
                         ((Validator) v).validateField(name + " list entry", entry);
                     } else {
@@ -355,11 +381,11 @@ public class ConfigValidation {
         }
 
         @Override
-        public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException {
+        public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
             validateField(name, this.keyValidators, this.valueValidators, o);
         }
 
-        public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException {
+        public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
             }
@@ -367,7 +393,7 @@ public class ConfigValidation {
             SimpleTypeValidator.validateField(name, Map.class, o);
             for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) {
                 for (Class kv : keyValidators) {
-                    Object keyValidator = kv.newInstance();
+                    Object keyValidator = kv.getConstructor().newInstance();
                     if (keyValidator instanceof Validator) {
                         ((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
                     } else {
@@ -375,7 +401,7 @@ public class ConfigValidation {
                     }
                 }
                 for (Class vv : valueValidators) {
-                    Object valueValidator = vv.newInstance();
+                    Object valueValidator = vv.getConstructor().newInstance();;
                     if (valueValidator instanceof Validator) {
                         ((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
                     } else {
@@ -425,6 +451,26 @@ public class ConfigValidation {
         }
     }
 
+    public static class MetricRegistryValidator extends Validator {
+        // Validates one metrics-consumer registration: a Map holding a String "class" and a Long "parallelism.hint".
+        @Override
+        public void validateField(String name, Object o) throws IllegalAccessException {
+            if(o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, Map.class, o);
+            if(!((Map) o).containsKey("class") ) {
+                throw new IllegalArgumentException("Field " + name + " must have map entry with key: class");
+            }
+            if(!((Map) o).containsKey("parallelism.hint") ) {
+                throw new IllegalArgumentException("Field " + name + " must have map entry with key: parallelism.hint");
+            }
+            // NOTE(review): boxed Long, not long.class - assumes SimpleTypeValidator uses Class.isInstance, which is always false for primitives.
+            SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
+            SimpleTypeValidator.validateField(name, Long.class, ((Map) o).get("parallelism.hint"));
+        }
+    }
+
     /**
      * Methods for validating confs
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/80df44ba/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index 0b64479..44d9c51 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -45,6 +45,7 @@ public class ConfigValidationAnnotations {
         static final String KEY_TYPE = "keyType";
         static final String VALUE_TYPE = "valueType";
         static final String INCLUDE_ZERO = "includeZero";
+        static final String ACCEPTED_VALUES = "acceptedValues";
     }
 
     /**
@@ -86,6 +87,7 @@ public class ConfigValidationAnnotations {
     @Target(ElementType.FIELD)
     public @interface isString {
         Class validatorClass() default ConfigValidation.StringValidator.class;
+        String[] acceptedValues() default "";
     }
 
     @Retention(RetentionPolicy.RUNTIME)

http://git-wip-us.apache.org/repos/asf/storm/blob/80df44ba/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
index 3ac1d47..0f53634 100644
--- a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
@@ -543,6 +543,78 @@ public class TestConfigValidate {
         }
     }
 
+    @Test
+    public void TestAcceptedStrings() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        TestConfig config = new TestConfig();
+        String[] passCases = {"aaa", "bbb", "ccc"};
+
+        for (Object value : passCases) {
+            config.put(TestConfig.TEST_MAP_CONFIG_5, value);
+            ConfigValidation.validateFields(config, TestConfig.class);
+        }
+
+        String[] failCases = {"aa", "bb", "cc", "abc", "a", "b", "c", ""};
+        for (Object value : failCases) {
+            try {
+                config.put(TestConfig.TEST_MAP_CONFIG_5, value);
+                ConfigValidation.validateFields(config, TestConfig.class);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+    }
+
+    @Test
+    public void TestImpersonationAclUserEntryValidator() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        TestConfig config = new TestConfig();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+
+        Map<String, Map<String, List<String>>> passCase1 = new HashMap<String, Map<String, List<String>>>();
+        Map<String, List<String>> passCase1_hostsAndGroups = new HashMap<String, List<String>>();
+        String[] hosts = {"host.1", "host.2", "host.3"};
+        passCase1_hostsAndGroups.put("hosts", Arrays.asList(hosts));
+        String[] groups = {"group.1", "group.2", "group.3"};
+        passCase1_hostsAndGroups.put("groups", Arrays.asList(groups));
+        passCase1.put("jerry", passCase1_hostsAndGroups);
+        passCases.add(passCase1);
+
+        for (Object value : passCases) {
+            config.put(TestConfig.TEST_MAP_CONFIG_6, value);
+            ConfigValidation.validateFields(config, TestConfig.class);
+        }
+
+        Map<String, Map<String, List<String>>> failCase1 = new HashMap<String, Map<String, List<String>>>();
+        Map<String, List<String>> failCase1_hostsAndGroups = new HashMap<String, List<String>>();
+        String[] failhosts = {"host.1", "host.2", "host.3"};
+        failCase1_hostsAndGroups.put("hosts", Arrays.asList(hosts));
+        failCase1.put("jerry", failCase1_hostsAndGroups);
+
+
+        Map<String, Map<String, List<String>>> failCase2 = new HashMap<String, Map<String, List<String>>>();
+        Map<String, List<String>> failCase2_hostsAndGroups = new HashMap<String, List<String>>();
+        String[] failgroups = {"group.1", "group.2", "group.3"};
+        failCase2_hostsAndGroups.put("groups", Arrays.asList(groups));
+        failCase2.put("jerry", failCase2_hostsAndGroups);
+
+        failCases.add(failCase1);
+        failCases.add(failCase2);
+        failCases.add("stuff");
+        failCases.add(5);
+
+        for (Object value : failCases) {
+            try {
+                config.put(TestConfig.TEST_MAP_CONFIG_6, value);
+                ConfigValidation.validateFields(config, TestConfig.class);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+
+
+
+    }
+
     public class TestConfig extends HashMap<String, Object> {
         @isMapEntryType(keyType = String.class, valueType = Integer.class)
         public static final String TEST_MAP_CONFIG = "test.map.config";
@@ -560,5 +632,11 @@ public class TestConfigValidate {
                 entryValidatorClasses = {PositiveNumberValidator.class, NotNullValidator.class})
         @isNoDuplicateInList
         public static final String TEST_MAP_CONFIG_4 = "test.map.config.4";
+
+        @isString(acceptedValues = {"aaa", "bbb", "ccc"})
+        public static final String TEST_MAP_CONFIG_5 = "test.map.config.5";
+
+        @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {ImpersonationAclUserEntryValidator.class})
+        public static final String TEST_MAP_CONFIG_6 = "test.map.config.6";
     }
 }


[3/5] storm git commit: addressing comments

Posted by ka...@apache.org.
addressing comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b3e8617
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b3e8617
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b3e8617

Branch: refs/heads/master
Commit: 2b3e8617b88779c20524d87fe4d546ba59cc888f
Parents: b57c830
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Fri Oct 23 17:09:24 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Oct 23 17:10:09 2015 -0500

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/validation/ConfigValidation.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2b3e8617/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 406362f..1b0cd7f 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -401,7 +401,7 @@ public class ConfigValidation {
                     }
                 }
                 for (Class vv : valueValidators) {
-                    Object valueValidator = vv.getConstructor().newInstance();;
+                    Object valueValidator = vv.getConstructor().newInstance();
                     if (valueValidator instanceof Validator) {
                         ((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
                     } else {
@@ -463,7 +463,7 @@ public class ConfigValidation {
                 throw new IllegalAccessException("Field " + name + " must have map entry with key: class");
             }
             if(!((Map) o).containsKey("parallelism.hint") ) {
-                throw new IllegalAccessException("Field " + name + " must have map entry with key: class");
+                throw new IllegalAccessException("Field " + name + " must have map entry with key: parallelism.hint");
             }
 
             SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));


[5/5] storm git commit: add STORM-1111 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1111 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ff3b8aff
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ff3b8aff
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ff3b8aff

Branch: refs/heads/master
Commit: ff3b8affaeed5b94fa5b3e57dee54ca48569d738
Parents: aca8bfb
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Oct 24 10:30:35 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 10:30:35 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ff3b8aff/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d308d7..8adfa0e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1111: Fix Validation for lots of different configs
  * STORM-1125: Adding separate ZK client for read in Nimbus ZK State
  * STORM-1121: Remove method call to avoid overhead during topology submission time
  * STORM-1120: Fix keyword (schema -> scheme) from main-routes


[2/5] storm git commit: delete unnecessary annotations

Posted by ka...@apache.org.
delete unnecessary annotations


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b57c8305
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b57c8305
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b57c8305

Branch: refs/heads/master
Commit: b57c8305460f4de46d92129c961ed46a53a216db
Parents: 80df44b
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Mon Oct 19 12:44:41 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Oct 22 11:27:56 2015 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java                 | 7 +++----
 .../storm/validation/ConfigValidationAnnotations.java         | 6 ------
 2 files changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b57c8305/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index ea58538..5f5fdec 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -253,7 +253,6 @@ public class Config extends HashMap<String, Object> {
      *
      * Defaults to false.
      */
-    @Deprecated
     @isBoolean
     public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq";
 
@@ -1254,8 +1253,8 @@ public class Config extends HashMap<String, Object> {
      * to be equal to the number of workers configured for this topology. If this variable is set to 0,
      * event logging will be disabled.</p>
      */
-    @isPositiveNumber
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
 
     /**
@@ -1579,8 +1578,8 @@ public class Config extends HashMap<String, Object> {
      * The port to use to connect to the transactional zookeeper servers. If null (which is default),
      * will use storm.zookeeper.port
      */
-    @isPositiveNumber
     @isInteger
+    @isPositiveNumber
     public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
 
     /**
@@ -1648,8 +1647,8 @@ public class Config extends HashMap<String, Object> {
      * The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
      */
-    @isPositiveNumber
     @isInteger
+    @isPositiveNumber
     public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/b57c8305/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index 44d9c51..ed93370 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -185,12 +185,6 @@ public class ConfigValidationAnnotations {
 
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
-    public @interface isImpersonationAcl {
-        Class validatorClass() default ConfigValidation.ImpersonationAclValidator.class;
-    }
-
-    @Retention(RetentionPolicy.RUNTIME)
-    @Target(ElementType.FIELD)
     public @interface isStringOrStringList {
         Class validatorClass() default ConfigValidation.StringOrStringListValidator.class;
     }


[4/5] storm git commit: Merge branch 'STORM-1111' of https://github.com/jerrypeng/storm into STORM-1111

Posted by ka...@apache.org.
Merge branch 'STORM-1111' of https://github.com/jerrypeng/storm into STORM-1111


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aca8bfb7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aca8bfb7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aca8bfb7

Branch: refs/heads/master
Commit: aca8bfb7dd76997340dc3d6a54479686a3ee31b0
Parents: b3bc585 2b3e861
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Oct 24 10:29:23 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 10:29:23 2015 +0900

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java   | 93 +++++++++++++++-----
 .../storm/validation/ConfigValidation.java      | 74 +++++++++++++---
 .../validation/ConfigValidationAnnotations.java |  8 +-
 .../jvm/backtype/storm/TestConfigValidate.java  | 78 ++++++++++++++++
 4 files changed, 212 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/aca8bfb7/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------