You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/15 20:18:06 UTC

[1/5] storm git commit: [STORM-1084] - Improve Storm config validation process to use java annotations instead of *_SCHEMA format

Repository: storm
Updated Branches:
  refs/heads/master 9fe97b6ea -> 54772f83c


http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
new file mode 100644
index 0000000..2e4470c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.validation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * Provides functionality for validating configuration fields.
+ */
+public class ConfigValidation {
+
+    private static final Class CONFIG_CLASS = backtype.storm.Config.class;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigValidation.class);
+
+    public abstract static class Validator { // base type: checks one named configuration value
+        public abstract void validateField(String name, Object o);
+    }
+
+    public abstract static class TypeValidator { // base type: validators that also receive a Class argument
+        public abstract void validateField(String name, Class type, Object o);
+    }
+
+    /**
+     * Validator definitions
+     */
+
+    /**
+     * Validates that an object is not null.
+     */
+    public static class NotNullValidator extends Validator {
+
+        /** @throws IllegalArgumentException if o is null; name is used in the message */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                throw new IllegalArgumentException("Field " + name + " cannot be null! Actual value: " + o);
+            }
+        }
+    }
+
+    /**
+     * Validates basic types: a value passes when it is null or an instance of the given type.
+     */
+    public static class SimpleTypeValidator extends TypeValidator {
+
+        /** @throws IllegalArgumentException if o is non-null and not an instance of type */
+        @Override
+        public void validateField(String name, Class type, Object o) {
+            // a null value always passes; rejecting null is NotNullValidator's job
+            if (o == null || type.isInstance(o)) {
+                return;
+            }
+            throw new IllegalArgumentException("Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass());
+        }
+    }
+
+    public static class StringValidator extends Validator {
+
+        /** Passes for null or any String; the type check itself lives in SimpleTypeValidator. */
+        @Override
+        public void validateField(String name, Object o) {
+            new SimpleTypeValidator().validateField(name, String.class, o);
+        }
+    }
+
+    public static class BooleanValidator extends Validator {
+
+        /** Passes for null or any Boolean; the type check itself lives in SimpleTypeValidator. */
+        @Override
+        public void validateField(String name, Object o) {
+            new SimpleTypeValidator().validateField(name, Boolean.class, o);
+        }
+    }
+
+    public static class NumberValidator extends Validator {
+
+        /** Passes for null or any Number; the type check itself lives in SimpleTypeValidator. */
+        @Override
+        public void validateField(String name, Object o) {
+            new SimpleTypeValidator().validateField(name, Number.class, o);
+        }
+    }
+
+    public static class DoubleValidator extends Validator {
+
+        /** Passes for null or any Double; the type check itself lives in SimpleTypeValidator. */
+        @Override
+        public void validateField(String name, Object o) {
+            new SimpleTypeValidator().validateField(name, Double.class, o);
+        }
+    }
+
+    /**
+     * Validates an Integer.
+     */
+    public static class IntegerValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateInteger(name, o);
+        }
+
+        /** Accepts null, or a Number whose long and double values agree and that fits in an int. */
+        public void validateInteger(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (o instanceof Number) {
+                final long whole = ((Number) o).longValue();
+                if (whole == ((Number) o).doubleValue() && whole >= Integer.MIN_VALUE && whole <= Integer.MAX_VALUE) {
+                    return;
+                }
+            }
+            throw new IllegalArgumentException("Field " + name + " must be an Integer within type range.");
+        }
+    }
+
+    /**
+     * Validates a map of Strings to a map of Strings to a list of Strings.
+     * {str -> {str -> [str,str]}}
+     */
+    public static class ImpersonationAclValidator extends Validator {
+        // A null value is skipped; anything else goes through the nested validator built from ConfigValidationUtils.
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
+                    ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
+                            ConfigValidationUtils.listFv(String.class, false), false), true);
+            validator.validateField(name, o);
+        }
+    }
+
+    /**
+     * Validates that a list has no duplicates.
+     * A null value is accepted; a non-null value that is not an Iterable is rejected.
+     */
+    public static class NoDuplicateInListValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object field) {
+            if (field == null) {
+                return;
+            }
+            // throws IllegalArgumentException unless field is an Iterable
+            new SimpleTypeValidator().validateField(name, Iterable.class, field);
+            HashSet<Object> seen = new HashSet<Object>();
+            for (Object element : (Iterable) field) {
+                // add() returns false when an equal element is already in the set
+                if (!seen.add(element)) {
+                    throw new IllegalArgumentException(name + " should contain no duplicate elements. Duplicated element: " + element);
+                }
+            }
+        }
+    }
+
+    /**
+     * Validates a String or a list of Strings
+     */
+    public static class StringOrStringListValidator extends Validator {
+
+        private final ConfigValidationUtils.FieldValidator fv = ConfigValidationUtils.listFv(String.class, false);
+
+        /**
+         * Accepts null, a single String, or an Iterable that passes the String-list validator fv.
+         *
+         * @throws IllegalArgumentException if o is neither a String nor an Iterable, or if fv rejects it
+         */
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null || o instanceof String) {
+                return;
+            }
+            // Anything else has to be a list: let the type check fail loudly with a clear
+            // message rather than discarding its result.
+            SimpleTypeValidator isIterable = new SimpleTypeValidator();
+            isIterable.validateField(name, Iterable.class, o);
+            // Per-element checks are delegated to the list validator.
+            this.fv.validateField(name, o);
+        }
+    }
+
+    /**
+     * Validates Kryo Registration: an Iterable whose elements are Strings or Maps of String to String
+     */
+    public static class KryoRegValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (!(o instanceof Iterable)) {
+                throw new IllegalArgumentException(
+                        "Field " + name + " must be an Iterable containing only Strings or Maps of Strings");
+            }
+            final String badEntry = "Each element of the list " + name + " must be a String or a Map of Strings";
+            for (Object element : (Iterable) o) {
+                if (element instanceof String) {
+                    continue;
+                }
+                if (!(element instanceof Map)) {
+                    throw new IllegalArgumentException(badEntry);
+                }
+                for (Map.Entry<Object, Object> pair : ((Map<Object, Object>) element).entrySet()) {
+                    boolean bothStrings = pair.getKey() instanceof String && pair.getValue() instanceof String;
+                    if (!bothStrings) {
+                        throw new IllegalArgumentException(badEntry);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Validates that a number is a power of 2 (null is accepted)
+     */
+    public static class PowerOf2Validator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (o instanceof Number) {
+                final long whole = ((Number) o).longValue();
+                // the long and double views must agree, and exactly one bit may be set
+                boolean integral = whole == ((Number) o).doubleValue();
+                if (integral && whole > 0 && (whole & (whole - 1)) == 0) {
+                    return;
+                }
+            }
+            throw new IllegalArgumentException("Field " + name + " must be a power of 2.");
+        }
+    }
+
+    /**
+     * Validates each entry in a list; delegates to the validator built by ConfigValidationUtils.listFv
+     */
+    public static class ListEntryTypeValidator extends TypeValidator {
+
+        @Override
+        public void validateField(String name, Class type, Object o) {
+            ConfigValidationUtils.NestableFieldValidator entryCheck = ConfigValidationUtils.listFv(type, false);
+            entryCheck.validateField(name, o);
+        }
+    }
+
+    /**
+     * Runs every entry of a list through an array of custom validator classes.
+     * Classes that are not Validator subclasses are skipped with a warning; null input is accepted.
+     */
+    public static class ListEntryCustomValidator {
+
+        public void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException {
+            if (o == null) {
+                return;
+            }
+            // rejects anything that is not an Iterable
+            new SimpleTypeValidator().validateField(name, Iterable.class, o);
+            final String entryName = name + " list entry";
+            for (Object entry : (Iterable) o) {
+                for (Class candidate : validators) {
+                    Object instance = candidate.newInstance();
+                    if (!(instance instanceof Validator)) {
+                        LOG.warn("validator: {} cannot be used in ListEntryCustomValidator.  Individual entry validators must a instance of Validator class", candidate.getName());
+                        continue;
+                    }
+                    ((Validator) instance).validateField(entryName, entry);
+                }
+            }
+        }
+    }
+
+    /**
+     * Validates each key and value in a map against a key type and a value type (via ConfigValidationUtils.mapFv)
+     */
+    public static class MapEntryTypeValidator {
+
+        public void validateField(String name, Class keyType, Class valueType, Object o) {
+            ConfigValidationUtils.NestableFieldValidator mapCheck = ConfigValidationUtils.mapFv(keyType, valueType, false);
+            mapCheck.validateField(name, o);
+        }
+    }
+
+    /**
+     * Validates each key and each value of a map against the respective arrays of validator classes
+     */
+    public static class MapEntryCustomValidator {
+
+        public void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException {
+            if (o == null) {
+                return;
+            }
+            // rejects anything that is not a Map
+            new SimpleTypeValidator().validateField(name, Map.class, o);
+            for (Map.Entry<Object, Object> pair : ((Map<Object, Object>) o).entrySet()) {
+                // all key validators run before any value validator of the same entry
+                for (Class kv : keyValidators) {
+                    Object keyCheck = kv.newInstance();
+                    if (!(keyCheck instanceof Validator)) {
+                        LOG.warn("validator: {} cannot be used in MapEntryCustomValidator to validate keys.  Individual entry validators must a instance of Validator class", kv.getName());
+                        continue;
+                    }
+                    ((Validator) keyCheck).validateField(name + " Map key", pair.getKey());
+                }
+                for (Class vv : valueValidators) {
+                    Object valueCheck = vv.newInstance();
+                    if (!(valueCheck instanceof Validator)) {
+                        LOG.warn("validator: {} cannot be used in MapEntryCustomValidator to validate values.  Individual entry validators must a instance of Validator class", vv.getName());
+                        continue;
+                    }
+                    ((Validator) valueCheck).validateField(name + " Map value", pair.getValue());
+                }
+            }
+        }
+    }
+
+    /**
+     * Validates a positive number
+     */
+    public static class PositiveNumberValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, false, o);
+        }
+
+        /**
+         * Accepts null or a Number greater than zero; zero itself passes only when includeZero is true.
+         */
+        public void validateField(String name, boolean includeZero, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (o instanceof Number) {
+                final double value = ((Number) o).doubleValue();
+                // the lower bound is inclusive or exclusive depending on includeZero
+                final boolean inRange = includeZero ? value >= 0.0 : value > 0.0;
+                if (inRange) {
+                    return;
+                }
+            }
+            throw new IllegalArgumentException("Field " + name + " must be a Positive Number");
+        }
+    }
+
+    /**
+     * Methods for validating confs
+     */
+
+    /**
+     * Validates a field given its name as a string; uses Config.java (CONFIG_CLASS) as the default config class
+     *
+     * @param fieldName name of the field in the config class, provided as a string
+     * @param conf      map of confs
+     */
+    public static void validateField(String fieldName, Map conf) throws NoSuchFieldException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+        validateField(fieldName, conf, CONFIG_CLASS);
+    }
+
+    /**
+     * Looks up a public field of the config class by name and validates its entry in conf
+     *
+     * @param fieldName   name of the field, provided as a string
+     * @param conf        map of confs
+     * @param configClass config class that declares the field
+     */
+    public static void validateField(String fieldName, Map conf, Class configClass) throws NoSuchFieldException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+        // getField throws NoSuchFieldException when the class has no public field of that name
+        validateField(configClass.getField(fieldName), conf);
+    }
+
+    /**
+     * Validates the conf value of one config field against every ConfigValidationAnnotations annotation on it.
+     * Which validateField overload is called depends on which members the annotation declares.
+     *
+     * @param field field that needs to be validated; read with field.get(null) and cast to String to get the conf key
+     * @param conf  map of confs
+     */
+    public static void validateField(Field field, Map conf) throws NoSuchFieldException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+        Annotation[] annotations = field.getAnnotations();
+        if (annotations.length == 0) {
+            LOG.warn("Field {} does not have validator annotation", field);
+        }
+        for (Annotation annotation : annotations) {
+            String type = annotation.annotationType().getName();
+            Class validatorClass = null;
+            Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
+            //check if the annotation is one of ours (declared inside ConfigValidationAnnotations); others are ignored
+            for (Class clazz : classes) {
+                if (clazz.getName().equals(type)) {
+                    validatorClass = clazz;
+                    break;
+                }
+            }
+            if (validatorClass != null) {
+                Object v = validatorClass.cast(annotation);
+                String key = (String) field.get(null);
+                Class clazz = (Class) validatorClass
+                        .getMethod(ConfigValidationAnnotations.VALIDATOR_CLASS).invoke(v);
+                //Determine which validateField signature to invoke from the members the annotation declares; first match wins
+                if (hasMethod(validatorClass, ConfigValidationAnnotations.TYPE)) {
+                    // annotation declares type(): validator is used as a TypeValidator
+                    TypeValidator o = ((Class<TypeValidator>) clazz).newInstance();
+
+                    Class objectType = (Class) validatorClass.getMethod(ConfigValidationAnnotations.TYPE).invoke(v);
+
+                    o.validateField(field.getName(), objectType, conf.get(key));
+
+                } else if (hasMethod(validatorClass, ConfigValidationAnnotations.ENTRY_VALIDATOR_CLASSES)) {
+                    // annotation declares entryValidatorClasses(): per-entry custom validators
+                    ListEntryCustomValidator o = ((Class<ListEntryCustomValidator>) clazz).newInstance();
+
+                    Class[] entryValidators = (Class[]) validatorClass.getMethod(ConfigValidationAnnotations.ENTRY_VALIDATOR_CLASSES).invoke(v);
+
+                    o.validateField(field.getName(), entryValidators, conf.get(key));
+
+                } else if (hasMethod(validatorClass, ConfigValidationAnnotations.KEY_VALIDATOR_CLASSES)
+                        && hasMethod(validatorClass, ConfigValidationAnnotations.VALUE_VALIDATOR_CLASSES)) {
+                    // annotation declares keyValidatorClasses() and valueValidatorClasses()
+                    MapEntryCustomValidator o = ((Class<MapEntryCustomValidator>) clazz).newInstance();
+
+                    Class[] keyValidators = (Class[]) validatorClass.getMethod(ConfigValidationAnnotations.KEY_VALIDATOR_CLASSES).invoke(v);
+
+                    Class[] valueValidators = (Class[]) validatorClass.getMethod(ConfigValidationAnnotations.VALUE_VALIDATOR_CLASSES).invoke(v);
+
+                    o.validateField(field.getName(), keyValidators, valueValidators, conf.get(key));
+
+                } else if (hasMethod(validatorClass, ConfigValidationAnnotations.KEY_TYPE)
+                        && hasMethod(validatorClass, ConfigValidationAnnotations.VALUE_TYPE)) {
+                    // annotation declares keyType() and valueType()
+                    MapEntryTypeValidator o = ((Class<MapEntryTypeValidator>) clazz).newInstance();
+
+                    Class keyType = (Class) validatorClass.getMethod(ConfigValidationAnnotations.KEY_TYPE).invoke(v);
+
+                    Class valueType = (Class) validatorClass.getMethod(ConfigValidationAnnotations.VALUE_TYPE).invoke(v);
+
+                    o.validateField(field.getName(), keyType, valueType, conf.get(key));
+
+                } else if (hasMethod(validatorClass, ConfigValidationAnnotations.INCLUDE_ZERO)) {
+                    // annotation declares includeZero()
+                    PositiveNumberValidator o = ((Class<PositiveNumberValidator>) clazz).newInstance();
+
+                    Boolean includeZero = (Boolean) validatorClass.getMethod(ConfigValidationAnnotations.INCLUDE_ZERO).invoke(v);
+
+                    o.validateField(field.getName(), includeZero, conf.get(key));
+
+                }
+                //For annotations that do not have any additional members: call the plain Validator.validateField(name, value)
+                else {
+
+                    ConfigValidation.Validator o = ((Class<ConfigValidation.Validator>) clazz).newInstance();
+
+                    o.validateField(field.getName(), conf.get(key));
+                }
+            }
+        }
+    }
+
+    /**
+     * Validate all confs in map, using CONFIG_CLASS (backtype.storm.Config) as the config class
+     *
+     * @param conf map of configs
+     */
+    public static void validateFields(Map conf) throws IllegalAccessException, InstantiationException, NoSuchFieldException, NoSuchMethodException, InvocationTargetException {
+        validateFields(conf, CONFIG_CLASS);
+    }
+
+    /**
+     * Validate all confs in map: each public static field of configClass whose String value is a key of conf is checked
+     *
+     * @param conf        map of configs
+     * @param configClass config class
+     */
+    public static void validateFields(Map conf, Class configClass) throws IllegalAccessException, InstantiationException, NoSuchFieldException, NoSuchMethodException, InvocationTargetException {
+        for (Field field : configClass.getFields()) {
+            // field.get(null) throws NullPointerException for an instance field, so only static fields are read
+            if (!java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            Object keyObj = field.get(null);
+            if (keyObj instanceof String && conf.containsKey(keyObj)) {
+                validateField(field, conf);
+            }
+        }
+    }
+
+    private static boolean hasMethod(Class clazz, String method) {
+        // true when clazz exposes a public no-argument method with this name
+        try {
+            clazz.getMethod(method);
+            return true;
+        } catch (NoSuchMethodException notFound) {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
new file mode 100644
index 0000000..be3b665
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.validation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Note: every annotation interface must have method validatorClass()
+ * For every annotation there must be a validator class to do the validation
+ * To add another annotation for config validation, add another annotation @interface class.  Implement the corresponding
+ * validator logic in a class in ConfigValidation.  Make sure validateField method in ConfigValidation knows how to use the validator
+ * and which method definition/parameters to pass in based on what fields are in the annotation.
+ */
+public class ConfigValidationAnnotations {
+    /**
+     * Names of the annotation members that ConfigValidation looks up by reflection
+     */
+
+    static final String VALIDATOR_CLASS = "validatorClass";
+    static final String TYPE = "type";
+    static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
+    static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
+    static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
+    static final String KEY_TYPE = "keyType";
+    static final String VALUE_TYPE = "valueType";
+    static final String INCLUDE_ZERO = "includeZero";
+
+    /**
+     * Validators with fields: validatorClass and type
+     */
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isType {
+        Class validatorClass() default ConfigValidation.SimpleTypeValidator.class;
+
+        Class type();
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isStringList {
+        Class validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
+
+        Class type() default String.class;
+    }
+
+    /**
+     * validates that each entry in a list is of a certain type
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isListEntryType {
+        Class validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
+
+        Class type();
+    }
+
+    /**
+     * Validators with fields: validatorClass
+     */
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isString {
+        Class validatorClass() default ConfigValidation.StringValidator.class;
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isNumber {
+        Class validatorClass() default ConfigValidation.NumberValidator.class;
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isBoolean {
+        Class validatorClass() default ConfigValidation.BooleanValidator.class;
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isInteger {
+        Class validatorClass() default ConfigValidation.IntegerValidator.class;
+    }
+
+    /**
+     * validates that an object is not null
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface NotNull {
+        Class validatorClass() default ConfigValidation.NotNullValidator.class;
+    }
+
+    /**
+     * validates that there are no duplicates in a list
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isNoDuplicateInList {
+        Class validatorClass() default ConfigValidation.NoDuplicateInListValidator.class;
+    }
+
+    /**
+     * Validates each entry in a list with a list of validators
+     * Validators with fields: validatorClass and entryValidatorClasses
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isListEntryCustom {
+        Class validatorClass() default ConfigValidation.ListEntryCustomValidator.class;
+
+        Class[] entryValidatorClasses();
+    }
+
+    /**
+     * Validates the type of each key and value in a map
+     * Validator with fields: validatorClass, keyType, valueType
+     */
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isMapEntryType {
+        Class validatorClass() default ConfigValidation.MapEntryTypeValidator.class;
+
+        Class keyType();
+
+        Class valueType();
+    }
+
+    /**
+     * Validates each key and value in a Map with a list of validators
+     * Validator with fields: validatorClass, keyValidatorClasses, valueValidatorClasses
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isMapEntryCustom {
+        Class validatorClass() default ConfigValidation.MapEntryCustomValidator.class;
+
+        Class[] keyValidatorClasses();
+
+        Class[] valueValidatorClasses();
+    }
+
+    /**
+     * checks if a number is positive; includeZero (default false) is passed to the validator to control whether zero is accepted
+     * Validator with fields: validatorClass, includeZero
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isPositiveNumber {
+        Class validatorClass() default ConfigValidation.PositiveNumberValidator.class;
+
+        boolean includeZero() default false;
+    }
+
+    /**
+     * Complex/custom type validators
+     */
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isImpersonationAcl {
+        Class validatorClass() default ConfigValidation.ImpersonationAclValidator.class;
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isStringOrStringList {
+        Class validatorClass() default ConfigValidation.StringOrStringListValidator.class;
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isKryoReg {
+        Class validatorClass() default ConfigValidation.KryoRegValidator.class;
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isPowerOf2 {
+        Class validatorClass() default ConfigValidation.PowerOf2Validator.class;
+    }
+
+    /**
+     * For custom validators: validatorClass has no default and must be supplied at the use site
+     */
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface CustomValidator {
+        Class validatorClass();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/src/jvm/backtype/storm/validation/ConfigValidationUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationUtils.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationUtils.java
new file mode 100644
index 0000000..d55dd5d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationUtils.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.validation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class ConfigValidationUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigValidationUtils.class);
+
+    /**
+     * Declares methods for validating configuration values.
+     */
+    public static interface FieldValidator {
+        /**
+         * Validates the given field.
+         * @param name the name of the field.
+         * @param field The field to be validated.
+         * @throws IllegalArgumentException if the field fails validation.
+         */
+        public void validateField(String name, Object field) throws IllegalArgumentException;
+    }
+
+    /**
+     * Declares a method for validating configuration values that is nestable.
+     */
+    public static abstract class NestableFieldValidator implements FieldValidator {
+        @Override
+        public void validateField(String name, Object field) throws IllegalArgumentException {
+            validateField(null, name, field);
+        }
+
+        /**
+         * Validates the given field.
+         * @param pd describes the parent wrapping this validator; null when called via the two-argument overload.
+         * @param name the name of the field.
+         * @param field The field to be validated.
+         * @throws IllegalArgumentException if the field fails validation.
+         */
+        public abstract void validateField(String pd, String name, Object field) throws IllegalArgumentException;
+    }
+
+    /**
+     * Returns a new NestableFieldValidator that checks a field is an instance of the given class.
+     * @param cls the Class the field should be a type of
+     * @param notNull if true a null field is rejected, if false null is accepted without further checks
+     * @return a NestableFieldValidator for that class
+     */
+    public static NestableFieldValidator fv(final Class cls, final boolean notNull) {
+        return new NestableFieldValidator() {
+            @Override
+            public void validateField(String pd, String name, Object field)
+                    throws IllegalArgumentException {
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " must not be null");
+                    }
+                    return;
+                }
+                if (!cls.isInstance(field)) {
+                    // pd is null for top-level calls; keep a literal "null" out of the message
+                    throw new IllegalArgumentException((pd == null ? "" : pd)
+                            + name + " must be a " + cls.getName() + ". (" + field + ")");
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a List of the given Class.
+     * @param cls the Class of elements composing the list
+     * @param notNull if true null is rejected, both for the list itself and for each of its elements
+     * @return a NestableFieldValidator for a list of the given class
+     */
+    public static NestableFieldValidator listFv(Class cls, boolean notNull) {
+        return listFv(fv(cls, notNull), notNull);
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a List where each item is validated by validator.
+     * @param validator used to validate each item in the list
+     * @param notNull if true a null list is rejected, if false null is accepted without further checks
+     * @return a NestableFieldValidator for a list with every item checked by the given validator.
+     */
+    public static NestableFieldValidator listFv(final NestableFieldValidator validator,
+                                                final boolean notNull) {
+        return new NestableFieldValidator() {
+            @Override
+            public void validateField(String pd, String name, Object field)
+                    throws IllegalArgumentException {
+
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " must not be null");
+                    }
+                    return;
+                }
+                if (!(field instanceof Iterable)) {
+                    // field is known to be non-null here, so its class can be reported directly
+                    throw new IllegalArgumentException(
+                            "Field " + name + " must be an Iterable but was a " + field.getClass());
+                }
+                // pd is null for top-level calls; keep a literal "null" out of the message
+                String elementPd = (pd == null ? "" : pd) + "Each element of the list ";
+                for (Object e : (Iterable) field) {
+                    validator.validateField(elementPd, name, e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a Map of key to val.
+     * @param key the Class of keys in the map
+     * @param val the Class of values in the map
+     * @param notNull if true a null map is rejected; null keys and null values are accepted either way
+     * @return a NestableFieldValidator for a Map of key to val
+     */
+    public static NestableFieldValidator mapFv(Class key, Class val,
+                                               boolean notNull) {
+        return mapFv(fv(key, false), fv(val, false), notNull);
+    }
+
+    /**
+     * Returns a new NestableFieldValidator for a Map; pd is not forwarded to the key/val validators.
+     * @param key a validator for the keys in the map
+     * @param val a validator for the values in the map
+     * @param notNull if true a null map is rejected, if false null is accepted without further checks
+     * @return a NestableFieldValidator for a Map
+     */
+    public static NestableFieldValidator mapFv(final NestableFieldValidator key,
+                                               final NestableFieldValidator val, final boolean notNull) {
+        return new NestableFieldValidator() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public void validateField(String pd, String name, Object field)
+                    throws IllegalArgumentException {
+                if (field == null) {
+                    if (notNull) {
+                        throw new IllegalArgumentException("Field " + name + " must not be null");
+                    } else {
+                        return;
+                    }
+                }
+                if (field instanceof Map) {
+                    for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) field).entrySet()) {
+                        key.validateField("Each key of the map ", name, entry.getKey());
+                        val.validateField("Each value in the map ", name, entry.getValue());
+                    }
+                    return;
+                }
+                throw new IllegalArgumentException(
+                        "Field " + name + " must be a Map");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/config_test.clj b/storm-core/test/clj/backtype/storm/config_test.clj
deleted file mode 100644
index 99032bd..0000000
--- a/storm-core/test/clj/backtype/storm/config_test.clj
+++ /dev/null
@@ -1,186 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.config-test
-  (:import [backtype.storm Config ConfigValidation])
-  (:import [backtype.storm.scheduler TopologyDetails])
-  (:import [backtype.storm.utils Utils])
-  (:use [clojure test])
-  (:use [backtype.storm config util])
-  )
-
-(deftest test-validity
-  (is (Utils/isValidConf {TOPOLOGY-DEBUG true "q" "asasdasd" "aaa" (Integer. "123") "bbb" (Long. "456") "eee" [1 2 (Integer. "3") (Long. "4")]}))
-  (is (not (Utils/isValidConf {"qqq" (backtype.storm.utils.Utils.)})))
-  )
-
-(deftest test-power-of-2-validator
-  (let [validator ConfigValidation/PowerOf2Validator]
-    (doseq [x [42.42 42 23423423423 -33 -32 -1 -0.00001 0 -0 "Forty-two"]]
-      (is (thrown-cause? java.lang.IllegalArgumentException
-        (.validateField validator "test" x))))
-
-    (doseq [x [64 4294967296 1 nil]]
-      (is (nil? (try
-                  (.validateField validator "test" x)
-                  (catch Exception e e)))))))
-
-(deftest test-list-validator
-  (let [validator ConfigValidation/StringsValidator]
-    (doseq [x [
-               ["Forty-two" 42]
-               [42]
-               [true "false"]
-               [nil]
-               [nil "nil"]
-              ]]
-      (is (thrown-cause-with-msg?
-            java.lang.IllegalArgumentException #"(?i).*each element.*"
-        (.validateField validator "test" x))))
-
-    (doseq [x ["not a list at all"]]
-      (is (thrown-cause-with-msg?
-            java.lang.IllegalArgumentException #"(?i).*must be an iterable.*"
-        (.validateField validator "test" x))))
-
-    (doseq [x [
-               ["one" "two" "three"]
-               [""]
-               ["42" "64"]
-               nil
-              ]]
-    (is (nil? (try
-                (.validateField validator "test" x)
-                (catch Exception e e)))))))
-
-(deftest test-integer-validator
-  (let [validator ConfigValidation/IntegerValidator]
-    (.validateField validator "test" nil)
-    (.validateField validator "test" 1000)
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" 1.34)))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" (inc Integer/MAX_VALUE))))))
-
-(deftest test-pos-integer-validator
-  (let [validator ConfigValidation/NotNullPosIntegerValidator]
-    (is (thrown-cause? java.lang.IllegalArgumentException
-        (.validateField validator "test" nil)))
-    (.validateField validator "test" 1000)
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" 1.34)))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" 0)))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" -100)))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" (inc Integer/MAX_VALUE))))))
-
-(deftest test-integers-validator
-  (let [validator ConfigValidation/NoDuplicateIntegersValidator]
-    (.validateField validator "test" nil)
-    (.validateField validator "test" [1000 0 -1000])
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" [0 10 10])))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" [0 10 1.34])))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" [0 nil])))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" [-100 (inc Integer/MAX_VALUE)])))))
-
-(deftest test-positive-number-validator
-  (let [validator ConfigValidation/PositiveNumberValidator]
-    (.validateField validator "test" nil)
-    (.validateField validator "test" 1.0)
-    (.validateField validator "test" 1)
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" -1.0)))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" -1)))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" 0)))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" 0.0)))))
-
-(deftest test-topology-workers-is-integer
-  (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-WORKERS)]
-    (.validateField validator "test" 42)
-    (is (thrown-cause? java.lang.IllegalArgumentException
-      (.validateField validator "test" 3.14159)))))
-
-(deftest test-topology-stats-sample-rate-is-float
-  (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-STATS-SAMPLE-RATE)]
-    (.validateField validator "test" 0.5)
-    (.validateField validator "test" 10)
-    (.validateField validator "test" Double/MAX_VALUE)))
-
-(deftest test-isolation-scheduler-machines-is-map
-  (let [validator (CONFIG-SCHEMA-MAP ISOLATION-SCHEDULER-MACHINES)]
-    (is (nil? (try
-                (.validateField validator "test" {})
-                (catch Exception e e))))
-    (is (nil? (try
-                (.validateField validator "test" {"host0" 1 "host1" 2})
-                (catch Exception e e))))
-    (is (thrown-cause? java.lang.IllegalArgumentException
-      (.validateField validator "test" 42)))))
-
-(deftest test-positive-integer-validator
-  (let [validator ConfigValidation/PositiveIntegerValidator]
-    (doseq [x [42.42 -32 0 -0 "Forty-two"]]
-      (is (thrown-cause? java.lang.IllegalArgumentException
-        (.validateField validator "test" x))))
-
-    (doseq [x [42 4294967296 1 nil]]
-      (is (nil? (try
-                  (.validateField validator "test" x)
-                  (catch Exception e e)))))))
-
-(deftest test-worker-childopts-is-string-or-string-list
-  (let [pass-cases [nil "some string" ["some" "string" "list"]]]
-    (testing "worker.childopts validates"
-      (let [validator (CONFIG-SCHEMA-MAP WORKER-CHILDOPTS)]
-        (doseq [value pass-cases]
-          (is (nil? (try
-                      (.validateField validator "test" value)
-                      (catch Exception e e)))))
-        (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" 42)))))
-
-    (testing "topology.worker.childopts validates"
-      (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-WORKER-CHILDOPTS)]
-        (doseq [value pass-cases]
-          (is (nil? (try
-                      (.validateField validator "test" value)
-                      (catch Exception e e)))))
-        (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" 42)))))))
-
-(deftest test-absolute-storm-local-dir
-  (let [storm-home-key "storm.home"
-        conf-relative {STORM-LOCAL-DIR "storm-local"}
-        conf-absolute {STORM-LOCAL-DIR
-                       (if on-windows?
-                         "C:\\storm-local"
-                         "/var/storm-local")}]
-    (testing
-      "for relative path"
-      (is (= (str (System/getProperty storm-home-key) file-path-separator (conf-relative STORM-LOCAL-DIR))
-             (absolute-storm-local-dir conf-relative))))
-    (testing
-      "for absolute path"
-      (is (= (if on-windows? "C:\\storm-local" "/var/storm-local")
-             (absolute-storm-local-dir conf-absolute))))))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/test/clj/backtype/storm/serialization_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/serialization_test.clj b/storm-core/test/clj/backtype/storm/serialization_test.clj
index 7f1c0a9..68a710c 100644
--- a/storm-core/test/clj/backtype/storm/serialization_test.clj
+++ b/storm-core/test/clj/backtype/storm/serialization_test.clj
@@ -18,10 +18,8 @@
   (:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer
             KryoValuesSerializer KryoValuesDeserializer])
   (:import [backtype.storm.testing TestSerObject TestKryoDecorator])
-  (:import [backtype.storm ConfigValidation])
-  (:use [backtype.storm util config])
-  )
-
+  (:import [backtype.storm.validation ConfigValidation$KryoRegValidator])
+  (:use [backtype.storm util config]))
 
 (defn mk-conf [extra]
   (merge (read-default-config) extra))
@@ -42,19 +40,19 @@
     (deserialize (serialize vals conf) conf)))
 
 (deftest validate-kryo-conf-basic
-  (.validateField ConfigValidation/KryoRegValidator "test" ["a" "b" "c" {"d" "e"} {"f" "g"}]))
+  (.validateField (ConfigValidation$KryoRegValidator. ) "test" ["a" "b" "c" {"d" "e"} {"f" "g"}]))
 
 (deftest validate-kryo-conf-fail
   (try
-    (.validateField ConfigValidation/KryoRegValidator "test" {"f" "g"})
+    (.validateField (ConfigValidation$KryoRegValidator. ) "test" {"f" "g"})
     (assert false)
     (catch IllegalArgumentException e))
   (try
-    (.validateField ConfigValidation/KryoRegValidator "test" [1])
+    (.validateField (ConfigValidation$KryoRegValidator. ) "test" [1])
     (assert false)
     (catch IllegalArgumentException e))
   (try
-    (.validateField ConfigValidation/KryoRegValidator "test" [{"a" 1}])
+    (.validateField (ConfigValidation$KryoRegValidator. ) "test" [{"a" 1}])
     (assert false)
     (catch IllegalArgumentException e))
 )

