You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/24 03:30:46 UTC
[1/5] storm git commit: [STORM-1111] - Fix Validation for lots of
different configs
Repository: storm
Updated Branches:
refs/heads/master b3bc585a5 -> ff3b8affa
[STORM-1111] - Fix Validation for lots of different configs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/80df44ba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/80df44ba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/80df44ba
Branch: refs/heads/master
Commit: 80df44ba5fcfa63dc52bb919c8b0366350e35e58
Parents: 26f966c
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Mon Oct 19 11:31:59 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Mon Oct 19 11:32:48 2015 -0500
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/Config.java | 94 +++++++++++++++-----
.../storm/validation/ConfigValidation.java | 74 ++++++++++++---
.../validation/ConfigValidationAnnotations.java | 2 +
.../jvm/backtype/storm/TestConfigValidate.java | 78 ++++++++++++++++
4 files changed, 213 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/80df44ba/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index a521b10..ea58538 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -66,12 +66,14 @@ public class Config extends HashMap<String, Object> {
* Netty based messaging: The buffer size for send/recv buffer
*/
@isInteger
+ @isPositiveNumber
public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
/**
* Netty based messaging: Sets the backlog value to specify when the channel binds to a local address
*/
@isInteger
+ @isPositiveNumber
public static final String STORM_MESSAGING_NETTY_SOCKET_BACKLOG = "storm.messaging.netty.socket.backlog";
/**
@@ -86,18 +88,21 @@ public class Config extends HashMap<String, Object> {
* Netty based messaging: The min # of milliseconds that a peer will wait.
*/
@isInteger
+ @isPositiveNumber(includeZero = true)
public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
/**
* Netty based messaging: The max # of milliseconds that a peer will wait.
*/
@isInteger
+ @isPositiveNumber(includeZero = true)
public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
/**
* Netty based messaging: The # of worker threads for the server.
*/
@isInteger
+ @isPositiveNumber(includeZero = true)
public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
/**
@@ -141,6 +146,7 @@ public class Config extends HashMap<String, Object> {
* The port Storm will use to connect to each of the ZooKeeper servers.
*/
@isInteger
+ @isPositiveNumber
public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
/**
@@ -208,7 +214,7 @@ public class Config extends HashMap<String, Object> {
/**
* Max no.of seconds group mapping service will cache user groups
*/
- @isNumber
+ @isInteger
public static final String STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS = "storm.group.mapping.service.cache.duration.secs";
/**
@@ -247,6 +253,7 @@ public class Config extends HashMap<String, Object> {
*
* Defaults to false.
*/
+ @Deprecated
@isBoolean
public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq";
@@ -365,12 +372,14 @@ public class Config extends HashMap<String, Object> {
* connect to this port to upload jars and submit topologies.
*/
@isInteger
+ @isPositiveNumber
public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
/**
* The number of threads that should be used by the nimbus thrift server.
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber
public static final String NIMBUS_THRIFT_THREADS = "nimbus.thrift.threads";
/**
@@ -406,6 +415,7 @@ public class Config extends HashMap<String, Object> {
* The maximum buffer size thrift should use when reading messages.
*/
@isInteger
+ @isPositiveNumber
public static final String NIMBUS_THRIFT_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
/**
@@ -421,6 +431,7 @@ public class Config extends HashMap<String, Object> {
* task dead and reassign it to another location.
*/
@isInteger
+ @isPositiveNumber
public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
@@ -431,6 +442,7 @@ public class Config extends HashMap<String, Object> {
* occuring.
*/
@isInteger
+ @isPositiveNumber
public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
/**
@@ -438,6 +450,7 @@ public class Config extends HashMap<String, Object> {
* @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
*/
@isInteger
+ @isPositiveNumber
public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
/**
@@ -457,6 +470,7 @@ public class Config extends HashMap<String, Object> {
* and stops assigning new work to it.
*/
@isInteger
+ @isPositiveNumber
public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs";
/**
@@ -467,6 +481,7 @@ public class Config extends HashMap<String, Object> {
* to launching new JVM's and configuring them.</p>
*/
@isInteger
+ @isPositiveNumber
public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
/**
@@ -506,13 +521,14 @@ public class Config extends HashMap<String, Object> {
/**
* Impersonation user ACL config entries.
*/
- @isImpersonationAcl
+ @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {ImpersonationAclUserEntryValidator.class})
public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl";
/**
* How often nimbus should wake up to renew credentials if needed.
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber
public static final String NIMBUS_CREDENTIAL_RENEW_FREQ_SECS = "nimbus.credential.renewers.freq.secs";
/**
@@ -538,12 +554,14 @@ public class Config extends HashMap<String, Object> {
* Storm UI binds to this port.
*/
@isInteger
+ @isPositiveNumber
public static final String UI_PORT = "ui.port";
/**
* HTTP UI port for log viewer
*/
@isInteger
+ @isPositiveNumber
public static final String LOGVIEWER_PORT = "logviewer.port";
/**
@@ -569,7 +587,8 @@ public class Config extends HashMap<String, Object> {
/**
* Storm Logviewer HTTPS port
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber
public static final String LOGVIEWER_HTTPS_PORT = "logviewer.https.port";
/**
@@ -658,19 +677,21 @@ public class Config extends HashMap<String, Object> {
/**
* Initialization parameters for the javax.servlet.Filter
*/
- @isType(type=Map.class)
+ @isMapEntryType(keyType = String.class, valueType = String.class)
public static final String UI_FILTER_PARAMS = "ui.filter.params";
/**
* The size of the header buffer for the UI in bytes
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber
public static final String UI_HEADER_BUFFER_BYTES = "ui.header.buffer.bytes";
/**
* This port is used by Storm DRPC for receiving HTTPS (SSL) DPRC requests from clients.
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber
public static final String UI_HTTPS_PORT = "ui.https.port";
/**
@@ -735,13 +756,13 @@ public class Config extends HashMap<String, Object> {
/**
* This port is used by Storm DRPC for receiving HTTP DPRC requests from clients.
*/
- @isNumber
+ @isInteger
public static final String DRPC_HTTP_PORT = "drpc.http.port";
/**
* This port is used by Storm DRPC for receiving HTTPS (SSL) DPRC requests from clients.
*/
- @isNumber
+ @isInteger
public static final String DRPC_HTTPS_PORT = "drpc.https.port";
/**
@@ -807,6 +828,7 @@ public class Config extends HashMap<String, Object> {
* This port is used by Storm DRPC for receiving DPRC requests from clients.
*/
@isInteger
+ @isPositiveNumber
public static final String DRPC_PORT = "drpc.port";
/**
@@ -845,18 +867,21 @@ public class Config extends HashMap<String, Object> {
* DRPC thrift server worker threads
*/
@isInteger
+ @isPositiveNumber
public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
/**
* The maximum buffer size thrift should use when reading messages for DRPC.
*/
@isNumber
+ @isPositiveNumber
public static final String DRPC_MAX_BUFFER_SIZE = "drpc.max_buffer_size";
/**
* DRPC thrift server queue size
*/
@isInteger
+ @isPositiveNumber
public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
/**
@@ -869,12 +894,14 @@ public class Config extends HashMap<String, Object> {
* This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
*/
@isInteger
+ @isPositiveNumber
public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
/**
* DRPC invocations thrift server worker threads
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber
public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
/**
@@ -925,7 +952,8 @@ public class Config extends HashMap<String, Object> {
/**
* A number representing the maximum number of workers any single topology can acquire.
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber(includeZero = true)
public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology";
/**
@@ -938,13 +966,14 @@ public class Config extends HashMap<String, Object> {
* Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
* service
*/
- @isType(type=Map.class)
+ @isMapEntryType(keyType = String.class, valueType = String.class)
public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params";
/**
* A number representing the maximum number of executors any single topology can acquire.
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber(includeZero = true)
public static final String NIMBUS_EXECUTORS_PER_TOPOLOGY = "nimbus.executors.perTopology";
/**
@@ -967,6 +996,7 @@ public class Config extends HashMap<String, Object> {
* How many seconds to sleep for before shutting down threads on worker
*/
@isInteger
+ @isPositiveNumber
public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = "supervisor.worker.shutdown.sleep.secs";
/**
@@ -1000,6 +1030,7 @@ public class Config extends HashMap<String, Object> {
* need to be restarted.
*/
@isInteger
+ @isPositiveNumber
public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
/**
@@ -1054,18 +1085,21 @@ public class Config extends HashMap<String, Object> {
* control how many worker receiver threads we need per worker
*/
@isInteger
+ @isPositiveNumber
public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
/**
* How often this worker should heartbeat to the supervisor.
*/
@isInteger
+ @isPositiveNumber
public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
/**
* How often a task should heartbeat its status to the master.
*/
@isInteger
+ @isPositiveNumber
public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
/**
@@ -1076,6 +1110,7 @@ public class Config extends HashMap<String, Object> {
* come through.
*/
@isInteger
+ @isPositiveNumber
public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
/**
@@ -1090,7 +1125,8 @@ public class Config extends HashMap<String, Object> {
/**
* How often a task should sync credentials, worst case.
*/
- @isNumber
+ @isInteger
+ @isPositiveNumber
public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
/**
@@ -1157,6 +1193,7 @@ public class Config extends HashMap<String, Object> {
* on each component in the topology to tune the performance of a topology.
*/
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_WORKERS = "topology.workers";
/**
@@ -1168,6 +1205,7 @@ public class Config extends HashMap<String, Object> {
* guaranteeing that the same value goes to the same task).
*/
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_TASKS = "topology.tasks";
/**
@@ -1206,6 +1244,7 @@ public class Config extends HashMap<String, Object> {
* then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.</p>
*/
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
/**
@@ -1215,6 +1254,7 @@ public class Config extends HashMap<String, Object> {
* to be equal to the number of workers configured for this topology. If this variable is set to 0,
* event logging will be disabled.</p>
*/
+ @isPositiveNumber
@isInteger
public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
@@ -1276,19 +1316,19 @@ public class Config extends HashMap<String, Object> {
* Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
*/
- @isListEntryType(type=Map.class)
+ @isListEntryCustom(entryValidatorClasses={MetricRegistryValidator.class})
public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
/**
* A map of metric name to class name implementing IMetric that will be created once per worker JVM
*/
- @isType(type=Map.class)
+ @isMapEntryType(keyType = String.class, valueType = String.class)
public static final String TOPOLOGY_WORKER_METRICS = "topology.worker.metrics";
/**
* A map of metric name to class name implementing IMetric that will be created once per worker JVM
*/
- @isType(type=Map.class)
+ @isMapEntryType(keyType = String.class, valueType = String.class)
public static final String WORKER_METRICS = "worker.metrics";
/**
@@ -1296,6 +1336,7 @@ public class Config extends HashMap<String, Object> {
* typically used in testing to limit the number of threads spawned in local mode.
*/
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
/**
@@ -1307,6 +1348,7 @@ public class Config extends HashMap<String, Object> {
* their tuples with a message id.
*/
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
/**
@@ -1323,6 +1365,7 @@ public class Config extends HashMap<String, Object> {
* The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
*/
@isInteger
+ @isPositiveNumber(includeZero = true)
public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
/**
@@ -1380,7 +1423,7 @@ public class Config extends HashMap<String, Object> {
* Topology-specific environment variables for the worker child process.
* This is added to the existing environment (that of the supervisor)
*/
- @isType(type=Map.class)
+ @isMapEntryType(keyType = String.class, valueType = String.class)
public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
/*
@@ -1425,6 +1468,7 @@ public class Config extends HashMap<String, Object> {
* The size of the Disruptor transfer queue for each worker.
*/
@isInteger
+ @isPowerOf2
public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
/**
@@ -1460,12 +1504,14 @@ public class Config extends HashMap<String, Object> {
* See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
*/
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";
/**
* How often a batch can be emitted in a Trident topology.
*/
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
/**
@@ -1502,6 +1548,7 @@ public class Config extends HashMap<String, Object> {
* Max pending tuples in one ShellBolt
*/
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
/**
@@ -1512,7 +1559,7 @@ public class Config extends HashMap<String, Object> {
* S2 - Confidential
* S3 - Secret (default.)
*/
- @isString
+ @isString(acceptedValues = {"S0", "S1", "S2", "S3"})
public static final String TOPOLOGY_LOGGING_SENSITIVITY="topology.logging.sensitivity";
/**
@@ -1532,6 +1579,7 @@ public class Config extends HashMap<String, Object> {
* The port to use to connect to the transactional zookeeper servers. If null (which is default),
* will use storm.zookeeper.port
*/
+ @isPositiveNumber
@isInteger
public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
@@ -1544,6 +1592,7 @@ public class Config extends HashMap<String, Object> {
/**
* The number of threads that should be used by the zeromq context in each worker process.
*/
+ @Deprecated
@isInteger
public static final String ZMQ_THREADS = "zmq.threads";
@@ -1552,6 +1601,7 @@ public class Config extends HashMap<String, Object> {
* the connection is closed. This is an advanced configuration and can almost
* certainly be ignored.
*/
+ @Deprecated
@isInteger
public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis";
@@ -1559,6 +1609,7 @@ public class Config extends HashMap<String, Object> {
* The high water for the ZeroMQ push sockets used for networking. Use this config to prevent buffer explosion
* on the networking layer.
*/
+ @Deprecated
@isInteger
public static final String ZMQ_HWM = "zmq.hwm";
@@ -1597,7 +1648,8 @@ public class Config extends HashMap<String, Object> {
* The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
* to backtype.storm.scheduler.multitenant.MultitenantScheduler
*/
- @isNumber
+ @isPositiveNumber
+ @isInteger
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/80df44ba/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index b9244ef..406362f 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -25,8 +25,10 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
/**
@@ -41,7 +43,7 @@ public class ConfigValidation {
public static abstract class Validator {
public Validator(Map<String, Object> params) {}
public Validator() {}
- public abstract void validateField(String name, Object o) throws InstantiationException, IllegalAccessException;
+ public abstract void validateField(String name, Object o) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException;
}
/**
@@ -91,9 +93,27 @@ public class ConfigValidation {
public static class StringValidator extends Validator {
+ private HashSet<String> acceptedValues = null;
+
+ public StringValidator(){}
+
+ public StringValidator(Map<String, Object> params) {
+
+ this.acceptedValues = new HashSet<String>(Arrays.asList((String[])params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES)));
+
+ if(this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) {
+ this.acceptedValues = null;
+ }
+ }
+
@Override
public void validateField(String name, Object o) {
SimpleTypeValidator.validateField(name, String.class, o);
+ if(this.acceptedValues != null) {
+ if (!this.acceptedValues.contains((String) o)) {
+ throw new IllegalArgumentException("Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues);
+ }
+ }
}
}
@@ -147,20 +167,26 @@ public class ConfigValidation {
}
/**
- * Validates a map of Strings to a map of Strings to a list.
- * {str -> {str -> [str,str]}
+ * Validates an entry for ImpersonationAclUser
*/
- public static class ImpersonationAclValidator extends Validator {
+ public static class ImpersonationAclUserEntryValidator extends Validator {
@Override
- public void validateField(String name, Object o) {
+ public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
if (o == null) {
return;
}
+ LOG.info("object: {}", o);
ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
- ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
- ConfigValidationUtils.listFv(String.class, false), false), true);
+ ConfigValidationUtils.listFv(String.class, false), false);
validator.validateField(name, o);
+ Map<String, List<String>> mapObject = (Map<String, List<String>>) o;
+ if (!mapObject.containsKey("hosts")) {
+ throw new IllegalArgumentException(name + " should contain Map entry with key: hosts");
+ }
+ if (!mapObject.containsKey("groups")) {
+ throw new IllegalArgumentException(name + " should contain Map entry with key: groups");
+ }
}
}
@@ -293,12 +319,12 @@ public class ConfigValidation {
}
@Override
- public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException {
+ public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
validateField(name, this.entryValidators, o);
}
- public static void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException {
+ public static void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
if (o == null) {
return;
}
@@ -306,7 +332,7 @@ public class ConfigValidation {
SimpleTypeValidator.validateField(name, Iterable.class, o);
for (Object entry : (Iterable) o) {
for (Class validator : validators) {
- Object v = validator.newInstance();
+ Object v = validator.getConstructor().newInstance();
if (v instanceof Validator) {
((Validator) v).validateField(name + " list entry", entry);
} else {
@@ -355,11 +381,11 @@ public class ConfigValidation {
}
@Override
- public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException {
+ public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
validateField(name, this.keyValidators, this.valueValidators, o);
}
- public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException {
+ public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
if (o == null) {
return;
}
@@ -367,7 +393,7 @@ public class ConfigValidation {
SimpleTypeValidator.validateField(name, Map.class, o);
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) {
for (Class kv : keyValidators) {
- Object keyValidator = kv.newInstance();
+ Object keyValidator = kv.getConstructor().newInstance();
if (keyValidator instanceof Validator) {
((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
} else {
@@ -375,7 +401,7 @@ public class ConfigValidation {
}
}
for (Class vv : valueValidators) {
- Object valueValidator = vv.newInstance();
+ Object valueValidator = vv.getConstructor().newInstance();;
if (valueValidator instanceof Validator) {
((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
} else {
@@ -425,6 +451,26 @@ public class ConfigValidation {
}
}
+ public static class MetricRegistryValidator extends Validator {
+
+ @Override
+ public void validateField(String name, Object o) throws IllegalAccessException {
+ if(o == null) {
+ return;
+ }
+ SimpleTypeValidator.validateField(name, Map.class, o);
+ if(!((Map) o).containsKey("class") ) {
+ throw new IllegalAccessException("Field " + name + " must have map entry with key: class");
+ }
+ if(!((Map) o).containsKey("parallelism.hint") ) {
+ throw new IllegalAccessException("Field " + name + " must have map entry with key: class");
+ }
+
+ SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
+ SimpleTypeValidator.validateField(name, long.class, ((Map) o).get("parallelism.hint"));
+ }
+ }
+
/**
* Methods for validating confs
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/80df44ba/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index 0b64479..44d9c51 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -45,6 +45,7 @@ public class ConfigValidationAnnotations {
static final String KEY_TYPE = "keyType";
static final String VALUE_TYPE = "valueType";
static final String INCLUDE_ZERO = "includeZero";
+ static final String ACCEPTED_VALUES = "acceptedValues";
}
/**
@@ -86,6 +87,7 @@ public class ConfigValidationAnnotations {
@Target(ElementType.FIELD)
public @interface isString {
Class validatorClass() default ConfigValidation.StringValidator.class;
+ String[] acceptedValues() default "";
}
@Retention(RetentionPolicy.RUNTIME)
http://git-wip-us.apache.org/repos/asf/storm/blob/80df44ba/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
index 3ac1d47..0f53634 100644
--- a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
@@ -543,6 +543,78 @@ public class TestConfigValidate {
}
}
+ @Test
+ public void TestAcceptedStrings() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+ TestConfig config = new TestConfig();
+ String[] passCases = {"aaa", "bbb", "ccc"};
+
+ for (Object value : passCases) {
+ config.put(TestConfig.TEST_MAP_CONFIG_5, value);
+ ConfigValidation.validateFields(config, TestConfig.class);
+ }
+
+ String[] failCases = {"aa", "bb", "cc", "abc", "a", "b", "c", ""};
+ for (Object value : failCases) {
+ try {
+ config.put(TestConfig.TEST_MAP_CONFIG_5, value);
+ ConfigValidation.validateFields(config, TestConfig.class);
+ Assert.fail("Expected Exception not Thrown for value: " + value);
+ } catch (IllegalArgumentException Ex) {
+ }
+ }
+ }
+
+ @Test
+ public void TestImpersonationAclUserEntryValidator() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+ TestConfig config = new TestConfig();
+ Collection<Object> passCases = new LinkedList<Object>();
+ Collection<Object> failCases = new LinkedList<Object>();
+
+ Map<String, Map<String, List<String>>> passCase1 = new HashMap<String, Map<String, List<String>>>();
+ Map<String, List<String>> passCase1_hostsAndGroups = new HashMap<String, List<String>>();
+ String[] hosts = {"host.1", "host.2", "host.3"};
+ passCase1_hostsAndGroups.put("hosts", Arrays.asList(hosts));
+ String[] groups = {"group.1", "group.2", "group.3"};
+ passCase1_hostsAndGroups.put("groups", Arrays.asList(groups));
+ passCase1.put("jerry", passCase1_hostsAndGroups);
+ passCases.add(passCase1);
+
+ for (Object value : passCases) {
+ config.put(TestConfig.TEST_MAP_CONFIG_6, value);
+ ConfigValidation.validateFields(config, TestConfig.class);
+ }
+
+ Map<String, Map<String, List<String>>> failCase1 = new HashMap<String, Map<String, List<String>>>();
+ Map<String, List<String>> failCase1_hostsAndGroups = new HashMap<String, List<String>>();
+ String[] failhosts = {"host.1", "host.2", "host.3"};
+ failCase1_hostsAndGroups.put("hosts", Arrays.asList(hosts));
+ failCase1.put("jerry", failCase1_hostsAndGroups);
+
+
+ Map<String, Map<String, List<String>>> failCase2 = new HashMap<String, Map<String, List<String>>>();
+ Map<String, List<String>> failCase2_hostsAndGroups = new HashMap<String, List<String>>();
+ String[] failgroups = {"group.1", "group.2", "group.3"};
+ failCase2_hostsAndGroups.put("groups", Arrays.asList(groups));
+ failCase2.put("jerry", failCase2_hostsAndGroups);
+
+ failCases.add(failCase1);
+ failCases.add(failCase2);
+ failCases.add("stuff");
+ failCases.add(5);
+
+ for (Object value : failCases) {
+ try {
+ config.put(TestConfig.TEST_MAP_CONFIG_6, value);
+ ConfigValidation.validateFields(config, TestConfig.class);
+ Assert.fail("Expected Exception not Thrown for value: " + value);
+ } catch (IllegalArgumentException Ex) {
+ }
+ }
+
+
+
+ }
+
public class TestConfig extends HashMap<String, Object> {
@isMapEntryType(keyType = String.class, valueType = Integer.class)
public static final String TEST_MAP_CONFIG = "test.map.config";
@@ -560,5 +632,11 @@ public class TestConfigValidate {
entryValidatorClasses = {PositiveNumberValidator.class, NotNullValidator.class})
@isNoDuplicateInList
public static final String TEST_MAP_CONFIG_4 = "test.map.config.4";
+
+ @isString(acceptedValues = {"aaa", "bbb", "ccc"})
+ public static final String TEST_MAP_CONFIG_5 = "test.map.config.5";
+
+ @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {ImpersonationAclUserEntryValidator.class})
+ public static final String TEST_MAP_CONFIG_6 = "test.map.config.6";
}
}
[3/5] storm git commit: addressing comments
Posted by ka...@apache.org.
addressing comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b3e8617
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b3e8617
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b3e8617
Branch: refs/heads/master
Commit: 2b3e8617b88779c20524d87fe4d546ba59cc888f
Parents: b57c830
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Fri Oct 23 17:09:24 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Oct 23 17:10:09 2015 -0500
----------------------------------------------------------------------
.../src/jvm/backtype/storm/validation/ConfigValidation.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2b3e8617/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 406362f..1b0cd7f 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -401,7 +401,7 @@ public class ConfigValidation {
}
}
for (Class vv : valueValidators) {
- Object valueValidator = vv.getConstructor().newInstance();;
+ Object valueValidator = vv.getConstructor().newInstance();
if (valueValidator instanceof Validator) {
((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
} else {
@@ -463,7 +463,7 @@ public class ConfigValidation {
throw new IllegalAccessException("Field " + name + " must have map entry with key: class");
}
if(!((Map) o).containsKey("parallelism.hint") ) {
- throw new IllegalAccessException("Field " + name + " must have map entry with key: class");
+ throw new IllegalAccessException("Field " + name + " must have map entry with key: parallelism.hint");
}
SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
[5/5] storm git commit: add STORM-1111 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1111 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ff3b8aff
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ff3b8aff
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ff3b8aff
Branch: refs/heads/master
Commit: ff3b8affaeed5b94fa5b3e57dee54ca48569d738
Parents: aca8bfb
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Oct 24 10:30:35 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 10:30:35 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ff3b8aff/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d308d7..8adfa0e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1111: Fix Validation for lots of different configs
* STORM-1125: Adding separate ZK client for read in Nimbus ZK State
* STORM-1121: Remove method call to avoid overhead during topology submission time
* STORM-1120: Fix keyword (schema -> scheme) from main-routes
[2/5] storm git commit: delete unneccessary annotations
Posted by ka...@apache.org.
delete unneccessary annotations
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b57c8305
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b57c8305
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b57c8305
Branch: refs/heads/master
Commit: b57c8305460f4de46d92129c961ed46a53a216db
Parents: 80df44b
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Mon Oct 19 12:44:41 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Oct 22 11:27:56 2015 -0500
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/Config.java | 7 +++----
.../storm/validation/ConfigValidationAnnotations.java | 6 ------
2 files changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b57c8305/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index ea58538..5f5fdec 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -253,7 +253,6 @@ public class Config extends HashMap<String, Object> {
*
* Defaults to false.
*/
- @Deprecated
@isBoolean
public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq";
@@ -1254,8 +1253,8 @@ public class Config extends HashMap<String, Object> {
* to be equal to the number of workers configured for this topology. If this variable is set to 0,
* event logging will be disabled.</p>
*/
- @isPositiveNumber
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
/**
@@ -1579,8 +1578,8 @@ public class Config extends HashMap<String, Object> {
* The port to use to connect to the transactional zookeeper servers. If null (which is default),
* will use storm.zookeeper.port
*/
- @isPositiveNumber
@isInteger
+ @isPositiveNumber
public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
/**
@@ -1648,8 +1647,8 @@ public class Config extends HashMap<String, Object> {
* The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
* to backtype.storm.scheduler.multitenant.MultitenantScheduler
*/
- @isPositiveNumber
@isInteger
+ @isPositiveNumber
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/b57c8305/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index 44d9c51..ed93370 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -185,12 +185,6 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
- public @interface isImpersonationAcl {
- Class validatorClass() default ConfigValidation.ImpersonationAclValidator.class;
- }
-
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.FIELD)
public @interface isStringOrStringList {
Class validatorClass() default ConfigValidation.StringOrStringListValidator.class;
}
[4/5] storm git commit: Merge branch 'STORM-1111' of
https://github.com/jerrypeng/storm into STORM-1111
Posted by ka...@apache.org.
Merge branch 'STORM-1111' of https://github.com/jerrypeng/storm into STORM-1111
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aca8bfb7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aca8bfb7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aca8bfb7
Branch: refs/heads/master
Commit: aca8bfb7dd76997340dc3d6a54479686a3ee31b0
Parents: b3bc585 2b3e861
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Oct 24 10:29:23 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Oct 24 10:29:23 2015 +0900
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/Config.java | 93 +++++++++++++++-----
.../storm/validation/ConfigValidation.java | 74 +++++++++++++---
.../validation/ConfigValidationAnnotations.java | 8 +-
.../jvm/backtype/storm/TestConfigValidate.java | 78 ++++++++++++++++
4 files changed, 212 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/aca8bfb7/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------