http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
new file mode 100644
index 0000000..09d2be9
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm;
+
+import backtype.storm.utils.Utils;
+import backtype.storm.validation.ConfigValidation;
+import backtype.storm.validation.ConfigValidation.*;
+import backtype.storm.validation.ConfigValidationAnnotations.*;
+import org.junit.Test;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class TestConfigValidate {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestConfigValidate.class);
+
+    @Test
+    public void validConfigTest() throws InstantiationException, IllegalAccessException, NoSuchFieldException, NoSuchMethodException, InvocationTargetException {
+
+
+        Map<String, Object> conf = new HashMap<String, Object>();
+        conf.put(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG, 5);
+        conf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 500);
+        conf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, true);
+
+        ConfigValidation.validateFields(conf);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void invalidConfigTest() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        conf.put(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG, 5);
+        conf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 500);
+        conf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, "invalid");
+
+        ConfigValidation.validateFields(conf);
+    }
+
+    @Test
+    public void defaultYamlTest() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        Map conf = Utils.readStormConfig();
+        ConfigValidation.validateFields(conf);
+    }
+
+    @Test
+    public void testTopologyWorkersIsInteger() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        conf.put(Config.TOPOLOGY_WORKERS, 42);
+        ConfigValidation.validateFields(conf);
+
+        conf.put(Config.TOPOLOGY_WORKERS, 3.14159);
+        try {
+            ConfigValidation.validateFields(conf);
+            Assert.fail("Expected Exception not Thrown");
+        } catch (IllegalArgumentException ex) {
+        }
+    }
+
+    @Test
+    public void testTopologyStatsSampleRateIsFloat() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.5);
+        ConfigValidation.validateFields(conf);
+        conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 10);
+        ConfigValidation.validateFields(conf);
+        conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, Double.MAX_VALUE);
+        ConfigValidation.validateFields(conf);
+    }
+
+    @Test
+    public void testIsolationSchedulerMachinesIsMap() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        Map<String, Integer> isolationMap = new HashMap<String, Integer>();
+        conf.put(Config.ISOLATION_SCHEDULER_MACHINES, isolationMap);
+        ConfigValidation.validateFields(conf);
+
+        isolationMap.put("host0", 1);
+        isolationMap.put("host1", 2);
+
+        conf.put(Config.ISOLATION_SCHEDULER_MACHINES, isolationMap);
+        ConfigValidation.validateFields(conf);
+
+        conf.put(Config.ISOLATION_SCHEDULER_MACHINES, 42);
+        try {
+            ConfigValidation.validateFields(conf);
+            Assert.fail("Expected Exception not Thrown");
+        } catch (IllegalArgumentException ex) {
+        }
+    }
+
+    @Test
+    public void testWorkerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+
+        passCases.add(null);
+        passCases.add("some string");
+        String[] stuff = {"some", "string", "list"};
+        passCases.add(Arrays.asList(stuff));
+
+        failCases.add(42);
+        Integer[] wrongStuff = {1, 2, 3};
+        failCases.add(Arrays.asList(wrongStuff));
+
+        //worker.childopts validates
+        for (Object value : passCases) {
+            conf.put(Config.WORKER_CHILDOPTS, value);
+            ConfigValidation.validateFields(conf);
+        }
+
+        for (Object value : failCases) {
+            try {
+                conf.put(Config.WORKER_CHILDOPTS, value);
+                ConfigValidation.validateFields(conf);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+
+        //topology.worker.childopts validates
+        conf = new HashMap<String, Object>();
+        for (Object value : passCases) {
+            conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, value);
+            ConfigValidation.validateFields(conf);
+        }
+
+        for (Object value : failCases) {
+            try {
+                conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, value);
+                ConfigValidation.validateFields(conf);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+    }
+
+    @Test
+    public void testSupervisorSlotsPorts() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+
+        Integer[] test1 = {1233, 1234, 1235};
+        Integer[] test2 = {1233};
+        passCases.add(Arrays.asList(test1));
+        passCases.add(Arrays.asList(test2));
+
+        String[] test3 = {"1233", "1234", "1235"};
+        //duplicate case; both are wrapped in a List (like passCases) so the entries, not the array type, get rejected
+        Integer[] test4 = {1233, 1233, 1235};
+        failCases.add(Arrays.asList(test3));
+        failCases.add(Arrays.asList(test4));
+        failCases.add(null);
+        failCases.add("1234");
+        failCases.add(1234);
+
+        for (Object value : passCases) {
+            conf.put(Config.SUPERVISOR_SLOTS_PORTS, value);
+            ConfigValidation.validateFields(conf);
+        }
+
+        for (Object value : failCases) {
+            try {
+                conf.put(Config.SUPERVISOR_SLOTS_PORTS, value);
+                ConfigValidation.validateFields(conf);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException expected) {
+            }
+        }
+    }
+
+    @Test
+    public void testValidity() {
+        Map<String, Object> conf = new HashMap<String, Object>();
+        conf.put(Config.TOPOLOGY_DEBUG, true);
+        conf.put("q", "asasdasd");
+        conf.put("aaa", Integer.valueOf("123"));
+        conf.put("bbb", Long.valueOf("456"));
+        List<Object> testList = new ArrayList<Object>();
+        testList.add(1);
+        testList.add(2);
+        testList.add(Integer.valueOf("3"));
+        testList.add(Long.valueOf("4"));
+        conf.put("eee", testList);
+        Assert.assertTrue("conf should be reported as valid", Utils.isValidConf(conf)); // assert, so the test can fail
+    }
+
+    @Test
+    public void testPowerOf2Validator() {
+        PowerOf2Validator validator = new PowerOf2Validator();
+
+        Object[] failCases = {42.42, 42, -33, 23423423423.0, -32, -1, -0.00001, 0, -0, "Forty-two"};
+        for (Object value : failCases) {
+            try {
+                validator.validateField("test", value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+
+        Object[] passCases = {64, 4294967296.0, 1, null};
+        for (Object value : passCases) {
+            validator.validateField("test", value);
+        }
+    }
+
+    @Test
+    public void testPositiveNumberValidator() {
+        PositiveNumberValidator validator = new PositiveNumberValidator();
+
+        Object[] passCases = {null, 1.0, 0.01, 1, 2147483647, 42};
+
+        for (Object value : passCases) {
+            validator.validateField("test", value);
+        }
+
+        Object[] failCases = {-1.0, -1, -0.01, 0.0, 0, "43", "string"};
+
+        for (Object value : failCases) {
+            try {
+                validator.validateField("test", value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+
+        Object[] passCasesIncludeZero = {null, 1.0, 0.01, 0, 2147483647, 0.0};
+
+        for (Object value : passCasesIncludeZero) {
+            validator.validateField("test", true, value);
+        }
+
+        Object[] failCasesIncludeZero = {-1.0, -1, -0.01, "43", "string"};
+
+        for (Object value : failCasesIncludeZero) {
+            try {
+                validator.validateField("test", true, value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+    }
+
+    @Test
+    public void testIntegerValidator() {
+        IntegerValidator validator = new IntegerValidator();
+
+        Object[] passCases = {null, 1000, Integer.MAX_VALUE};
+
+        for (Object value : passCases) {
+            validator.validateField("test", value);
+        }
+
+        Object[] failCases = {1.34, new Long(Integer.MAX_VALUE) + 1};
+
+        for (Object value : failCases) {
+            try {
+                validator.validateField("test", value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+    }
+
+    @Test
+    public void NoDuplicateInListValidator() {
+        NoDuplicateInListValidator validator = new NoDuplicateInListValidator();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+
+        Object[] passCase1 = {1000, 0, -1000};
+        Object[] passCase2 = {"one", "two", "three"};
+        Object[] passCase3 = {false, true};
+        Object[] passCase4 = {false, true, 1000, 0, -1000, "one", "two", "three"};
+        Object[] passCase5 = {1000.0, 0.0, -1000.0};
+        passCases.add(Arrays.asList(passCase1));
+        passCases.add(Arrays.asList(passCase2));
+        passCases.add(Arrays.asList(passCase3));
+        passCases.add(Arrays.asList(passCase4));
+        passCases.add(Arrays.asList(passCase5));
+        passCases.add(null);
+
+        for (Object value : passCases) {
+            validator.validateField("test", value);
+        }
+
+        Object[] failCase1 = {1000, 0, 1000};
+        Object[] failCase2 = {"one", "one", "two"};
+        Object[] failCase3 = {5.0, 5.0, 6};
+        failCases.add(Arrays.asList(failCase1));
+        failCases.add(Arrays.asList(failCase2));
+        failCases.add(Arrays.asList(failCase3));
+        for (Object value : failCases) {
+            try {
+                validator.validateField("test", value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
+    }
+
+    @Test
+    public void testListEntryTypeValidator() {
+        ListEntryTypeValidator validator = new ListEntryTypeValidator();
+        Collection<Object> testCases1 = new LinkedList<Object>();
+        Collection<Object> testCases2 = new LinkedList<Object>();
+        Collection<Object> testCases3 = new LinkedList<Object>();
+
+        // testCases1: lists containing only strings
+        Object[] testCase1 = {"one", "two", "three"};
+        Object[] testCase2 = {"three"};
+        testCases1.add(Arrays.asList(testCase1));
+        testCases1.add(Arrays.asList(testCase2));
+
+        for (Object value : testCases1) {
+            validator.validateField("test", String.class, value);
+        }
+
+        for (Object value : testCases1) {
+            try {
+                validator.validateField("test", Number.class, value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException expected) {
+            }
+        }
+        // testCases2: lists containing only numbers
+        Object[] testCase3 = {1000, 0, 1000};
+        Object[] testCase4 = {5};
+        Object[] testCase5 = {5.0, 5.0, 6};
+        testCases2.add(Arrays.asList(testCase3));
+        testCases2.add(Arrays.asList(testCase4));
+        testCases2.add(Arrays.asList(testCase5));
+        for (Object value : testCases2) {
+            try {
+                validator.validateField("test", String.class, value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException expected) {
+            }
+        }
+        for (Object value : testCases2) {
+            validator.validateField("test", Number.class, value);
+        }
+        // testCases3: lists mixing numbers and strings, which must fail for either entry type
+        Object[] testCase6 = {1000, 0, 1000, "5"};
+        Object[] testCase7 = {"4", "5", 5};
+        testCases3.add(Arrays.asList(testCase6));
+        testCases3.add(Arrays.asList(testCase7));
+        for (Object value : testCases3) {
+            try {
+                validator.validateField("test", String.class, value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException expected) {
+            }
+        }
+        for (Object value : testCases3) {
+            try {
+                validator.validateField("test", Number.class, value);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException expected) {
+            }
+        }
+    }
+
+    @Test // exercises TEST_MAP_CONFIG, annotated @isMapEntryType(keyType = String.class, valueType = Integer.class)
+    public void testMapEntryTypeAnnotation() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        TestConfig config = new TestConfig();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+        Map<Object, Object> passCase1 = new HashMap<Object, Object>();
+        passCase1.put("aaa", 5);
+        passCase1.put("bbb", 6);
+        passCase1.put("ccc", 7);
+        passCases.add(passCase1);
+        passCases.add(null); // a null value is expected to validate
+
+        for (Object value : passCases) {
+            config.put(TestConfig.TEST_MAP_CONFIG, value);
+            ConfigValidation.validateFields(config, TestConfig.class); // must not throw
+        }
+
+        Map<Object, Object> failCase1 = new HashMap<Object, Object>();
+        failCase1.put("aaa", 5);
+        failCase1.put(5, 6); // key is not a String
+        failCase1.put("ccc", 7);
+        Map<Object, Object> failCase2 = new HashMap<Object, Object>();
+        failCase2.put("aaa", "str"); // value is not an Integer
+        failCase2.put("bbb", 6);
+        failCase2.put("ccc", 7);
+
+        failCases.add(failCase1);
+        failCases.add(failCase2);
+        for (Object value : failCases) {
+            try {
+                config.put(TestConfig.TEST_MAP_CONFIG, value);
+                ConfigValidation.validateFields(config, TestConfig.class);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) { // expected: the invalid map was rejected
+            }
+        }
+    }
+
+    @Test // exercises TEST_MAP_CONFIG_2: keys via StringValidator, values via PositiveNumberValidator and IntegerValidator
+    public void testMapEntryCustomAnnotation() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        TestConfig config = new TestConfig();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+        Map<Object, Object> passCase1 = new HashMap<Object, Object>();
+        passCase1.put("aaa", 5);
+        passCase1.put("bbb", 100);
+        passCase1.put("ccc", Integer.MAX_VALUE); // largest int is still expected to pass
+        passCases.add(passCase1);
+        passCases.add(null); // a null value is expected to validate
+
+        for (Object value : passCases) {
+            config.put(TestConfig.TEST_MAP_CONFIG_2, value);
+            ConfigValidation.validateFields(config, TestConfig.class); // must not throw
+        }
+
+        Map<Object, Object> failCase1 = new HashMap<Object, Object>();
+        failCase1.put("aaa", 5);
+        failCase1.put(5, 6); // key is not a String
+        failCase1.put("ccc", 7);
+        Map<Object, Object> failCase2 = new HashMap<Object, Object>();
+        failCase2.put("aaa", "str"); // value is not a number
+        failCase2.put("bbb", 6);
+        failCase2.put("ccc", 7);
+        Map<Object, Object> failCase3 = new HashMap<Object, Object>();
+        failCase3.put("aaa", -1); // negative value; presumably rejected by PositiveNumberValidator
+        failCase3.put("bbb", 6);
+        failCase3.put("ccc", 7);
+        Map<Object, Object> failCase4 = new HashMap<Object, Object>();
+        failCase4.put("aaa", 1);
+        failCase4.put("bbb", 6);
+        failCase4.put("ccc", 7.4); // fractional value; presumably rejected by IntegerValidator
+
+        failCases.add(failCase1);
+        failCases.add(failCase2);
+        failCases.add(failCase3);
+        failCases.add(failCase4);
+        for (Object value : failCases) {
+            try {
+                config.put(TestConfig.TEST_MAP_CONFIG_2, value);
+                ConfigValidation.validateFields(config, TestConfig.class);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) { // expected: the invalid map was rejected
+            }
+        }
+    }
+
+    @Test // exercises TEST_MAP_CONFIG_3, annotated @isListEntryType(type = Number.class) and @NotNull
+    public void testListEntryTypeAnnotation() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        TestConfig config = new TestConfig();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+        Object[] passCase1 = {1, 5.0, -0.01, 0, Integer.MAX_VALUE, Double.MIN_VALUE};
+        Object[] passCase2 = {1};
+        passCases.add(Arrays.asList(passCase1));
+        passCases.add(Arrays.asList(passCase2));
+
+        for (Object value : passCases) {
+            config.put(TestConfig.TEST_MAP_CONFIG_3, value);
+            ConfigValidation.validateFields(config, TestConfig.class); // must not throw
+        }
+
+        Object[] failCase1 = {1, 5.0, -0.01, 0, "aaa"};
+        Object[] failCase2 = {"aaa"};
+        failCases.add(Arrays.asList(failCase1)); // wrapped as a List so the String entry is what gets rejected
+        failCases.add(Arrays.asList(failCase2)); // (a raw Object[] would be rejected merely for not being a list)
+        failCases.add(1); // not a list
+        failCases.add("b"); // not a list
+        failCases.add(null); // the field is annotated @NotNull
+        for (Object value : failCases) {
+            try {
+                config.put(TestConfig.TEST_MAP_CONFIG_3, value);
+                ConfigValidation.validateFields(config, TestConfig.class);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException expected) { // expected: the invalid value was rejected
+            }
+        }
+    }
+
+    @Test // exercises TEST_MAP_CONFIG_4: entries via PositiveNumberValidator and NotNullValidator, plus @isNoDuplicateInList
+    public void testListEntryCustomAnnotation() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
+        TestConfig config = new TestConfig();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+        Object[] passCase1 = {1, 5.0, 0.01, Double.MAX_VALUE};
+        Object[] passCase2 = {1};
+        passCases.add(Arrays.asList(passCase1));
+        passCases.add(Arrays.asList(passCase2));
+
+        for (Object value : passCases) {
+            config.put(TestConfig.TEST_MAP_CONFIG_4, value);
+            ConfigValidation.validateFields(config, TestConfig.class); // must not throw
+        }
+
+        Object[] failCase1 = {1, 5.0, -0.01, 3.0}; // negative entry
+        Object[] failCase2 = {1, 5.0, -0.01, 1}; // negative entry (and a repeated 1)
+        Object[] failCase3 = {"aaa", "bbb", "aaa"}; // non-numeric entries (and a repeated "aaa")
+        Object[] failCase4 = {1, 5.0, null, 1}; // null entry (and a repeated 1)
+        Object[] failCase5 = {1, 5.0, 0, 1}; // zero entry (and a repeated 1)
+        // NOTE(review): cases 2, 4 and 5 also repeat 1, so the duplicate check alone may reject them - confirm intent
+        failCases.add(Arrays.asList(failCase1));
+        failCases.add(Arrays.asList(failCase2));
+        failCases.add(Arrays.asList(failCase3));
+        failCases.add(Arrays.asList(failCase4));
+        failCases.add(Arrays.asList(failCase5));
+        failCases.add(1); // not a list
+        failCases.add("b"); // not a list
+        for (Object value : failCases) {
+            try {
+                config.put(TestConfig.TEST_MAP_CONFIG_4, value);
+                ConfigValidation.validateFields(config, TestConfig.class);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) { // expected: the invalid value was rejected
+            }
+        }
+    }
+
+    public static class TestConfig extends HashMap<String, Object> { // static: declares only constants, needs no enclosing test instance
+        @isMapEntryType(keyType = String.class, valueType = Integer.class)
+        public static final String TEST_MAP_CONFIG = "test.map.config";
+
+        @isMapEntryCustom(
+                keyValidatorClasses = {StringValidator.class},
+                valueValidatorClasses = {PositiveNumberValidator.class, IntegerValidator.class})
+        public static final String TEST_MAP_CONFIG_2 = "test.map.config.2";
+
+        @isListEntryType(type = Number.class)
+        @NotNull
+        public static final String TEST_MAP_CONFIG_3 = "test.map.config.3"; // validated as a list despite the MAP name
+
+        @isListEntryCustom(
+                entryValidatorClasses = {PositiveNumberValidator.class, NotNullValidator.class})
+        @isNoDuplicateInList
+        public static final String TEST_MAP_CONFIG_4 = "test.map.config.4"; // validated as a list despite the MAP name
+    }
+}


[2/5] storm git commit: [STORM-1084] - Improve Storm config validation process to use java annotations instead of *_SCHEMA format

Posted by bo...@apache.org.
[STORM-1084] - Improve Storm config validation process to use java annotations instead of *_SCHEMA format


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7f0882f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7f0882f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7f0882f

Branch: refs/heads/master
Commit: c7f0882f79fa76cfc94a913bf28eb905471f3fc3
Parents: 9fe97b6
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Oct 15 10:26:53 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Oct 15 10:26:53 2015 -0500

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/config.clj    |  53 +-
 storm-core/src/jvm/backtype/storm/Config.java   | 521 ++++++++---------
 .../storm/validation/ConfigValidation.java      | 523 +++++++++++++++++
 .../validation/ConfigValidationAnnotations.java | 216 +++++++
 .../storm/validation/ConfigValidationUtils.java | 175 ++++++
 .../test/clj/backtype/storm/config_test.clj     | 186 ------
 .../clj/backtype/storm/serialization_test.clj   |  14 +-
 .../jvm/backtype/storm/TestConfigValidate.java  | 565 +++++++++++++++++++
 8 files changed, 1756 insertions(+), 497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 57471f4..0d4f1e6 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -17,8 +17,9 @@
 (ns backtype.storm.config
   (:import [java.io FileReader File IOException]
            [backtype.storm.generated StormTopology])
-  (:import [backtype.storm Config ConfigValidation$FieldValidator])
+  (:import [backtype.storm Config])
   (:import [backtype.storm.utils Utils LocalState])
+  (:import [backtype.storm.validation ConfigValidation])
   (:import [org.apache.commons.io FileUtils])
   (:require [clojure [string :as str]])
   (:use [backtype.storm log util]))
@@ -28,7 +29,7 @@
 (defn- clojure-config-name [name]
   (.replace (.toUpperCase name) "_" "-"))
 
-;; define clojure constants for every configuration parameter
+; define clojure constants for every configuration parameter
 (doseq [f (seq (.getFields Config))]
   (let [name (.getName f)
         new-name (clojure-config-name name)]
@@ -39,35 +40,6 @@
   (dofor [f (seq (.getFields Config))]
          (.get f nil)))
 
-(defmulti get-FieldValidator class-selector)
-
-(defmethod get-FieldValidator nil [_]
-  (throw (IllegalArgumentException. "Cannot validate a nil field.")))
-
-(defmethod get-FieldValidator
-  ConfigValidation$FieldValidator [validator] validator)
-
-(defmethod get-FieldValidator Object
-  [klass]
-  {:pre [(not (nil? klass))]}
-  (reify ConfigValidation$FieldValidator
-    (validateField [this name v]
-                   (if (and (not (nil? v))
-                            (not (instance? klass v)))
-                     (throw (IllegalArgumentException.
-                              (str "field " name " '" v "' must be a '" (.getName klass) "'")))))))
-
-;; Create a mapping of config-string -> validator
-;; Config fields must have a _SCHEMA field defined
-(def CONFIG-SCHEMA-MAP
-  (->> (.getFields Config)
-       (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
-       (map (fn [f] [(.get f nil)
-                     (get-FieldValidator
-                       (-> Config
-                           (.getField (str (.getName f) "_SCHEMA"))
-                           (.get nil)))]))
-       (into {})))
 
 (defn cluster-mode
   [conf & args]
@@ -92,30 +64,13 @@
   [conf]
   (even-sampler (sampling-rate conf)))
 
-; storm.zookeeper.servers:
-;     - "server1"
-;     - "server2"
-;     - "server3"
-; nimbus.host: "master"
-;
-; ########### These all have default values as shown
-;
-; ### storm.* configs are general configurations
-; # the local dir is where jars are kept
-; storm.local.dir: "/mnt/storm"
-; storm.zookeeper.port: 2181
-; storm.zookeeper.root: "/storm"
-
 (defn read-default-config
   []
   (clojurify-structure (Utils/readDefaultConfig)))
 
 (defn validate-configs-with-schemas
   [conf]
-  (doseq [[k v] conf
-          :let [schema (CONFIG-SCHEMA-MAP k)]]
-    (if (not (nil? schema))
-      (.validateField schema k v))))
+  (ConfigValidation/validateFields conf))
 
 (defn read-storm-config
   []

http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index fcdc8ad..a521b10 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -19,7 +19,10 @@ package backtype.storm;
 
 import backtype.storm.serialization.IKryoDecorator;
 import backtype.storm.serialization.IKryoFactory;
+import backtype.storm.validation.ConfigValidationAnnotations.*;
+import backtype.storm.validation.ConfigValidation.*;
 import com.esotericsoftware.kryo.Serializer;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -32,7 +35,7 @@ import java.util.Map;
  * serializations.
  *
  * <p>This class also provides constants for all the configurations possible on
- * a Storm cluster and Storm topology. Each constant is paired with a schema
+ * a Storm cluster and Storm topology. Each constant is paired with an annotation
  * that defines the validity criterion of the corresponding field. Default
  * values for these configs can be found in defaults.yaml.</p>
  *
@@ -50,125 +53,125 @@ public class Config extends HashMap<String, Object> {
      * This is part of a temporary workaround to a ZK bug, it is the 'scheme:acl' for
      * the user Nimbus and Supervisors use to authenticate with ZK.
      */
+    @isString
     public static final String STORM_ZOOKEEPER_SUPERACL = "storm.zookeeper.superACL";
-    public static final Object STORM_ZOOKEEPER_SUPERACL_SCHEMA = String.class;
 
     /**
      * The transporter for communication among Storm tasks
      */
+    @isString
     public static final String STORM_MESSAGING_TRANSPORT = "storm.messaging.transport";
-    public static final Object STORM_MESSAGING_TRANSPORT_SCHEMA = String.class;
 
     /**
      * Netty based messaging: The buffer size for send/recv buffer
      */
+    @isInteger
     public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
-    public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: Sets the backlog value to specify when the channel binds to a local address
      */
+    @isInteger
     public static final String STORM_MESSAGING_NETTY_SOCKET_BACKLOG = "storm.messaging.netty.socket.backlog";
-    public static final Object STORM_MESSAGING_NETTY_SOCKET_BACKLOG_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
      *@deprecated "Since netty clients should never stop reconnecting - this does not make sense anymore.
      */
     @Deprecated
+    @isInteger
     public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
-    public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The min # of milliseconds that a peer will wait.
      */
+    @isInteger
     public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
-    public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The max # of milliseconds that a peer will wait.
      */
+    @isInteger
     public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
-    public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The # of worker threads for the server.
      */
+    @isInteger
     public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
-    public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The # of worker threads for the client.
      */
+    @isInteger
     public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
-    public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
      */
+    @isInteger
     public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";
-    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * We check with this interval that whether the Netty channel is writable and try to write pending messages
      */
+    @isInteger
     public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
-    public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: Is authentication required for Netty messaging from client worker process to server worker process.
      */
+    @isBoolean
     public static final String STORM_MESSAGING_NETTY_AUTHENTICATION = "storm.messaging.netty.authentication";
-    public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = Boolean.class;
 
     /**
      * The delegate for serializing metadata, should be used for serialized objects stored in zookeeper and on disk.
      * This is NOT used for compressing serialized tuples sent between topologies.
      */
+    @isString
     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
-    public static final Object STORM_META_SERIALIZATION_DELEGATE_SCHEMA = String.class;
 
     /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
+    @isStringList
     public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
-    public static final Object STORM_ZOOKEEPER_SERVERS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * The port Storm will use to connect to each of the ZooKeeper servers.
      */
+    @isInteger
     public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
-    public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A directory on the local filesystem used by Storm for any local
      * filesystem usage it needs. The directory must exist and the Storm daemons must
      * have permission to read/write from this location.
      */
+    @isString
     public static final String STORM_LOCAL_DIR = "storm.local.dir";
-    public static final Object STORM_LOCAL_DIR_SCHEMA = String.class;
 
     /**
      * A directory that holds configuration files for log4j2.
      * It can be either a relative or an absolute directory.
      * If relative, it is relative to the storm's home directory.
      */
+    @isString
     public static final String STORM_LOG4J2_CONF_DIR = "storm.log4j2.conf.dir";
-    public static final Object STORM_LOG4J2_CONF_DIR_SCHEMA = String.class;
 
     /**
      * A global task scheduler used to assign topologies's tasks to supervisors' wokers.
      *
      * If this is not set, a default system scheduler will be used.
      */
+    @isString
     public static final String STORM_SCHEDULER = "storm.scheduler";
-    public static final Object STORM_SCHEDULER_SCHEMA = String.class;
 
     /**
      * The mode this Storm cluster is running in. Either "distributed" or "local".
      */
+    @isString
     public static final String STORM_CLUSTER_MODE = "storm.cluster.mode";
-    public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
 
     /**
      * What Network Topography detection classes should we use.
@@ -176,8 +179,8 @@ public class Config extends HashMap<String, Object> {
      * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
      * is used in the resource aware scheduler.
      */
+    @isString
     public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
-    public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
 
     /**
      * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
@@ -187,54 +190,54 @@ public class Config extends HashMap<String, Object> {
      * can utilize to find each other based on hostname got from calls to
      * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
      */
+    @isString
     public static final String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
-    public static final Object STORM_LOCAL_HOSTNAME_SCHEMA = String.class;
 
     /**
      * The plugin that will convert a principal to a local user.
      */
+    @isString
     public static final String STORM_PRINCIPAL_TO_LOCAL_PLUGIN = "storm.principal.tolocal";
-    public static final Object STORM_PRINCIPAL_TO_LOCAL_PLUGIN_SCHEMA = String.class;
 
     /**
      * The plugin that will provide user groups service
      */
+    @isString
     public static final String STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN = "storm.group.mapping.service";
-    public static final Object STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN_SCHEMA = String.class;
 
     /**
      * Max no.of seconds group mapping service will cache user groups
      */
+    @isNumber
     public static final String STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS = "storm.group.mapping.service.cache.duration.secs";
-    public static final Object STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS_SCHEMA = Number.class;
 
     /**
      * Initialization parameters for the group mapping service plugin.
      * Provides a way for a @link{STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN}
      * implementation to access optional settings.
      */
+    @isType(type=Map.class)
     public static final String STORM_GROUP_MAPPING_SERVICE_PARAMS = "storm.group.mapping.service.params";
-    public static final Object STORM_GROUP_MAPPING_SERVICE_PARAMS_SCHEMA = Map.class;
 
     /**
      * The default transport plug-in for Thrift client/server communication
      */
+    @isString
     public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
-    public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
      * The serializer class for ListDelegate (tuple payload).
      * The default serializer will be ListDelegateSerializer
      */
+    @isString
     public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
-    public static final Object TOPOLOGY_TUPLE_SERIALIZER_SCHEMA = String.class;
 
     /**
      * Try to serialize all tuples, even for local transfers.  This should only be used
      * for testing, as a sanity check that all of your tuples are setup properly.
      */
+    @isBoolean
     public static final String TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE = "topology.testing.always.try.serialize";
-    public static final Object TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE_SCHEMA = Boolean.class;
 
     /**
      * Whether or not to use ZeroMQ for messaging in local mode. If this is set
@@ -244,50 +247,50 @@ public class Config extends HashMap<String, Object> {
      *
      * Defaults to false.
      */
+    @isBoolean
     public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq";
-    public static final Object STORM_LOCAL_MODE_ZMQ_SCHEMA = Boolean.class;
 
     /**
      * The root location at which Storm stores data in ZooKeeper.
      */
+    @isString
     public static final String STORM_ZOOKEEPER_ROOT = "storm.zookeeper.root";
-    public static final Object STORM_ZOOKEEPER_ROOT_SCHEMA = String.class;
 
     /**
      * The session timeout for clients to ZooKeeper.
      */
+    @isInteger
     public static final String STORM_ZOOKEEPER_SESSION_TIMEOUT = "storm.zookeeper.session.timeout";
-    public static final Object STORM_ZOOKEEPER_SESSION_TIMEOUT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The connection timeout for clients to ZooKeeper.
      */
+    @isInteger
     public static final String STORM_ZOOKEEPER_CONNECTION_TIMEOUT = "storm.zookeeper.connection.timeout";
-    public static final Object STORM_ZOOKEEPER_CONNECTION_TIMEOUT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The number of times to retry a Zookeeper operation.
      */
+    @isInteger
     public static final String STORM_ZOOKEEPER_RETRY_TIMES="storm.zookeeper.retry.times";
-    public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The interval between retries of a Zookeeper operation.
      */
+    @isInteger
     public static final String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";
-    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The ceiling of the interval between retries of a Zookeeper operation.
      */
+    @isInteger
     public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";
-    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The cluster Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
      */
+    @isString
     public static final String STORM_ZOOKEEPER_AUTH_SCHEME="storm.zookeeper.auth.scheme";
-    public static final Object STORM_ZOOKEEPER_AUTH_SCHEME_SCHEMA = String.class;
 
     /**
      * A string representing the payload for cluster Zookeeper authentication.
@@ -298,127 +301,127 @@ public class Config extends HashMap<String, Object> {
      * This file storm-cluster-auth.yaml should then be protected with
      * appropriate permissions that deny access from workers.
      */
+    @isString
     public static final String STORM_ZOOKEEPER_AUTH_PAYLOAD="storm.zookeeper.auth.payload";
-    public static final Object STORM_ZOOKEEPER_AUTH_PAYLOAD_SCHEMA = String.class;
 
     /**
      * The topology Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
      */
+    @isString
     public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME="storm.zookeeper.topology.auth.scheme";
-    public static final Object STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME_SCHEMA = String.class;
 
     /**
      * A string representing the payload for topology Zookeeper authentication. It gets serialized using UTF-8 encoding during authentication.
      */
+    @isString
     public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD="storm.zookeeper.topology.auth.payload";
-    public static final Object STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD_SCHEMA = String.class;
 
     /**
      * The id assigned to a running topology. The id is the storm name with a unique nonce appended.
      */
+    @isString
     public static final String STORM_ID = "storm.id";
-    public static final Object STORM_ID_SCHEMA = String.class;
 
     /**
      * The number of times to retry a Nimbus operation.
      */
+    @isNumber
     public static final String STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
-    public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
 
     /**
      * The starting interval between exponential backoff retries of a Nimbus operation.
      */
+    @isNumber
     public static final String STORM_NIMBUS_RETRY_INTERVAL="storm.nimbus.retry.interval.millis";
-    public static final Object STORM_NIMBUS_RETRY_INTERVAL_SCHEMA = Number.class;
 
     /**
      * The ceiling of the interval between retries of a client connect to Nimbus operation.
      */
+    @isNumber
     public static final String STORM_NIMBUS_RETRY_INTERVAL_CEILING="storm.nimbus.retry.intervalceiling.millis";
-    public static final Object STORM_NIMBUS_RETRY_INTERVAL_CEILING_SCHEMA = Number.class;
 
     /**
      * The Nimbus transport plug-in for Thrift client/server communication
      */
+    @isString
     public static final String NIMBUS_THRIFT_TRANSPORT_PLUGIN = "nimbus.thrift.transport";
-    public static final Object NIMBUS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
      * The host that the master server is running on, added only for backward compatibility,
      * the usage deprecated in favor of nimbus.seeds config.
      */
     @Deprecated
+    @isString
     public static final String NIMBUS_HOST = "nimbus.host";
-    public static final Object NIMBUS_HOST_SCHEMA = String.class;
 
     /**
      * List of seed nimbus hosts to use for leader nimbus discovery.
      */
+    @isStringList
     public static final String NIMBUS_SEEDS = "nimbus.seeds";
-    public static final Object NIMBUS_SEEDS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * Which port the Thrift interface of Nimbus should run on. Clients should
      * connect to this port to upload jars and submit topologies.
      */
+    @isInteger
     public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
-    public static final Object NIMBUS_THRIFT_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The number of threads that should be used by the nimbus thrift server.
      */
+    @isNumber
     public static final String NIMBUS_THRIFT_THREADS = "nimbus.thrift.threads";
-    public static final Object NIMBUS_THRIFT_THREADS_SCHEMA = Number.class;
 
     /**
      * A list of users that are cluster admins and can run any command.  To use this set
      * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
+    @isStringList
     public static final String NIMBUS_ADMINS = "nimbus.admins";
-    public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * A list of users that are the only ones allowed to run user operation on storm cluster.
      * To use this set nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
+    @isStringList
     public static final String NIMBUS_USERS = "nimbus.users";
-    public static final Object NIMBUS_USERS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * A list of groups , users belong to these groups are the only ones allowed to run user operation on storm cluster.
      * To use this set nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
+    @isStringList
     public static final String NIMBUS_GROUPS = "nimbus.groups";
-    public static final Object NIMBUS_GROUPS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * A list of users that run the supervisors and should be authorized to interact with
      * nimbus as a supervisor would.  To use this set
      * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
+    @isStringList
     public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
-    public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * The maximum buffer size thrift should use when reading messages.
      */
+    @isInteger
     public static final String NIMBUS_THRIFT_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
-    public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * This parameter is used by the storm-deploy project to configure the
      * jvm options for the nimbus daemon.
      */
+    @isString
     public static final String NIMBUS_CHILDOPTS = "nimbus.childopts";
-    public static final Object NIMBUS_CHILDOPTS_SCHEMA = String.class;
 
 
     /**
      * How long without heartbeating a task can go before nimbus will consider the
      * task dead and reassign it to another location.
      */
+    @isInteger
     public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
-    public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -427,15 +430,15 @@ public class Config extends HashMap<String, Object> {
      * This parameter is for checking for failures when there's no explicit event like that
      * occurring.
      */
+    @isInteger
     public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
-    public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How often nimbus should wake the cleanup thread to clean the inbox.
      * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
      */
+    @isInteger
     public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
-    public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
@@ -446,15 +449,15 @@ public class Config extends HashMap<String, Object> {
      * is set to).
      * @see NIMBUS_CLEANUP_INBOX_FREQ_SECS
      */
+    @isInteger
     public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
-    public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How long before a supervisor can go without heartbeating before nimbus considers it dead
      * and stops assigning new work to it.
      */
+    @isInteger
     public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs";
-    public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A special timeout used when a task is initially launched. During launch, this is the timeout
@@ -463,369 +466,368 @@ public class Config extends HashMap<String, Object> {
      * <p>A separate timeout exists for launch because there can be quite a bit of overhead
      * to launching new JVM's and configuring them.</p>
      */
+    @isInteger
     public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
-    public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Whether or not nimbus should reassign tasks if it detects that a task goes down.
      * Defaults to true, and it's not recommended to change this value.
      */
+    @isBoolean
     public static final String NIMBUS_REASSIGN = "nimbus.reassign";
-    public static final Object NIMBUS_REASSIGN_SCHEMA = Boolean.class;
 
     /**
      * During upload/download with the master, how long an upload or download connection is idle
      * before nimbus considers it dead and drops the connection.
      */
+    @isInteger
     public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = "nimbus.file.copy.expiration.secs";
-    public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A custom class that implements ITopologyValidator that is run whenever a
      * topology is submitted. Can be used to provide business-specific logic for
      * whether topologies are allowed to run or not.
      */
+    @isString
     public static final String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";
-    public static final Object NIMBUS_TOPOLOGY_VALIDATOR_SCHEMA = String.class;
 
     /**
      * Class name for authorization plugin for Nimbus
      */
+    @isString
     public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
-    public static final Object NIMBUS_AUTHORIZER_SCHEMA = String.class;
-
 
     /**
      * Class name of the authorization plugin Nimbus uses for impersonation requests.
      */
+    @isString
     public static final String NIMBUS_IMPERSONATION_AUTHORIZER = "nimbus.impersonation.authorizer";
-    public static final Object NIMBUS_IMPERSONATION_AUTHORIZER_SCHEMA = String.class;
-
 
     /**
      * Impersonation user ACL config entries.
      */
+    @isImpersonationAcl
     public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl";
-    public static final Object NIMBUS_IMPERSONATION_ACL_SCHEMA = ConfigValidation.MapOfStringToMapValidator;
 
     /**
      * How often nimbus should wake up to renew credentials if needed.
      */
+    @isNumber
     public static final String NIMBUS_CREDENTIAL_RENEW_FREQ_SECS = "nimbus.credential.renewers.freq.secs";
-    public static final Object NIMBUS_CREDENTIAL_RENEW_FREQ_SECS_SCHEMA = Number.class;
 
     /**
      * A list of credential renewers that nimbus should load.
      */
+    @isStringList
     public static final String NIMBUS_CREDENTIAL_RENEWERS = "nimbus.credential.renewers.classes";
-    public static final Object NIMBUS_CREDENTIAL_RENEWERS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * A list of plugins that nimbus should load during submit topology to populate
      * credentials on user's behalf.
      */
+    @isStringList
     public static final String NIMBUS_AUTO_CRED_PLUGINS = "nimbus.autocredential.plugins.classes";
-    public static final Object NIMBUS_AUTO_CRED_PLUGINS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * Storm UI binds to this host/interface.
      */
+    @isString
     public static final String UI_HOST = "ui.host";
-    public static final Object UI_HOST_SCHEMA = String.class;
 
     /**
      * Storm UI binds to this port.
      */
+    @isInteger
     public static final String UI_PORT = "ui.port";
-    public static final Object UI_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * HTTP UI port for log viewer
      */
+    @isInteger
     public static final String LOGVIEWER_PORT = "logviewer.port";
-    public static final Object LOGVIEWER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Childopts for log viewer java process.
      */
+    @isString
     public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts";
-    public static final Object LOGVIEWER_CHILDOPTS_SCHEMA = String.class;
 
     /**
      * How often to clean up old log files
      */
+    @isInteger
+    @isPositiveNumber
     public static final String LOGVIEWER_CLEANUP_INTERVAL_SECS = "logviewer.cleanup.interval.secs";
-    public static final Object LOGVIEWER_CLEANUP_INTERVAL_SECS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
 
     /**
      * How many minutes since a log was last modified for the log to be considered for clean-up
      */
+    @isInteger
+    @isPositiveNumber
     public static final String LOGVIEWER_CLEANUP_AGE_MINS = "logviewer.cleanup.age.mins";
-    public static final Object LOGVIEWER_CLEANUP_AGE_MINS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
 
     /**
      * Storm Logviewer HTTPS port
      */
+    @isNumber
     public static final String LOGVIEWER_HTTPS_PORT = "logviewer.https.port";
-    public static final Object LOGVIEWER_HTTPS_PORT_SCHEMA = Number.class;
 
     /**
      * Path to the keystore containing the certs used by Storm Logviewer for HTTPS communications
      */
+    @isString
     public static final String LOGVIEWER_HTTPS_KEYSTORE_PATH = "logviewer.https.keystore.path";
-    public static final Object LOGVIEWER_HTTPS_KEYSTORE_PATH_SCHEMA = String.class;
 
     /**
      * Password for the keystore for HTTPS for Storm Logviewer
      */
+    @isString
     public static final String LOGVIEWER_HTTPS_KEYSTORE_PASSWORD = "logviewer.https.keystore.password";
-    public static final Object LOGVIEWER_HTTPS_KEYSTORE_PASSWORD_SCHEMA = String.class;
 
     /**
      * Type of the keystore for HTTPS for Storm Logviewer.
      * see http://docs.oracle.com/javase/8/docs/api/java/security/KeyStore.html for more details.
      */
+    @isString
     public static final String LOGVIEWER_HTTPS_KEYSTORE_TYPE = "logviewer.https.keystore.type";
-    public static final Object LOGVIEWER_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class;
 
     /**
      * Password to the private key in the keystore for setting up HTTPS (SSL).
      */
+    @isString
     public static final String LOGVIEWER_HTTPS_KEY_PASSWORD = "logviewer.https.key.password";
-    public static final Object LOGVIEWER_HTTPS_KEY_PASSWORD_SCHEMA = String.class;
 
     /**
      * Path to the truststore containing the certs used by Storm Logviewer for HTTPS communications
      */
+    @isString
     public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PATH = "logviewer.https.truststore.path";
-    public static final Object LOGVIEWER_HTTPS_TRUSTSTORE_PATH_SCHEMA = String.class;
 
     /**
      * Password for the truststore for HTTPS for Storm Logviewer
      */
+    @isString
     public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD = "logviewer.https.truststore.password";
-    public static final Object LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = String.class;
 
     /**
      * Type of the truststore for HTTPS for Storm Logviewer.
      * see http://docs.oracle.com/javase/8/docs/api/java/security/KeyStore.html for more details.
      */
+    @isString
     public static final String LOGVIEWER_HTTPS_TRUSTSTORE_TYPE = "logviewer.https.truststore.type";
-    public static final Object LOGVIEWER_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class;
 
     /**
      * Whether Storm Logviewer should request ("want") client certificate authentication when serving HTTPS (SSL).
      */
+    @isBoolean
     public static final String LOGVIEWER_HTTPS_WANT_CLIENT_AUTH = "logviewer.https.want.client.auth";
-    public static final Object LOGVIEWER_HTTPS_WANT_CLIENT_AUTH_SCHEMA = Boolean.class;
 
+    @isBoolean
     public static final String LOGVIEWER_HTTPS_NEED_CLIENT_AUTH = "logviewer.https.need.client.auth";
-    public static final Object LOGVIEWER_HTTPS_NEED_CLIENT_AUTH_SCHEMA = Boolean.class;
 
     /**
      * A list of users allowed to view logs via the Log Viewer
      */
+    @isStringList
     public static final String LOGS_USERS = "logs.users";
-    public static final Object LOGS_USERS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * A list of groups allowed to view logs via the Log Viewer
      */
+    @isStringList
     public static final String LOGS_GROUPS = "logs.groups";
-    public static final Object LOGS_GROUPS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * Appender name used by log viewer to determine log directory.
      */
+    @isString
     public static final String LOGVIEWER_APPENDER_NAME = "logviewer.appender.name";
-    public static final Object LOGVIEWER_APPENDER_NAME_SCHEMA = String.class;
 
     /**
      * Childopts for Storm UI Java process.
      */
+    @isString
     public static final String UI_CHILDOPTS = "ui.childopts";
-    public static final Object UI_CHILDOPTS_SCHEMA = String.class;
 
     /**
      * A class implementing javax.servlet.Filter for authenticating/filtering UI requests
      */
+    @isString
     public static final String UI_FILTER = "ui.filter";
-    public static final Object UI_FILTER_SCHEMA = String.class;
 
     /**
      * Initialization parameters for the javax.servlet.Filter
      */
+    @isType(type=Map.class)
     public static final String UI_FILTER_PARAMS = "ui.filter.params";
-    public static final Object UI_FILTER_PARAMS_SCHEMA = Map.class;
 
     /**
      * The size of the header buffer for the UI in bytes
      */
+    @isNumber
     public static final String UI_HEADER_BUFFER_BYTES = "ui.header.buffer.bytes";
-    public static final Object UI_HEADER_BUFFER_BYTES_SCHEMA = Number.class;
 
     /**
      * This port is used by Storm UI for receiving HTTPS (SSL) requests from clients.
      */
+    @isNumber
     public static final String UI_HTTPS_PORT = "ui.https.port";
-    public static final Object UI_HTTPS_PORT_SCHEMA = Number.class;
 
     /**
      * Path to the keystore used by Storm UI for setting up HTTPS (SSL).
      */
+    @isString
     public static final String UI_HTTPS_KEYSTORE_PATH = "ui.https.keystore.path";
-    public static final Object UI_HTTPS_KEYSTORE_PATH_SCHEMA = String.class;
 
     /**
      * Password to the keystore used by Storm UI for setting up HTTPS (SSL).
      */
+    @isString
     public static final String UI_HTTPS_KEYSTORE_PASSWORD = "ui.https.keystore.password";
-    public static final Object UI_HTTPS_KEYSTORE_PASSWORD_SCHEMA = String.class;
 
     /**
      * Type of keystore used by Storm UI for setting up HTTPS (SSL).
      * see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details.
      */
+    @isString
     public static final String UI_HTTPS_KEYSTORE_TYPE = "ui.https.keystore.type";
-    public static final Object UI_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class;
 
     /**
      * Password to the private key in the keystore for setting up HTTPS (SSL).
      */
+    @isString
     public static final String UI_HTTPS_KEY_PASSWORD = "ui.https.key.password";
-    public static final Object UI_HTTPS_KEY_PASSWORD_SCHEMA = String.class;
 
     /**
      * Path to the truststore used by Storm UI for setting up HTTPS (SSL).
      */
+    @isString
     public static final String UI_HTTPS_TRUSTSTORE_PATH = "ui.https.truststore.path";
-    public static final Object UI_HTTPS_TRUSTSTORE_PATH_SCHEMA = String.class;
 
     /**
      * Password to the truststore used by Storm UI for setting up HTTPS (SSL).
      */
+    @isString
     public static final String UI_HTTPS_TRUSTSTORE_PASSWORD = "ui.https.truststore.password";
-    public static final Object UI_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = String.class;
 
     /**
      * Type of truststore used by Storm UI for setting up HTTPS (SSL).
      * see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details.
      */
+    @isString
     public static final String UI_HTTPS_TRUSTSTORE_TYPE = "ui.https.truststore.type";
-    public static final Object UI_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class;
 
     /**
      * Whether Storm UI should request ("want") client certificate authentication when serving HTTPS (SSL).
      */
+    @isBoolean
     public static final String UI_HTTPS_WANT_CLIENT_AUTH = "ui.https.want.client.auth";
-    public static final Object UI_HTTPS_WANT_CLIENT_AUTH_SCHEMA = Boolean.class;
 
+    @isBoolean
     public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth";
-    public static final Object UI_HTTPS_NEED_CLIENT_AUTH_SCHEMA = Boolean.class;
-
 
     /**
      * List of DRPC servers so that the DRPCSpout knows who to talk to.
      */
+    @isStringList
     public static final String DRPC_SERVERS = "drpc.servers";
-    public static final Object DRPC_SERVERS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * This port is used by Storm DRPC for receiving HTTP DRPC requests from clients.
      */
+    @isNumber
     public static final String DRPC_HTTP_PORT = "drpc.http.port";
-    public static final Object DRPC_HTTP_PORT_SCHEMA = Number.class;
 
     /**
      * This port is used by Storm DRPC for receiving HTTPS (SSL) DRPC requests from clients.
      */
+    @isNumber
     public static final String DRPC_HTTPS_PORT = "drpc.https.port";
-    public static final Object DRPC_HTTPS_PORT_SCHEMA = Number.class;
 
     /**
      * Path to the keystore used by Storm DRPC for setting up HTTPS (SSL).
      */
+    @isString
     public static final String DRPC_HTTPS_KEYSTORE_PATH = "drpc.https.keystore.path";
-    public static final Object DRPC_HTTPS_KEYSTORE_PATH_SCHEMA = String.class;
 
     /**
      * Password to the keystore used by Storm DRPC for setting up HTTPS (SSL).
      */
+    @isString
     public static final String DRPC_HTTPS_KEYSTORE_PASSWORD = "drpc.https.keystore.password";
-    public static final Object DRPC_HTTPS_KEYSTORE_PASSWORD_SCHEMA = String.class;
 
     /**
      * Type of keystore used by Storm DRPC for setting up HTTPS (SSL).
      * see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details.
      */
+    @isString
     public static final String DRPC_HTTPS_KEYSTORE_TYPE = "drpc.https.keystore.type";
-    public static final Object DRPC_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class;
 
     /**
      * Password to the private key in the keystore for setting up HTTPS (SSL).
      */
+    @isString
     public static final String DRPC_HTTPS_KEY_PASSWORD = "drpc.https.key.password";
-    public static final Object DRPC_HTTPS_KEY_PASSWORD_SCHEMA = String.class;
 
     /**
      * Path to the truststore used by Storm DRPC for setting up HTTPS (SSL).
      */
+    @isString
     public static final String DRPC_HTTPS_TRUSTSTORE_PATH = "drpc.https.truststore.path";
-    public static final Object DRPC_HTTPS_TRUSTSTORE_PATH_SCHEMA = String.class;
 
     /**
      * Password to the truststore used by Storm DRPC for setting up HTTPS (SSL).
      */
+    @isString
     public static final String DRPC_HTTPS_TRUSTSTORE_PASSWORD = "drpc.https.truststore.password";
-    public static final Object DRPC_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = String.class;
 
     /**
      * Type of truststore used by Storm DRPC for setting up HTTPS (SSL).
      * see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details.
      */
+    @isString
     public static final String DRPC_HTTPS_TRUSTSTORE_TYPE = "drpc.https.truststore.type";
-    public static final Object DRPC_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class;
 
     /**
      * Whether Storm DRPC should request ("want") client certificate authentication when serving HTTPS (SSL).
      */
+    @isBoolean
     public static final String DRPC_HTTPS_WANT_CLIENT_AUTH = "drpc.https.want.client.auth";
-    public static final Object DRPC_HTTPS_WANT_CLIENT_AUTH_SCHEMA = Boolean.class;
 
+    @isBoolean
     public static final String DRPC_HTTPS_NEED_CLIENT_AUTH = "drpc.https.need.client.auth";
-    public static final Object DRPC_HTTPS_NEED_CLIENT_AUTH_SCHEMA = Boolean.class;
 
     /**
      * The DRPC transport plug-in for Thrift client/server communication
      */
+    @isString
     public static final String DRPC_THRIFT_TRANSPORT_PLUGIN = "drpc.thrift.transport";
-    public static final Object DRPC_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
      * This port is used by Storm DRPC for receiving DRPC requests from clients.
      */
+    @isInteger
     public static final String DRPC_PORT = "drpc.port";
-    public static final Object DRPC_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Class name for authorization plugin for DRPC client
      */
+    @isString
     public static final String DRPC_AUTHORIZER = "drpc.authorizer";
-    public static final Object DRPC_AUTHORIZER_SCHEMA = String.class;
 
     /**
      * The Access Control List for the DRPC Authorizer.
      * @see DRPCSimpleAclAuthorizer
      */
+    @isType(type=Map.class)
     public static final String DRPC_AUTHORIZER_ACL = "drpc.authorizer.acl";
-    public static final Object DRPC_AUTHORIZER_ACL_SCHEMA = Map.class;
 
     /**
      * File name of the DRPC Authorizer ACL.
      * @see DRPCSimpleAclAuthorizer
      */
+    @isString
     public static final String DRPC_AUTHORIZER_ACL_FILENAME = "drpc.authorizer.acl.filename";
-    public static final Object DRPC_AUTHORIZER_ACL_FILENAME_SCHEMA = String.class;
 
     /**
      * Whether the DRPCSimpleAclAuthorizer should deny requests for operations
@@ -836,128 +838,136 @@ public class Config extends HashMap<String, Object> {
      * any request for functions will be denied.
      * @see DRPCSimpleAclAuthorizer
      */
+    @isBoolean
     public static final String DRPC_AUTHORIZER_ACL_STRICT = "drpc.authorizer.acl.strict";
-    public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class;
 
     /**
      * DRPC thrift server worker threads
      */
+    @isInteger
     public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
-    public static final Object DRPC_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The maximum buffer size thrift should use when reading messages for DRPC.
      */
+    @isNumber
     public static final String DRPC_MAX_BUFFER_SIZE = "drpc.max_buffer_size";
-    public static final Object DRPC_MAX_BUFFER_SIZE_SCHEMA = Number.class;
 
     /**
      * DRPC thrift server queue size
      */
+    @isInteger
     public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
-    public static final Object DRPC_QUEUE_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The DRPC invocations transport plug-in for Thrift client/server communication
      */
+    @isString
     public static final String DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN = "drpc.invocations.thrift.transport";
-    public static final Object DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
      * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
      */
+    @isInteger
     public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
-    public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * DRPC invocations thrift server worker threads
      */
+    @isNumber
     public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
-    public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
 
     /**
      * The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also
      * timeout based on the socket timeout on the DRPC client, and separately based on the topology message
      * timeout for the topology implementing the DRPC function.
      */
+
+    @isInteger
+    @isPositiveNumber
+    @NotNull
     public static final String DRPC_REQUEST_TIMEOUT_SECS  = "drpc.request.timeout.secs";
-    public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
 
     /**
      * Childopts for Storm DRPC Java process.
      */
+    @isString
     public static final String DRPC_CHILDOPTS = "drpc.childopts";
-    public static final Object DRPC_CHILDOPTS_SCHEMA = String.class;
 
     /**
      * Class name of the HTTP credentials plugin for the UI.
      */
+    @isString
     public static final String UI_HTTP_CREDS_PLUGIN = "ui.http.creds.plugin";
-    public static final Object UI_HTTP_CREDS_PLUGIN_SCHEMA = String.class;
 
     /**
      * Class name of the HTTP credentials plugin for DRPC.
      */
+    @isString
     public static final String DRPC_HTTP_CREDS_PLUGIN = "drpc.http.creds.plugin";
-    public static final Object DRPC_HTTP_CREDS_PLUGIN_SCHEMA = String.class;
 
     /**
      * the metadata configured on the supervisor
      */
+    @isType(type=Map.class)
     public static final String SUPERVISOR_SCHEDULER_META = "supervisor.scheduler.meta";
-    public static final Object SUPERVISOR_SCHEDULER_META_SCHEMA = Map.class;
+
     /**
      * A list of ports that can run workers on this supervisor. Each worker uses one port, and
      * the supervisor will only run one worker per port. Use this configuration to tune
      * how many workers run on each machine.
      */
+    @isNoDuplicateInList
+    @NotNull
+    @isListEntryCustom(entryValidatorClasses={IntegerValidator.class,PositiveNumberValidator.class})
     public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
-    public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NoDuplicateIntegersValidator;
 
     /**
      * A number representing the maximum number of workers any single topology can acquire.
      */
+    @isNumber
     public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology";
-    public static final Object NIMBUS_SLOTS_PER_TOPOLOGY_SCHEMA = Number.class;
 
     /**
      * A class implementing javax.servlet.Filter for DRPC HTTP requests
      */
+    @isString
     public static final String DRPC_HTTP_FILTER = "drpc.http.filter";
-    public static final Object DRPC_HTTP_FILTER_SCHEMA = String.class;
 
     /**
      * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
      * service
      */
+    @isType(type=Map.class)
     public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params";
-    public static final Object DRPC_HTTP_FILTER_PARAMS_SCHEMA = Map.class;
 
     /**
      * A number representing the maximum number of executors any single topology can acquire.
      */
+    @isNumber
     public static final String NIMBUS_EXECUTORS_PER_TOPOLOGY = "nimbus.executors.perTopology";
-    public static final Object NIMBUS_EXECUTORS_PER_TOPOLOGY_SCHEMA = Number.class;
 
     /**
      * This parameter is used by the storm-deploy project to configure the
      * jvm options for the supervisor daemon.
      */
+    @isString
     public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";
-    public static final Object SUPERVISOR_CHILDOPTS_SCHEMA = String.class;
 
     /**
      * How long a worker can go without heartbeating before the supervisor tries to
      * restart the worker process.
      */
+    @isInteger
+    @isPositiveNumber
+    @NotNull
     public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
-    public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
 
     /**
      * How many seconds to sleep for before shutting down threads on worker
      */
+    @isInteger
     public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = "supervisor.worker.shutdown.sleep.secs";
-    public static final Object SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How long a worker can go without heartbeating during the initial launch before
@@ -965,50 +975,52 @@ public class Config extends HashMap<String, Object> {
      * supervisor.worker.timeout.secs during launch because there is additional
      * overhead to starting and configuring the JVM on launch.
      */
+    @isInteger
+    @isPositiveNumber
+    @NotNull
     public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs";
-    public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
 
     /**
      * Whether or not the supervisor should launch workers assigned to it. Defaults
      * to true -- and you should probably never change this value. This configuration
      * is used in the Storm unit tests.
      */
+    @isBoolean
     public static final String SUPERVISOR_ENABLE = "supervisor.enable";
-    public static final Object SUPERVISOR_ENABLE_SCHEMA = Boolean.class;
 
     /**
      * how often the supervisor sends a heartbeat to the master.
      */
+    @isInteger
     public static final String SUPERVISOR_HEARTBEAT_FREQUENCY_SECS = "supervisor.heartbeat.frequency.secs";
-    public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
      * How often the supervisor checks the worker heartbeats to see if any of them
      * need to be restarted.
      */
+    @isInteger
     public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
-    public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Should the supervisor try to run the worker as the launching user or not.  Defaults to false.
      */
+    @isBoolean
     public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
-    public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
 
     /**
      * Full path to the worker-launcher executable that will be used to launch workers when
      * SUPERVISOR_RUN_WORKER_AS_USER is set to true.
      */
+    @isString
     public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
-    public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
 
     /**
      * The total amount of memory (in MiB) a supervisor is allowed to give to its workers.
      *  A default value will be set for this config if user does not override
      */
+    @isPositiveNumber
     public static final String SUPERVISOR_MEMORY_CAPACITY_MB = "supervisor.memory.capacity.mb";
-    public static final Object SUPERVISOR_MEMORY_CAPACITY_MB_SCHEMA = ConfigValidation.PositiveNumberValidator;
 
     /**
      * The total amount of CPU resources a supervisor is allowed to give to its workers.
@@ -1016,8 +1028,8 @@ public class Config extends HashMap<String, Object> {
      * using 100 makes it simple to set the desired value to the capacity measurement
      * for single threaded bolts.  A default value will be set for this config if user does not override
      */
+    @isPositiveNumber
     public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
-    public static final Object SUPERVISOR_CPU_CAPACITY_SCHEMA = ConfigValidation.PositiveNumberValidator;
 
     /**
      * The jvm opts provided to workers launched by this supervisor. All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%"
@@ -1027,35 +1039,34 @@ public class Config extends HashMap<String, Object> {
      * %TOPOLOGY-ID%    -> topology-id,
      * %WORKER-PORT% -> port.
      */
+    @isStringOrStringList
     public static final String WORKER_CHILDOPTS = "worker.childopts";
-    public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
      * The jvm opts provided to workers launched by this supervisor for GC. All "%ID%" substrings are replaced
      * with an identifier for this worker.  Because the JVM complains about multiple GC opts the topology
      * can override this default value by setting topology.worker.gc.childopts.
      */
+    @isStringOrStringList
     public static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts";
-    public static final Object WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
      * control how many worker receiver threads we need per worker
      */
+    @isInteger
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
-    public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How often this worker should heartbeat to the supervisor.
      */
+    @isInteger
     public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
-    public static final Object WORKER_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How often a task should heartbeat its status to the master.
      */
+    @isInteger
     public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
-    public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
-
 
     /**
      * How often a task should sync its connections with other tasks (if a task is
@@ -1064,29 +1075,29 @@ public class Config extends HashMap<String, Object> {
      * almost immediately. This configuration is here just in case that notification doesn't
      * come through.
      */
+    @isInteger
     public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
-    public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How often a worker should check dynamic log level timeouts for expiration.
      * For expired logger settings, the clean up polling task will reset the log levels
      * to the original levels (detected at startup), and will clean up the timeout map
      */
+    @isInteger
+    @isPositiveNumber
     public static final String WORKER_LOG_LEVEL_RESET_POLL_SECS = "worker.log.level.reset.poll.secs";
-    public static final Object WORKER_LOG_LEVEL_RESET_POLL_SECS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
 
     /**
      * How often a task should sync credentials, worst case.
      */
+    @isNumber
     public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
-    public static final Object TASK_CREDENTIALS_POLL_SECS_SCHEMA = Number.class;
-
 
     /**
      * Whether to enable backpressure in for a certain topology
      */
+    @isBoolean
     public static final String TOPOLOGY_BACKPRESSURE_ENABLE = "topology.backpressure.enable";
-    public static final Object TOPOLOGY_BACKPRESSURE_ENABLE_SCHEMA = Boolean.class;
 
     /**
      * This signifies the tuple congestion in a disruptor queue.
@@ -1094,50 +1105,50 @@ public class Config extends HashMap<String, Object> {
      * the backpressure scheme, if enabled, should slow down the tuple sending speed of
      * the spouts until reaching the low watermark.
      */
+    @isPositiveNumber
     public static final String BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK="backpressure.disruptor.high.watermark";
-    public static final Object BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
 
     /**
      * This signifies a state that a disruptor queue has left the congestion.
      * If the used ratio of a disruptor queue is lower than the low watermark,
      * it will unset the backpressure flag.
      */
+    @isPositiveNumber
     public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
-    public static final Object BACKPRESSURE_DISRUPTOR_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
 
     /**
      * A list of users that are allowed to interact with the topology.  To use this set
      * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
+    @isStringList
     public static final String TOPOLOGY_USERS = "topology.users";
-    public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * A list of groups that are allowed to interact with the topology.  To use this set
      * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
+    @isStringList
     public static final String TOPOLOGY_GROUPS = "topology.groups";
-    public static final Object TOPOLOGY_GROUPS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * True if Storm should timeout messages or not. Defaults to true. This is meant to be used
      * in unit tests to prevent tuples from being accidentally timed out during the test.
      */
+    @isBoolean
     public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";
-    public static final Object TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS_SCHEMA = Boolean.class;
 
     /**
      * When set to true, Storm will log every message that's emitted.
      */
+    @isBoolean
     public static final String TOPOLOGY_DEBUG = "topology.debug";
-    public static final Object TOPOLOGY_DEBUG_SCHEMA = Boolean.class;
 
     /**
      * The serializer for communication between shell components and non-JVM
      * processes
      */
+    @isString
     public static final String TOPOLOGY_MULTILANG_SERIALIZER = "topology.multilang.serializer";
-    public static final Object TOPOLOGY_MULTILANG_SERIALIZER_SCHEMA = String.class;
 
     /**
      * How many processes should be spawned around the cluster to execute this
@@ -1145,8 +1156,8 @@ public class Config extends HashMap<String, Object> {
      * them. This parameter should be used in conjunction with the parallelism hints
      * on each component in the topology to tune the performance of a topology.
      */
+    @isInteger
     public static final String TOPOLOGY_WORKERS = "topology.workers";
-    public static final Object TOPOLOGY_WORKERS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How many instances to create for a spout/bolt. A task runs on a thread with zero or more
@@ -1156,36 +1167,36 @@ public class Config extends HashMap<String, Object> {
      * without redeploying the topology or violating the constraints of Storm (such as a fields grouping
      * guaranteeing that the same value goes to the same task).
      */
+    @isInteger
     public static final String TOPOLOGY_TASKS = "topology.tasks";
-    public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler
      * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override
      */
+    @isPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb";
-    public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
 
     /**
      * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler
      * to allocate slots on machines with enough available memory.  A default value will be set for this config if user does not override
      */
+    @isPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb";
-    public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
 
     /**
      * The config indicates the percentage of cpu for a core an instance(executor) of a component will use.
      * Assuming the a core value to be 100, a value of 10 indicates 10% of the core.
      * The P in PCORE represents the term "physical".  A default value will be set for this config if user does not override
      */
+    @isPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT = "topology.component.cpu.pcore.percent";
-    public static final Object TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
 
     /**
      * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology
      */
+    @isPositiveNumber
     public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb";
-    public static final Object TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB_SCHEMA = ConfigValidation.PositiveNumberValidator;
 
     /**
      * How many executors to spawn for ackers.
@@ -1194,8 +1205,8 @@ public class Config extends HashMap<String, Object> {
      * to be equal to the number of workers configured for this topology. If this variable is set to 0,
      * then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.</p>
      */
+    @isInteger
     public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
-    public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How many executors to spawn for event logger.
@@ -1204,8 +1215,8 @@ public class Config extends HashMap<String, Object> {
      * to be equal to the number of workers configured for this topology. If this variable is set to 0,
      * event logging will be disabled.</p>
      */
+    @isInteger
     public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
-    public static final Object TOPOLOGY_EVENTLOGGER_EXECUTORS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The maximum amount of time given to the topology to fully process a message
@@ -1213,8 +1224,10 @@ public class Config extends HashMap<String, Object> {
      * will fail the message on the spout. Some spouts implementations will then replay
      * the message at a later time.
      */
+    @isInteger
+    @isPositiveNumber
+    @NotNull
     public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
-    public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
 
     /**
      * A list of serialization registrations for Kryo ( http://code.google.com/p/kryo/ ),
@@ -1224,8 +1237,8 @@ public class Config extends HashMap<String, Object> {
      *
      * See Kryo's documentation for more information about writing custom serializers.
      */
+    @isKryoReg
     public static final String TOPOLOGY_KRYO_REGISTER = "topology.kryo.register";
-    public static final Object TOPOLOGY_KRYO_REGISTER_SCHEMA = ConfigValidation.KryoRegValidator;
 
     /**
      * A list of classes that customize storm's kryo instance during start-up.
@@ -1233,17 +1246,16 @@ public class Config extends HashMap<String, Object> {
      * listed class is instantiated with 0 arguments, then its 'decorate' method
      * is called with storm's kryo instance as the only argument.
      */
+    @isStringList
     public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators";
-    public static final Object TOPOLOGY_KRYO_DECORATORS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * Class that specifies how to create a Kryo instance for serialization. Storm will then apply
      * topology.kryo.register and topology.kryo.decorators on top of this. The default implementation
      * implements topology.fall.back.on.java.serialization and turns references off.
      */
+    @isString
     public static final String TOPOLOGY_KRYO_FACTORY = "topology.kryo.factory";
-    public static final Object TOPOLOGY_KRYO_FACTORY_SCHEMA = String.class;
-
 
     /**
      * Whether or not Storm should skip the loading of kryo registrations for which it
@@ -1255,36 +1267,36 @@ public class Config extends HashMap<String, Object> {
      * By setting this config to true, Storm will ignore that it doesn't have those other serializations
      * rather than throw an error.
      */
+    @isBoolean
     public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations";
-    public static final Object TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS_SCHEMA = Boolean.class;
 
     /**
      * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
      * Each listed class will be routed all the metrics data generated by the storm metrics API.
      * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
      */
+
+    @isListEntryType(type=Map.class)
     public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
-    public static final Object TOPOLOGY_METRICS_CONSUMER_REGISTER_SCHEMA = ConfigValidation.MapsValidator;
 
     /**
      * A map of metric name to class name implementing IMetric that will be created once per worker JVM
      */
+    @isType(type=Map.class)
     public static final String TOPOLOGY_WORKER_METRICS = "topology.worker.metrics";
-    public static final Object TOPOLOGY_WORKER_METRICS_SCHEMA = Map.class;
 
     /**
      * A map of metric name to class name implementing IMetric that will be created once per worker JVM
      */
+    @isType(type=Map.class)
     public static final String WORKER_METRICS = "worker.metrics";
-    public static final Object WORKER_METRICS_SCHEMA = Map.class;
 
     /**
      * The maximum parallelism allowed for a component in this topology. This configuration is
      * typically used in testing to limit the number of threads spawned in local mode.
      */
+    @isInteger
     public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
-    public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = ConfigValidation.IntegerValidator;
-
 
     /**
      * The maximum number of tuples that can be pending on a spout task at any given time.
@@ -1294,8 +1306,8 @@ public class Config extends HashMap<String, Object> {
      * Note that this config parameter has no effect for unreliable spouts that don't tag
      * their tuples with a message id.
      */
+    @isInteger
     public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
-    public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
@@ -1304,70 +1316,72 @@ public class Config extends HashMap<String, Object> {
      * 1. nextTuple emits no tuples
      * 2. The spout has hit maxSpoutPending and can't emit any more tuples
      */
+    @isString
     public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
-    public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class;
 
     /**
      * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
      */
+    @isInteger
     public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
-    public static final Object TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The maximum amount of time a component gives a source of state to synchronize before it requests
      * synchronization again.
      */
+    @isInteger
+    @isPositiveNumber
+    @NotNull
     public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
-    public static final Object TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
 
     /**
      * The percentage of tuples to sample to produce stats for a task.
      */
+    @isPositiveNumber
     public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
-    public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA =ConfigValidation.PositiveNumberValidator;
 
     /**
      * The time period that builtin metrics data in bucketed into.
      */
+    @isInteger
     public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
-    public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Whether or not to use Java serialization in a topology.
      */
+    @isBoolean
     public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization";
-    public static final Object TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION_SCHEMA = Boolean.class;
 
     /**
      * Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS.
      */
+    @isStringOrStringList
     public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts";
-    public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
      * Topology-specific options GC for the worker child process. This overrides WORKER_GC_CHILDOPTS.
      */
+    @isStringOrStringList
     public static final String TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts";
-    public static final Object TOPOLOGY_WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
      * Topology-specific options for the logwriter process of a worker.
      */
+    @isStringOrStringList
     public static final String TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS="topology.worker.logwriter.childopts";
-    public static final Object TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
      * Topology-specific classpath for the worker child process. This is combined to the usual classpath.
      */
+    @isStringOrStringList
     public static final String TOPOLOGY_CLASSPATH="topology.classpath";
-    public static final Object TOPOLOGY_CLASSPATH_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
      * Topology-specific environment variables for the worker child process.
      * This is added to the existing environment (that of the supervisor)
      */
-     public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
-     public static final Object TOPOLOGY_ENVIRONMENT_SCHEMA = Map.class;
+    @isType(type=Map.class)
+    public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
 
     /*
      * Topology-specific option to disable/enable bolt's outgoing overflow buffer.
@@ -1376,122 +1390,119 @@ public class Config extends HashMap<String, Object> {
      * The overflow buffer can fill degrading the performance gradually,
      * eventually running out of memory.
      */
+    @isBoolean
     public static final String TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE="topology.bolts.outgoing.overflow.buffer.enable";
-    public static final Object TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE_SCHEMA = Boolean.class;
 
     /**
      * This config is available for TransactionalSpouts, and contains the id ( a String) for
      * the transactional topology. This id is used to store the state of the transactional
      * topology in Zookeeper.
      */
+    @isString
     public static final String TOPOLOGY_TRANSACTIONAL_ID="topology.transactional.id";
-    public static final Object TOPOLOGY_TRANSACTIONAL_ID_SCHEMA = String.class;
 
     /**
      * A list of task hooks that are automatically added to every spout and bolt in the topology. An example
      * of when you'd do this is to add a hook that integrates with your internal
      * monitoring system. These hooks are instantiated using the zero-arg constructor.
      */
+    @isStringList
     public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
-    public static final Object TOPOLOGY_AUTO_TASK_HOOKS_SCHEMA = ConfigValidation.StringsValidator;
-
 
     /**
      * The size of the Disruptor receive queue for each executor. Must be a power of 2.
      */
+    @isPowerOf2
     public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
-    public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
 
     /**
      * The size of the Disruptor send queue for each executor. Must be a power of 2.
      */
+    @isPowerOf2
     public static final String TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE="topology.executor.send.buffer.size";
-    public static final Object TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
 
     /**
      * The size of the Disruptor transfer queue for each worker.
      */
+    @isInteger
     public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
-    public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
-   /**
-    * How often a tick tuple from the "__system" component and "__tick" stream should be sent
-    * to tasks. Meant to be used as a component-specific configuration.
-    */
+    /**
+     * How often a tick tuple from the "__system" component and "__tick" stream should be sent
+     * to tasks. Meant to be used as a component-specific configuration.
+     */
+    @isInteger
     public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
-    public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
-
 
-   /**
-    * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency
-    * vs. throughput
-    */
+    /**
+     * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency
+     * vs. throughput
+     */
+    @isString
     public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy";
-    public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class;
 
-   /**
-    * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
-    * via the TopologyContext.
-    */
+    /**
+     * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
+     * via the TopologyContext.
+     */
+    @isInteger
     public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
-    public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
      * an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
      * reported to Zookeeper per task for every 10 second interval of time.
      */
+    @isInteger
     public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS="topology.error.throttle.interval.secs";
-    public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
      */
+    @isInteger
     public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";
-    public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator;
-
 
     /**
      * How often a batch can be emitted in a Trident topology.
      */
+    @isInteger
     public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
-    public static final Object TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Name of the topology. This config is automatically set by Storm when the topology is submitted.
      */
+    @isString
     public final static String TOPOLOGY_NAME="topology.name";
-    public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
 
     /**
      * The principal who submitted a topology
      */
+    @isString
     public final static String TOPOLOGY_SUBMITTER_PRINCIPAL = "topology.submitter.principal";
-    public static final Object TOPOLOGY_SUBMITTER_PRINCIPAL_SCHEMA = String.class;
 
     /**
      * The local user name of the user who submitted a topology.
      */
+    @isString
     public static final String TOPOLOGY_SUBMITTER_USER = "topology.submitter.user";
-    public static final Object TOPOLOGY_SUBMITTER_USER_SCHEMA = String.class;
 
     /**
      * Array of components that scheduler should try to place on separate hosts.
      */
+    @isStringList
     public static final String TOPOLOGY_SPREAD_COMPONENTS = "topology.spread.components";
-    public static final Object TOPOLOGY_SPREAD_COMPONENTS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * A list of IAutoCredentials that the topology should load and use.
      */
+    @isStringList
     public static final String TOPOLOGY_AUTO_CREDENTIALS = "topology.auto-credentials";
-    public static final Object TOPOLOGY_AUTO_CREDENTIALS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * Max pending tuples in one ShellBolt
      */
+    @isInteger
     public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
-    public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Topology central logging sensitivity to determine who has access to logs in central logging system.
@@ -1501,55 +1512,55 @@ public class Config extends HashMap<String, Object> {
      *   S2 - Confidential
      *   S3 - Secret (default.)
      */
+    @isString
     public static final String TOPOLOGY_LOGGING_SENSITIVITY="topology.logging.sensitivity";
-    public static final Object TOPOLOGY_LOGGING_SENSITIVITY_SCHEMA = String.class;
 
     /**
      * The root directory in ZooKeeper for metadata about TransactionalSpouts.
      */
+    @isString
     public static final String TRANSACTIONAL_ZOOKEEPER_ROOT="transactional.zookeeper.root";
-    public static final Object TRANSACTIONAL_ZOOKEEPER_ROOT_SCHEMA = String.class;
 
     /**
      * The list of zookeeper servers in which to keep the transactional state. If null (which is default),
      * will use storm.zookeeper.servers
      */
+    @isStringList
     public static final String TRANSACTIONAL_ZOOKEEPER_SERVERS="transactional.zookeeper.servers";
-    public static final Object TRANSACTIONAL_ZOOKEEPER_SERVERS_SCHEMA = ConfigValidation.StringsValidator;
 
     /**
      * The port to use to connect to the transactional zookeeper servers. If null (which is default),
      * will use storm.zookeeper.port
      */
+    @isInteger
     public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
-    public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The user as which the nimbus client should be acquired to perform the operation.
      */
+    @isString
     public static final String STORM_DO_AS_USER="storm.doAsUser";
-    public static final Object STORM_DO_AS_USER_SCHEMA = String.class;
 
     /**
      * The number of threads that should be used by the zeromq context in each worker process.
      */
+    @isInteger
     public static final String ZMQ_THREADS = "zmq.threads";
-    public static final Object ZMQ_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How long a connection should retry sending messages to a target host when
      * the connection is closed. This is an advanced configuration and can almost
      * certainly be ignored.
      */
+    @isInteger
     public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis";
-    public static final Object ZMQ_LINGER_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The high water for the ZeroMQ push sockets used for networking. Use this config to prevent buffer explosion
      * on the networking layer.
      */
+    @isInteger
     public static final String ZMQ_HWM = "zmq.hwm";
-    public static final Object ZMQ_HWM_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)
@@ -1557,59 +1568,61 @@ public class Config extends HashMap<String, Object> {
      * to look for native libraries. It is necessary to set this config correctly since
      * Storm uses the ZeroMQ and JZMQ native libs.
      */
+    @isString
     public static final String JAVA_LIBRARY_PATH = "java.library.path";
-    public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;
 
     /**
      * The path to use as the zookeeper dir when running a zookeeper server via
      * "storm dev-zookeeper". This zookeeper instance is only intended for development;
      * it is not a production grade zookeeper setup.
      */
+    @isString
     public static final String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
-    public static final Object DEV_ZOOKEEPER_PATH_SCHEMA = String.class;
 
     /**
      * A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler
      * to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
      */
+    @isMapEntryType(keyType = String.class, valueType = Number.class)
     public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
-    public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
 
     /**
      * A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler
      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
      */
+    @isMapEntryType(keyType = String.class, valueType = Number.class)
     public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools";
-    public static final Object MULTITENANT_SCHEDULER_USER_POOLS_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
 
     /**
      * The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
      */
+    @isNumber
     public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
-    public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
 
     /**
      * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
      * vs. CPU usage
      */
+    @isInteger
+    @isPositiveNumber
+    @NotNull
     public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
-    public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
 
     /**
      * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
      * distribution.
      */
+    @isString
     public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";
-    public static final Object STORM_CODE_DISTRIBUTOR_CLASS_SCHEMA = String.class;
 
     /**
      * Minimum number of nimbus hosts where the code must be replicated before leader nimbus
      * is allowed to perform topology activation tasks like setting up heartbeats/assignments
      * and marking the topology as active. default is 0.
      */
+    @isNumber
     public static final String TOPOLOGY_MIN_REPLICATION_COUNT = "topology.min.replication.count";
-    public static final Object TOPOLOGY_MIN_REPLICATION_COUNT_SCHEMA = Number.class;
 
     /**
      * Maximum wait time for the nimbus host replication to achieve the nimbus.min.replication.count.
@@ -1617,14 +1630,14 @@ public class Config extends HashMap<String, Object> {
      * if required nimbus.min.replication.count is not achieved. The default is 0 seconds, a value of
      * -1 indicates to wait for ever.
      */
+    @isNumber
     public static final String TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC = "topology.max.replication.wait.time.sec";
-    public static final Object TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC_SCHEMA = Number.class;
 
     /**
      * How often nimbus's background thread to sync code for missing topologies should run.
      */
+    @isInteger
     public static final String NIMBUS_CODE_SYNC_FREQ_SECS = "nimbus.code.sync.freq.secs";
-    public static final Object NIMBUS_CODE_SYNC_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     public static void setClasspath(Map conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
@@ -1714,7 +1727,7 @@ public class Config extends HashMap<String, Object> {
     }
 
     public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
-       registerMetricsConsumer(this, klass, argument, parallelismHint);
+        registerMetricsConsumer(this, klass, argument, parallelismHint);
     }
 
     public static void registerMetricsConsumer(Map conf, Class klass, long parallelismHint) {
@@ -1754,7 +1767,7 @@ public class Config extends HashMap<String, Object> {
     }
 
     public void setSkipMissingKryoRegistrations(boolean skip) {
-       setSkipMissingKryoRegistrations(this, skip);
+        setSkipMissingKryoRegistrations(this, skip);
     }
 
     public static void setMaxTaskParallelism(Map conf, int max) {


[5/5] storm git commit: Added STORM-1084 to Changelog

Posted by bo...@apache.org.
Added STORM-1084 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54772f83
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54772f83
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54772f83

Branch: refs/heads/master
Commit: 54772f83ce68160741a76d0383c9f944eda2e058
Parents: 46d7c0e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Oct 15 13:08:41 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Oct 15 13:08:41 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/54772f83/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0cc9686..5e17b7e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1084: Improve Storm config validation process to use java annotations instead of *_SCHEMA format
  * STORM-1106: Netty should not limit attempts to reconnect
  * STORM-1103: Changes log message to DEBUG from INFO
  * STORM-1104: Nimbus HA fails to find newly downloaded code files


[3/5] storm git commit: revision based on suggestions

Posted by bo...@apache.org.
revision based on suggestions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b5cb2ecd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b5cb2ecd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b5cb2ecd

Branch: refs/heads/master
Commit: b5cb2ecdad9d023e9691d5ef5f7dc4aaef7dca75
Parents: c7f0882
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Oct 15 10:18:14 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Oct 15 11:16:50 2015 -0500

----------------------------------------------------------------------
 .../storm/validation/ConfigValidation.java      | 228 +++++++++++--------
 .../validation/ConfigValidationAnnotations.java |  22 +-
 .../jvm/backtype/storm/TestConfigValidate.java  |  13 +-
 3 files changed, 152 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b5cb2ecd/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 2e4470c..b9244ef 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 
@@ -37,11 +39,9 @@ public class ConfigValidation {
     private static final Logger LOG = LoggerFactory.getLogger(ConfigValidation.class);
 
     public static abstract class Validator {
-        public abstract void validateField(String name, Object o);
-    }
-
-    public abstract static class TypeValidator {
-        public abstract void validateField(String name, Class type, Object o);
+        public Validator(Map<String, Object> params) {}
+        public Validator() {}
+        public abstract void validateField(String name, Object o) throws InstantiationException, IllegalAccessException;
     }
 
     /**
@@ -65,9 +65,20 @@ public class ConfigValidation {
     /**
      * Validates basic types
      */
-    public static class SimpleTypeValidator extends TypeValidator {
+    public static class SimpleTypeValidator extends Validator {
+
+        private Class type;
 
-        public void validateField(String name, Class type, Object o) {
+        public SimpleTypeValidator(Map<String, Object> params) {
+            this.type = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.type, o);
+        }
+
+        public static void validateField(String name, Class type, Object o) {
             if (o == null) {
                 return;
             }
@@ -82,8 +93,7 @@ public class ConfigValidation {
 
         @Override
         public void validateField(String name, Object o) {
-            SimpleTypeValidator validator = new SimpleTypeValidator();
-            validator.validateField(name, String.class, o);
+            SimpleTypeValidator.validateField(name, String.class, o);
         }
     }
 
@@ -91,8 +101,7 @@ public class ConfigValidation {
 
         @Override
         public void validateField(String name, Object o) {
-            SimpleTypeValidator validator = new SimpleTypeValidator();
-            validator.validateField(name, Boolean.class, o);
+            SimpleTypeValidator.validateField(name, Boolean.class, o);
         }
     }
 
@@ -100,8 +109,7 @@ public class ConfigValidation {
 
         @Override
         public void validateField(String name, Object o) {
-            SimpleTypeValidator validator = new SimpleTypeValidator();
-            validator.validateField(name, Number.class, o);
+            SimpleTypeValidator.validateField(name, Number.class, o);
         }
     }
 
@@ -109,8 +117,7 @@ public class ConfigValidation {
 
         @Override
         public void validateField(String name, Object o) {
-            SimpleTypeValidator validator = new SimpleTypeValidator();
-            validator.validateField(name, Double.class, o);
+            SimpleTypeValidator.validateField(name, Double.class, o);
         }
     }
 
@@ -168,8 +175,7 @@ public class ConfigValidation {
                 return;
             }
             //check if iterable
-            SimpleTypeValidator isIterable = new SimpleTypeValidator();
-            isIterable.validateField(name, Iterable.class, field);
+            SimpleTypeValidator.validateField(name, Iterable.class, field);
             HashSet<Object> objectSet = new HashSet<Object>();
             for (Object o : (Iterable) field) {
                 if (objectSet.contains(o)) {
@@ -190,18 +196,10 @@ public class ConfigValidation {
         @Override
         public void validateField(String name, Object o) {
 
-            if (o == null) {
+            if (o == null || o instanceof String) {
+                // A null value or a String value is acceptable
                 return;
             }
-            if (o instanceof String) {
-                return;
-            }
-            //check if iterable
-            SimpleTypeValidator isIterable = new SimpleTypeValidator();
-            try {
-                isIterable.validateField(name, Iterable.class, o);
-            } catch (Exception ex) {
-            }
             this.fv.validateField(name, o);
         }
     }
@@ -263,10 +261,20 @@ public class ConfigValidation {
     /**
      * Validates each entry in a list
      */
-    public static class ListEntryTypeValidator extends TypeValidator {
+    public static class ListEntryTypeValidator extends Validator {
+
+        private Class type;
+
+        public ListEntryTypeValidator(Map<String, Object> params) {
+            this.type = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+        }
 
         @Override
-        public void validateField(String name, Class type, Object o) {
+        public void validateField(String name, Object o) {
+            validateField(name, this.type, o);
+        }
+
+        public static void validateField(String name, Class type, Object o) {
             ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.listFv(type, false);
             validator.validateField(name, o);
         }
@@ -276,15 +284,26 @@ public class ConfigValidation {
      * Validates each entry in a list against a list of custom Validators
      * Each validator in the list of validators must inherit or be an instance of Validator class
      */
-    public static class ListEntryCustomValidator {
+    public static class ListEntryCustomValidator extends Validator{
 
-        public void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException {
+        private Class[] entryValidators;
+
+        public ListEntryCustomValidator(Map<String, Object> params) {
+            this.entryValidators = (Class[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
+        }
+
+        @Override
+        public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException {
+
+            validateField(name, this.entryValidators, o);
+        }
+
+        public static void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException {
             if (o == null) {
                 return;
             }
             //check if iterable
-            SimpleTypeValidator isIterable = new SimpleTypeValidator();
-            isIterable.validateField(name, Iterable.class, o);
+            SimpleTypeValidator.validateField(name, Iterable.class, o);
             for (Object entry : (Iterable) o) {
                 for (Class validator : validators) {
                     Object v = validator.newInstance();
@@ -301,9 +320,22 @@ public class ConfigValidation {
     /**
      * validates each key and value in a map of a certain type
      */
-    public static class MapEntryTypeValidator {
+    public static class MapEntryTypeValidator extends Validator{
+
+        private Class keyType;
+        private Class valueType;
 
-        public void validateField(String name, Class keyType, Class valueType, Object o) {
+        public MapEntryTypeValidator(Map<String, Object> params) {
+            this.keyType = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
+            this.valueType = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            validateField(name, this.keyType, this.valueType, o);
+        }
+
+        public static void validateField(String name, Class keyType, Class valueType, Object o) {
             ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(keyType, valueType, false);
             validator.validateField(name, o);
         }
@@ -312,15 +344,27 @@ public class ConfigValidation {
     /**
      * validates each key and each value against the respective arrays of validators
      */
-    public static class MapEntryCustomValidator {
+    public static class MapEntryCustomValidator extends Validator{
+
+        private Class[] keyValidators;
+        private Class[] valueValidators;
+
+        public MapEntryCustomValidator(Map<String, Object> params) {
+            this.keyValidators = (Class []) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
+            this.valueValidators = (Class []) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
+        }
 
-        public void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException {
+        @Override
+        public void validateField(String name, Object o) throws InstantiationException, IllegalAccessException {
+            validateField(name, this.keyValidators, this.valueValidators, o);
+        }
+
+        public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException {
             if (o == null) {
                 return;
             }
             //check if Map
-            SimpleTypeValidator isMap = new SimpleTypeValidator();
-            isMap.validateField(name, Map.class, o);
+            SimpleTypeValidator.validateField(name, Map.class, o);
             for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) {
                 for (Class kv : keyValidators) {
                     Object keyValidator = kv.newInstance();
@@ -347,12 +391,22 @@ public class ConfigValidation {
      */
     public static class PositiveNumberValidator extends Validator{
 
+        private boolean includeZero;
+
+        public PositiveNumberValidator() {
+            this.includeZero = false;
+        }
+
+        public PositiveNumberValidator(Map<String, Object> params) {
+            this.includeZero = (boolean) params.get(ConfigValidationAnnotations.ValidatorParams.INCLUDE_ZERO);
+        }
+
         @Override
         public void validateField(String name, Object o) {
-            validateField(name, false, o);
+            validateField(name, this.includeZero, o);
         }
 
-        public void validateField(String name, boolean includeZero, Object o) {
+        public static void validateField(String name, boolean includeZero, Object o) {
             if (o == null) {
                 return;
             }
@@ -409,6 +463,7 @@ public class ConfigValidation {
         if (annotations.length == 0) {
             LOG.warn("Field {} does not have validator annotation", field);
         }
+
         for (Annotation annotation : annotations) {
             String type = annotation.annotationType().getName();
             Class validatorClass = null;
@@ -424,62 +479,20 @@ public class ConfigValidation {
                 Object v = validatorClass.cast(annotation);
                 String key = (String) field.get(null);
                 Class clazz = (Class) validatorClass
-                        .getMethod(ConfigValidationAnnotations.VALIDATOR_CLASS).invoke(v);
-                //Determining which method from which class should be invoked based on what fields/parameters the annotation has
-                if (hasMethod(validatorClass, ConfigValidationAnnotations.TYPE)) {
-
-                    TypeValidator o = ((Class<TypeValidator>) clazz).newInstance();
-
-                    Class objectType = (Class) validatorClass.getMethod(ConfigValidationAnnotations.TYPE).invoke(v);
-
-                    o.validateField(field.getName(), objectType, conf.get(key));
-
-                } else if (hasMethod(validatorClass, ConfigValidationAnnotations.ENTRY_VALIDATOR_CLASSES)) {
-
-                    ListEntryCustomValidator o = ((Class<ListEntryCustomValidator>) clazz).newInstance();
-
-                    Class[] entryValidators = (Class[]) validatorClass.getMethod(ConfigValidationAnnotations.ENTRY_VALIDATOR_CLASSES).invoke(v);
-
-                    o.validateField(field.getName(), entryValidators, conf.get(key));
-
-                } else if (hasMethod(validatorClass, ConfigValidationAnnotations.KEY_VALIDATOR_CLASSES)
-                        && hasMethod(validatorClass, ConfigValidationAnnotations.VALUE_VALIDATOR_CLASSES)) {
-
-                    MapEntryCustomValidator o = ((Class<MapEntryCustomValidator>) clazz).newInstance();
-
-                    Class[] keyValidators = (Class[]) validatorClass.getMethod(ConfigValidationAnnotations.KEY_VALIDATOR_CLASSES).invoke(v);
-
-                    Class[] valueValidators = (Class[]) validatorClass.getMethod(ConfigValidationAnnotations.VALUE_VALIDATOR_CLASSES).invoke(v);
-
-                    o.validateField(field.getName(), keyValidators, valueValidators, conf.get(key));
-
-                } else if (hasMethod(validatorClass, ConfigValidationAnnotations.KEY_TYPE)
-                        && hasMethod(validatorClass, ConfigValidationAnnotations.VALUE_TYPE)) {
-
-                    MapEntryTypeValidator o = ((Class<MapEntryTypeValidator>) clazz).newInstance();
-
-                    Class keyType = (Class) validatorClass.getMethod(ConfigValidationAnnotations.KEY_TYPE).invoke(v);
-
-                    Class valueType = (Class) validatorClass.getMethod(ConfigValidationAnnotations.VALUE_TYPE).invoke(v);
-
-                    o.validateField(field.getName(), keyType, valueType, conf.get(key));
-
-                } else if (hasMethod(validatorClass, ConfigValidationAnnotations.INCLUDE_ZERO)) {
-
-                    PositiveNumberValidator o = ((Class<PositiveNumberValidator>) clazz).newInstance();
-
-                    Boolean includeZero = (Boolean) validatorClass.getMethod(ConfigValidationAnnotations.INCLUDE_ZERO).invoke(v);
-
-                    o.validateField(field.getName(), includeZero, conf.get(key));
-
+                        .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
+                Validator o = null;
+                Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
+                //Two constructor signatures are used to initialize validators:
+                //one constructor takes a Map of arguments as input, the other takes no arguments (default constructor).
+                //If the validator has a constructor that takes a Map as an argument, call that constructor.
+                if(hasConstructor(clazz, Map.class)) {
+                    o  = (Validator) clazz.getConstructor(Map.class).newInstance(params);
                 }
-                //For annotations that does not have any additional fields. Call corresponding validateField method
+                //Otherwise, call the default constructor
                 else {
-
-                    ConfigValidation.Validator o = ((Class<ConfigValidation.Validator>) clazz).newInstance();
-
-                    o.validateField(field.getName(), conf.get(key));
+                    o  = (((Class<Validator>) clazz).newInstance());
                 }
+                o.validateField(field.getName(), conf.get(key));
             }
         }
     }
@@ -512,6 +525,33 @@ public class ConfigValidation {
         }
     }
 
+    private static Map<String,Object> getParamsFromAnnotation(Class validatorClass, Object v) throws InvocationTargetException, IllegalAccessException {
+        Map<String, Object> params = new HashMap<String, Object>();
+        for(Method method : validatorClass.getDeclaredMethods()) {
+
+            Object value = null;
+            try {
+                value = (Object) method.invoke(v);
+            } catch (IllegalArgumentException ex) {
+                value = null;
+            }
+            if(value != null) {
+                params.put(method.getName(), value);
+            }
+        }
+        return params;
+    }
+
+    private static boolean hasConstructor(Class clazz, Class paramClass) {
+        Class[] classes = {paramClass};
+        try {
+            clazz.getConstructor(classes);
+        } catch (NoSuchMethodException e) {
+            return false;
+        }
+        return true;
+    }
+
     private static boolean hasMethod(Class clazz, String method) {
         try {
             clazz.getMethod(method);

http://git-wip-us.apache.org/repos/asf/storm/blob/b5cb2ecd/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index be3b665..0b64479 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -29,21 +29,23 @@ import java.lang.annotation.RetentionPolicy;
  * For every annotation there must validator class to do the validation
  * To add another annotation for config validation, add another annotation @interface class.  Implement the corresponding
  * validator logic in a class in ConfigValidation.  Make sure validateField method in ConfigValidation knows how to use the validator
- * and which method definition/parameters to pass in based on what fields are in the annotation.
+ * and which method definition/parameters to pass in based on what fields are in the annotation.  By default, params of annotations
+ * will be passed into a constructor that takes a Map as a parameter.
  */
 public class ConfigValidationAnnotations {
     /**
      * Field names for annotations
      */
-
-    static final String VALIDATOR_CLASS = "validatorClass";
-    static final String TYPE = "type";
-    static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
-    static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
-    static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
-    static final String KEY_TYPE = "keyType";
-    static final String VALUE_TYPE = "valueType";
-    static final String INCLUDE_ZERO = "includeZero";
+    public static class ValidatorParams {
+        static final String VALIDATOR_CLASS = "validatorClass";
+        static final String TYPE = "type";
+        static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
+        static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
+        static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
+        static final String KEY_TYPE = "keyType";
+        static final String VALUE_TYPE = "valueType";
+        static final String INCLUDE_ZERO = "includeZero";
+    }
 
     /**
      * Validators with fields: validatorClass and type

http://git-wip-us.apache.org/repos/asf/storm/blob/b5cb2ecd/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
index 09d2be9..3ac1d47 100644
--- a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
@@ -328,7 +328,6 @@ public class TestConfigValidate {
 
     @Test
     public void testListEntryTypeValidator() {
-        ListEntryTypeValidator validator = new ListEntryTypeValidator();
         Collection<Object> testCases1 = new LinkedList<Object>();
         Collection<Object> testCases2 = new LinkedList<Object>();
         Collection<Object> testCases3 = new LinkedList<Object>();
@@ -340,12 +339,12 @@ public class TestConfigValidate {
         testCases1.add(Arrays.asList(testCase2));
 
         for (Object value : testCases1) {
-            validator.validateField("test", String.class, value);
+            ListEntryTypeValidator.validateField("test", String.class, value);
         }
 
         for (Object value : testCases1) {
             try {
-                validator.validateField("test", Number.class, value);
+                ListEntryTypeValidator.validateField("test", Number.class, value);
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
@@ -359,13 +358,13 @@ public class TestConfigValidate {
         testCases2.add(Arrays.asList(testCase5));
         for (Object value : testCases2) {
             try {
-                validator.validateField("test", String.class, value);
+                ListEntryTypeValidator.validateField("test", String.class, value);
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
         }
         for (Object value : testCases2) {
-            validator.validateField("test", Number.class, value);
+            ListEntryTypeValidator.validateField("test", Number.class, value);
         }
 
         Object[] testCase6 = {1000, 0, 1000, "5"};
@@ -374,14 +373,14 @@ public class TestConfigValidate {
         testCases3.add(Arrays.asList(testCase7));
         for (Object value : testCases3) {
             try {
-                validator.validateField("test", String.class, value);
+                ListEntryTypeValidator.validateField("test", String.class, value);
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
         }
         for (Object value : testCases1) {
             try {
-                validator.validateField("test", Number.class, value);
+                ListEntryTypeValidator.validateField("test", Number.class, value);
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }


[4/5] storm git commit: Merge branch 'STORM-1084' of https://github.com/jerrypeng/storm into STORM-1084

Posted by bo...@apache.org.
Merge branch 'STORM-1084' of https://github.com/jerrypeng/storm into STORM-1084

STORM-1084: Improve Storm config validation process to use java annotations instead of *_SCHEMA format


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/46d7c0e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/46d7c0e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/46d7c0e5

Branch: refs/heads/master
Commit: 46d7c0e53cd667f397242499d096461229e6510f
Parents: 9fe97b6 b5cb2ec
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Oct 15 13:08:03 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Oct 15 13:08:03 2015 -0500

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/config.clj    |  53 +-
 storm-core/src/jvm/backtype/storm/Config.java   | 521 ++++++++---------
 .../storm/validation/ConfigValidation.java      | 563 ++++++++++++++++++
 .../validation/ConfigValidationAnnotations.java | 218 +++++++
 .../storm/validation/ConfigValidationUtils.java | 175 ++++++
 .../test/clj/backtype/storm/config_test.clj     | 186 ------
 .../clj/backtype/storm/serialization_test.clj   |  14 +-
 .../jvm/backtype/storm/TestConfigValidate.java  | 564 +++++++++++++++++++
 8 files changed, 1797 insertions(+), 497 deletions(-)
----------------------------------------------------------------------