You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/14 06:28:29 UTC

[2/4] storm git commit: STORM-832: Allow config validation to be used by plugins/etc.

STORM-832: Allow config validation to be used by plugins/etc.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aec187f4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aec187f4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aec187f4

Branch: refs/heads/master
Commit: aec187f4302ac81f0bc308278beed6e9c56b7d3e
Parents: d8368b0
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Apr 12 13:31:20 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Apr 12 20:36:48 2017 -0500

----------------------------------------------------------------------
 docs/Configuration.md                           |  12 +-
 .../org/apache/storm/hdfs/spout/Configs.java    | 129 ++++++++--
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |   9 +-
 .../org.apache.storm.validation.Validated       |  17 ++
 .../apache/storm/hdfs/spout/ConfigsTest.java    |  88 +++++++
 .../storm/validation/ConfigValidation.java      | 245 ++++++++++++-------
 .../validation/ConfigValidationAnnotations.java |  54 ++--
 .../org/apache/storm/validation/NotConf.java    |  27 ++
 .../org/apache/storm/validation/Validated.java  |  25 ++
 .../org/apache/storm/TestConfigValidate.java    |  32 +--
 .../src/clj/org/apache/storm/config.clj         |  14 +-
 storm-core/src/clj/org/apache/storm/config.clj  |  21 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  22 +-
 .../org/apache/storm/command/AdminCommands.java |  17 +-
 .../org/apache/storm/command/ConfigValue.java   |   4 +-
 .../org/apache/storm/command/DevZookeeper.java  |   9 +-
 .../org/apache/storm/command/HealthCheck.java   |   8 +-
 .../org/apache/storm/command/Heartbeats.java    |   8 +-
 .../org/apache/storm/command/KillWorkers.java   |   4 +-
 .../apache/storm/security/auth/auth_test.clj    |   6 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   8 +-
 storm-server/pom.xml                            |   2 +-
 .../java/org/apache/storm/DaemonConfig.java     |   3 +-
 .../java/org/apache/storm/LocalCluster.java     |   4 +-
 .../main/java/org/apache/storm/LocalDRPC.java   |   4 +-
 .../storm/blobstore/FileBlobStoreImpl.java      |   3 +-
 .../storm/blobstore/LocalFsBlobStore.java       |   3 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  10 +-
 .../org/apache/storm/pacemaker/Pacemaker.java   |  27 +-
 .../apache/storm/utils/ServerConfigUtils.java   |  43 ++--
 .../org/apache/storm/utils/ServerUtils.java     |   3 +-
 .../org.apache.storm.validation.Validated       |  17 ++
 .../apache/storm/TestDaemonConfigValidate.java  |  12 +-
 .../apache/storm/daemon/drpc/DRPCServer.java    |   5 +-
 34 files changed, 618 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/docs/Configuration.md
----------------------------------------------------------------------
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 979ac9a..83cb28a 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -21,10 +21,20 @@ The Java API lets you specify component specific configurations in two ways:
 
 The preference order for configuration values is defaults.yaml < storm.yaml < topology specific configuration < internal component specific configuration < external component specific configuration. 
 
+# Bolts, Spouts, and Plugins
+In almost all cases configuration for a bolt or a spout should be done though setters on the bolt or spout implementation and not the topology conf.  In some rare cases it may make since to
+expose topology wide configurations that are not currently a part of [Config](javadocs/org/apache/storm/Config.html) or [DaemonConfig](javadocs/org/apache/storm/DaemonConfig.html) such as
+when writing a custom scheduler or a plugin to some part of storm.  In those
+cases you can create your own class like Config but implements [Validated](javadocs/org/apache/storm/validation/Validated.html). Any `public static final String` field declared in this
+class will be treated as a config and annotations from the `org.apache.storm.validation.ConfigValidationAnnotations` class can be used to enforce what is stored in that config.
+To let the validator know about this class you need to treat the class
+like a service that will be loaded through a ServiceLoader for the Validated class and include a `META-INF/services/org.apache.storm.validation.Validated` file in your jar that holds
+the name of your Config class.
 
 **Resources:**
 
-* [Config](javadocs/org/apache/storm/Config.html): a listing of all configurations as well as a helper class for creating topology specific configurations
+* [Config](javadocs/org/apache/storm/Config.html): a listing of client configurations as well as a helper class for creating topology specific configurations
+* [DaemonConfig](javadocs/org/apache/storm/DaemonConfig.html): a listing of Storm Daemon configurations.
 * [defaults.yaml]({{page.git-blob-base}}/conf/defaults.yaml): the default values for all configurations
 * [Setting up a Storm cluster](Setting-up-a-Storm-cluster.html): explains how to create and configure a Storm cluster
 * [Running topologies on a production cluster](Running-topologies-on-a-production-cluster.html): lists useful configurations when running topologies on a cluster

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index 8911b3c..cb2607a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -18,29 +18,112 @@
 
 package org.apache.storm.hdfs.spout;
 
-public class Configs {
-  public static final String READER_TYPE = "hdfsspout.reader.type";        // Required - chose the file type being consumed
-  public static final String TEXT = "text";
-  public static final String SEQ = "seq";
-
-  public static final String HDFS_URI = "hdfsspout.hdfs";                   // Required - HDFS name node
-  public static final String SOURCE_DIR = "hdfsspout.source.dir";           // Required - dir from which to read files
-  public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";         // Required - completed files will be moved here
-  public static final String BAD_DIR = "hdfsspout.badfiles.dir";            // Required - unparsable files will be moved here
-  public static final String LOCK_DIR = "hdfsspout.lock.dir";               // dir in which lock files will be created
-  public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";  // commit after N records. 0 disables this.
-  public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";      // commit after N secs. cannot be disabled.
-  public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding";
-  public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";   // inactivity duration after which locks are considered candidates for being reassigned to another spout
-  public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";     // if clocks on machines in the Storm cluster are in sync
-  public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix";     // filenames with this suffix in archive dir will be ignored by the Spout
-
-  public static final String DEFAULT_LOCK_DIR = ".lock";
-  public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
-  public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
-  public static final int DEFAULT_MAX_OUTSTANDING = 10000;
-  public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min
-  public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
+import org.apache.storm.validation.ConfigValidation.Validator;
+import org.apache.storm.validation.ConfigValidationAnnotations.CustomValidator;
+import org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
+import org.apache.storm.validation.ConfigValidationAnnotations.isInteger;
+import org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType;
+import org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
+import org.apache.storm.validation.ConfigValidationAnnotations.isString;
+import org.apache.storm.validation.NotConf;
+import org.apache.storm.validation.Validated;
 
+public class Configs implements Validated {
+    public static class ReaderTypeValidator extends Validator {
+        @Override
+        public void validateField(String name, Object o) {
+            HdfsSpout.checkValidReader((String)o);
+        }
+    }
+    
+    /**
+     * @deprecated please use {@link HdfsSpout.setReaderType(String)}
+     */
+    @Deprecated
+    @isString
+    @CustomValidator(validatorClass = ReaderTypeValidator.class)
+    public static final String READER_TYPE = "hdfsspout.reader.type";        // Required - chose the file type being consumed
+    public static final String TEXT = "text";
+    public static final String SEQ = "seq";
+    
+    /**
+     * @deprecated please use {@link HdfsSpout#setHdfsUri(String)}
+     */
+    @Deprecated
+    @isString
+    public static final String HDFS_URI = "hdfsspout.hdfs";                   // Required - HDFS name node
+    /**
+     * @deprecated please use {@link HdfsSpout#setSourceDir(String)}
+     */
+    @Deprecated
+    @isString
+    public static final String SOURCE_DIR = "hdfsspout.source.dir";           // Required - dir from which to read files
+    /**
+     * @deprecated please use {@link HdfsSpout#setArchiveDir(String)}
+     */
+    @Deprecated
+    @isString
+    public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";         // Required - completed files will be moved here
+    /**
+     * @deprecated please use {@link HdfsSpout#setBadFilesDir(String)}
+     */
+    @Deprecated
+    @isString
+    public static final String BAD_DIR = "hdfsspout.badfiles.dir";            // Required - unparsable files will be moved here
+    /**
+     * @deprecated please use {@link HdfsSpout#setLockDir(String)}
+     */
+    @Deprecated
+    @isString
+    public static final String LOCK_DIR = "hdfsspout.lock.dir";               // dir in which lock files will be created
+    /**
+     * @deprecated please use {@link HdfsSpout#setCommitFrequencyCount(int)}
+     */
+    @Deprecated
+    @isInteger
+    @isPositiveNumber(includeZero=true)
+    public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";  // commit after N records. 0 disables this.
+    /**
+     * @deprecated please use {@link HdfsSpout#setCommitFrequencySec(int)}
+     */
+    @Deprecated
+    @isInteger
+    @isPositiveNumber
+    public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";      // commit after N secs. cannot be disabled.
+    /**
+     * @deprecated please use {@link HdfsSpout#setMaxOutstanding(int)}
+     */
+    @Deprecated
+    @isInteger
+    @isPositiveNumber(includeZero=true)
+    public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding";
+    /**
+     * @deprecated please use {@link HdfsSpout#setLockTimeoutSec(int)}
+     */
+    @Deprecated
+    @isInteger
+    @isPositiveNumber
+    public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";   // inactivity duration after which locks are considered candidates for being reassigned to another spout
+    /**
+     * @deprecated please use {@link HdfsSpout#setClocksInSync(boolean)}
+     */
+    @Deprecated
+    @isBoolean
+    public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";     // if clocks on machines in the Storm cluster are in sync
+    /**
+     * @deprecated please use {@link HdfsSpout#setIgnoreSuffix(String)}
+     */
+    @Deprecated
+    @isString
+    public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix";     // filenames with this suffix in archive dir will be ignored by the Spout
 
+    @NotConf
+    public static final String DEFAULT_LOCK_DIR = ".lock";
+    public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
+    public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
+    public static final int DEFAULT_MAX_OUTSTANDING = 10000;
+    public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min
+    
+    @isMapEntryType(keyType = String.class, valueType = String.class)
+    public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
 } // class Configs

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 89776be..fe72610 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -364,7 +364,8 @@ public class HdfsSpout extends BaseRichSpout {
     inflight.put(id, tuple);
   }
 
-  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+  @SuppressWarnings("deprecation")
+public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
     LOG.info("Opening HDFS Spout");
     this.conf = conf;
     this.commitTimer = new Timer();
@@ -528,12 +529,16 @@ public class HdfsSpout extends BaseRichSpout {
     return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR;
   }
 
-  private static void checkValidReader(String readerType) {
+  static void checkValidReader(String readerType) {
     if ( readerType.equalsIgnoreCase(Configs.TEXT)  || readerType.equalsIgnoreCase(Configs.SEQ) )
       return;
     try {
       Class<?> classType = Class.forName(readerType);
       classType.getConstructor(FileSystem.class, Path.class, Map.class);
+      if (!FileReader.class.isAssignableFrom(classType)) {
+          LOG.error(readerType + " not a FileReader");
+          throw new IllegalArgumentException(readerType + " not a FileReader."); 
+      }
       return;
     } catch (ClassNotFoundException e) {
       LOG.error(readerType + " not found in classpath.", e);

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/external/storm-hdfs/src/main/resources/META-INF/services/org.apache.storm.validation.Validated
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/resources/META-INF/services/org.apache.storm.validation.Validated b/external/storm-hdfs/src/main/resources/META-INF/services/org.apache.storm.validation.Validated
new file mode 100644
index 0000000..18c242e
--- /dev/null
+++ b/external/storm-hdfs/src/main/resources/META-INF/services/org.apache.storm.validation.Validated
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.hdfs.spout.Configs

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java
new file mode 100644
index 0000000..6244466
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.validation.ConfigValidation;
+import org.junit.Test;
+
+public class ConfigsTest {
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testGood() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Configs.READER_TYPE, Configs.TEXT);
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.READER_TYPE, Configs.SEQ);
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.READER_TYPE, TextFileReader.class.getName());
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.HDFS_URI, "hdfs://namenode/");
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.SOURCE_DIR, "/input/source");
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.ARCHIVE_DIR, "/input/done");
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.BAD_DIR, "/input/bad");
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.LOCK_DIR, "/topology/lock");
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.COMMIT_FREQ_COUNT, 0);
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.COMMIT_FREQ_COUNT, 100);
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.COMMIT_FREQ_SEC, 100);
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.MAX_OUTSTANDING, 500);
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.LOCK_TIMEOUT, 100);
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.CLOCKS_INSYNC, true);
+        ConfigValidation.validateFields(conf);
+        conf.put(Configs.IGNORE_SUFFIX, ".writing");
+        ConfigValidation.validateFields(conf);
+        Map<String, String> hdfsConf = new HashMap<>();
+        hdfsConf.put("A", "B");
+        conf.put(Configs.DEFAULT_HDFS_CONFIG_KEY, hdfsConf);
+        ConfigValidation.validateFields(conf);
+    }
+
+    public static void verifyBad(String key, Object value) {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(key, value);
+        try {
+            ConfigValidation.validateFields(conf);
+            fail("Expected "+key+" = "+ value + " to throw Exception, but it didn't");
+        } catch (IllegalArgumentException e) {
+            //good
+        }
+    }
+    
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testBad() {
+        verifyBad(Configs.READER_TYPE, "SomeString");
+        verifyBad(Configs.HDFS_URI, 100);
+        verifyBad(Configs.COMMIT_FREQ_COUNT, -10);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index f4500b4..c78af22 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -18,26 +18,31 @@
 
 package org.apache.storm.validation;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Provides functionality for validating configuration fields.
  */
 public class ConfigValidation {
-
-    private static final Class CONFIG_CLASS = org.apache.storm.Config.class;
-
     private static final Logger LOG = LoggerFactory.getLogger(ConfigValidation.class);
 
     public static abstract class Validator {
@@ -69,10 +74,10 @@ public class ConfigValidation {
      */
     public static class SimpleTypeValidator extends Validator {
 
-        private Class type;
+        private Class<?> type;
 
         public SimpleTypeValidator(Map<String, Object> params) {
-            this.type = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+            this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
         }
 
         @Override
@@ -80,7 +85,7 @@ public class ConfigValidation {
             validateField(name, this.type, o);
         }
 
-        public static void validateField(String name, Class type, Object o) {
+        public static void validateField(String name, Class<?> type, Object o) {
             if (o == null) {
                 return;
             }
@@ -179,6 +184,7 @@ public class ConfigValidation {
             ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
                     ConfigValidationUtils.listFv(String.class, false), false);
             validator.validateField(name, o);
+            @SuppressWarnings("unchecked")
             Map<String, List<String>> mapObject = (Map<String, List<String>>) o;
             if (!mapObject.containsKey("hosts")) {
                 throw new IllegalArgumentException(name + " should contain Map entry with key: hosts");
@@ -202,7 +208,7 @@ public class ConfigValidation {
             //check if iterable
             SimpleTypeValidator.validateField(name, Iterable.class, field);
             HashSet<Object> objectSet = new HashSet<Object>();
-            for (Object o : (Iterable) field) {
+            for (Object o : (Iterable<?>) field) {
                 if (objectSet.contains(o)) {
                     throw new IllegalArgumentException(name + " should contain no duplicate elements. Duplicated element: " + o);
                 }
@@ -234,13 +240,14 @@ public class ConfigValidation {
      */
     public static class KryoRegValidator extends Validator {
 
+        @SuppressWarnings("unchecked")
         @Override
         public void validateField(String name, Object o) {
             if (o == null) {
                 return;
             }
             if (o instanceof Iterable) {
-                for (Object e : (Iterable) o) {
+                for (Object e : (Iterable<?>) o) {
                     if (e instanceof Map) {
                         for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) e).entrySet()) {
                             if (!(entry.getKey() instanceof String) ||
@@ -288,10 +295,10 @@ public class ConfigValidation {
      */
     public static class ListEntryTypeValidator extends Validator {
 
-        private Class type;
+        private Class<?> type;
 
         public ListEntryTypeValidator(Map<String, Object> params) {
-            this.type = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+            this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
         }
 
         @Override
@@ -299,7 +306,7 @@ public class ConfigValidation {
             validateField(name, this.type, o);
         }
 
-        public static void validateField(String name, Class type, Object o) {
+        public static void validateField(String name, Class<?> type, Object o) {
             ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.listFv(type, false);
             validator.validateField(name, o);
         }
@@ -311,10 +318,10 @@ public class ConfigValidation {
      */
     public static class ListEntryCustomValidator extends Validator{
 
-        private Class[] entryValidators;
+        private Class<?>[] entryValidators;
 
         public ListEntryCustomValidator(Map<String, Object> params) {
-            this.entryValidators = (Class[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
+            this.entryValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
         }
 
         @Override
@@ -326,14 +333,14 @@ public class ConfigValidation {
             }
         }
 
-        public static void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+        public static void validateField(String name, Class<?>[] validators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
             }
             //check if iterable
             SimpleTypeValidator.validateField(name, Iterable.class, o);
-            for (Object entry : (Iterable) o) {
-                for (Class validator : validators) {
+            for (Object entry : (Iterable<?>) o) {
+                for (Class<?> validator : validators) {
                     Object v = validator.getConstructor().newInstance();
                     if (v instanceof Validator) {
                         ((Validator) v).validateField(name + " list entry", entry);
@@ -350,12 +357,12 @@ public class ConfigValidation {
      */
     public static class MapEntryTypeValidator extends Validator{
 
-        private Class keyType;
-        private Class valueType;
+        private Class<?> keyType;
+        private Class<?> valueType;
 
         public MapEntryTypeValidator(Map<String, Object> params) {
-            this.keyType = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
-            this.valueType = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
+            this.keyType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
+            this.valueType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
         }
 
         @Override
@@ -363,7 +370,7 @@ public class ConfigValidation {
             validateField(name, this.keyType, this.valueType, o);
         }
 
-        public static void validateField(String name, Class keyType, Class valueType, Object o) {
+        public static void validateField(String name, Class<?> keyType, Class<?> valueType, Object o) {
             ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(keyType, valueType, false);
             validator.validateField(name, o);
         }
@@ -374,12 +381,12 @@ public class ConfigValidation {
      */
     public static class MapEntryCustomValidator extends Validator{
 
-        private Class[] keyValidators;
-        private Class[] valueValidators;
+        private Class<?>[] keyValidators;
+        private Class<?>[] valueValidators;
 
         public MapEntryCustomValidator(Map<String, Object> params) {
-            this.keyValidators = (Class []) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
-            this.valueValidators = (Class []) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
+            this.keyValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
+            this.valueValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
         }
 
         @Override
@@ -391,14 +398,15 @@ public class ConfigValidation {
             }
         }
 
-        public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+        @SuppressWarnings("unchecked")
+        public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
             if (o == null) {
                 return;
             }
             //check if Map
             SimpleTypeValidator.validateField(name, Map.class, o);
             for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) {
-                for (Class kv : keyValidators) {
+                for (Class<?> kv : keyValidators) {
                     Object keyValidator = kv.getConstructor().newInstance();
                     if (keyValidator instanceof Validator) {
                         ((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
@@ -406,7 +414,7 @@ public class ConfigValidation {
                         LOG.warn("validator: {} cannot be used in MapEntryCustomValidator to validate keys.  Individual entry validators must a instance of Validator class", kv.getName());
                     }
                 }
-                for (Class vv : valueValidators) {
+                for (Class<?> vv : valueValidators) {
                     Object valueValidator = vv.getConstructor().newInstance();
                     if (valueValidator instanceof Validator) {
                         ((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
@@ -465,11 +473,11 @@ public class ConfigValidation {
                 return;
             }
             SimpleTypeValidator.validateField(name, Map.class, o);
-            if (!((Map) o).containsKey("class")) {
+            if (!((Map<?, ?>) o).containsKey("class")) {
                 throw new IllegalArgumentException("Field " + name + " must have map entry with key: class");
             }
 
-            SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
+            SimpleTypeValidator.validateField(name, String.class, ((Map<?, ?>) o).get("class"));
         }
     }
 
@@ -481,15 +489,15 @@ public class ConfigValidation {
                 return;
             }
             SimpleTypeValidator.validateField(name, Map.class, o);
-            if(!((Map) o).containsKey("class") ) {
+            if(!((Map<?, ?>) o).containsKey("class") ) {
                 throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class");
             }
-            if(!((Map) o).containsKey("parallelism.hint") ) {
+            if(!((Map<?, ?>) o).containsKey("parallelism.hint") ) {
                 throw new IllegalArgumentException("Field " + name + " must have map entry with key: parallelism.hint");
             }
 
-            SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
-            new IntegerValidator().validateField(name, ((Map) o).get("parallelism.hint"));
+            SimpleTypeValidator.validateField(name, String.class, ((Map<?, ?>) o).get("class"));
+            new IntegerValidator().validateField(name, ((Map<?, ?>) o).get("parallelism.hint"));
         }
     }
 
@@ -527,24 +535,25 @@ public class ConfigValidation {
                 return;
             }
             SimpleTypeValidator.validateField(name, Map.class, o);
-            if (!((Map) o).containsKey("cpu")) {
+            Map<?, ?> m = (Map<?, ?>) o;
+            if (!m.containsKey("cpu")) {
                 throw new IllegalArgumentException("Field " + name + " must have map entry with key: cpu");
             }
-            if (!((Map) o).containsKey("memory")) {
+            if (!m.containsKey("memory")) {
                 throw new IllegalArgumentException("Field " + name + " must have map entry with key: memory");
             }
 
-            SimpleTypeValidator.validateField(name, Number.class, ((Map) o).get("cpu"));
-            SimpleTypeValidator.validateField(name, Number.class, ((Map) o).get("memory"));
+            SimpleTypeValidator.validateField(name, Number.class, m.get("cpu"));
+            SimpleTypeValidator.validateField(name, Number.class, m.get("memory"));
         }
     }
 
     public static class ImplementsClassValidator extends Validator {
 
-        Class classImplements;
+        Class<?> classImplements;
 
         public ImplementsClassValidator(Map<String, Object> params) {
-            this.classImplements = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
+            this.classImplements = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
         }
 
         @Override
@@ -554,7 +563,7 @@ public class ConfigValidation {
             }
             SimpleTypeValidator.validateField(name, String.class, o);
             try {
-                Class objectClass = Class.forName((String) o);
+                Class<?> objectClass = Class.forName((String) o);
                 if (!this.classImplements.isAssignableFrom(objectClass)) {
                     throw new IllegalArgumentException("Field " + name + " with value " + o
                             + " does not implement " + this.classImplements.getName());
@@ -575,8 +584,45 @@ public class ConfigValidation {
      * @param fieldName provided as a string
      * @param conf      map of confs
      */
-    public static void validateField(String fieldName, Map conf) {
-        validateField(fieldName, conf, CONFIG_CLASS);
+    public static void validateField(String fieldName, Map<String, Object> conf)  {
+        validateField(fieldName, conf, getConfigClasses());
+    }
+
+    private static List<Class<?>> configClasses = null;
+    //We follow the model of service loaders (Even though it is not a service).
+    private static final String CONFIG_CLASSES_NAME = "META-INF/services/"+Validated.class.getName();
+    
+    private synchronized static List<Class<?>> getConfigClasses() {
+        if (configClasses == null) {
+            List<Class<?>> ret = new ArrayList<>();
+            Set<String> classesToScan = new HashSet<>();
+            classesToScan.add(Config.class.getName());
+            for (URL url: Utils.findResources(CONFIG_CLASSES_NAME)) {
+                try {
+                    try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
+                        String line;
+                        while((line = in.readLine()) != null) {
+                            line = line.replaceAll("#.*$", "").trim();
+                            if (!line.isEmpty()) {
+                                classesToScan.add(line);
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException("Error trying to read " + url, e);
+                }
+            }
+            for (String clazz: classesToScan) {
+                try {
+                    ret.add(Class.forName(clazz));
+                } catch (ClassNotFoundException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            LOG.debug("Will use {} for validation", ret);
+            configClasses = ret;
+        }
+        return configClasses;
     }
 
     /**
@@ -584,14 +630,19 @@ public class ConfigValidation {
      *
      * @param fieldName   provided as a string
      * @param conf        map of confs
-     * @param configClass config class
+     * @param configs config class
      */
-    public static void validateField(String fieldName, Map conf, Class configClass) {
+    public static void validateField(String fieldName, Map<String, Object> conf, List<Class<?>> configs) {
         Field field = null;
-        try {
-            field = configClass.getField(fieldName);
-        } catch (NoSuchFieldException e) {
-            throw new RuntimeException(e);
+        for (Class<?> clazz: configs) {
+            try {
+                field = clazz.getField(fieldName);
+            } catch (NoSuchFieldException e) {
+                //Ignored
+            }
+        }
+        if (field == null) {
+            throw new RuntimeException("Could not find " + fieldName + " in any of " + configs);
         }
         validateField(field, conf);
     }
@@ -603,18 +654,23 @@ public class ConfigValidation {
      * @param field field that needs to be validated
      * @param conf  map of confs
      */
-    public static void validateField(Field field, Map conf) {
+    public static void validateField(Field field, Map<String, Object> conf) {
         Annotation[] annotations = field.getAnnotations();
         if (annotations.length == 0) {
             LOG.warn("Field {} does not have validator annotation", field);
         }
         try {
             for (Annotation annotation : annotations) {
+                if (annotation.annotationType().equals(Deprecated.class)) {
+                    LOG.warn("{} is a deprecated config please see {}.{} for more information.", 
+                            field.get(null), field.getDeclaringClass(), field.getName());
+                    continue;
+                }
                 String type = annotation.annotationType().getName();
-                Class validatorClass = null;
+                Class<?> validatorClass = null;
                 Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
                 //check if annotation is one of our
-                for (Class clazz : classes) {
+                for (Class<?> clazz : classes) {
                     if (clazz.getName().equals(type)) {
                         validatorClass = clazz;
                         break;
@@ -623,7 +679,8 @@ public class ConfigValidation {
                 if (validatorClass != null) {
                     Object v = validatorClass.cast(annotation);
                     String key = (String) field.get(null);
-                    Class clazz = (Class) validatorClass
+                    @SuppressWarnings("unchecked")
+                    Class<Validator> clazz = (Class<Validator>) validatorClass
                             .getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
                     Validator o = null;
                     Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
@@ -631,11 +688,11 @@ public class ConfigValidation {
                     //One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
                     //If validator has a constructor that takes a Map as an argument call that constructor
                     if (hasConstructor(clazz, Map.class)) {
-                        o = (Validator) clazz.getConstructor(Map.class).newInstance(params);
+                        o = clazz.getConstructor(Map.class).newInstance(params);
                     }
                     //If not call default constructor
                     else {
-                        o = (((Class<Validator>) clazz).newInstance());
+                        o = clazz.newInstance();
                     }
                     o.validateField(field.getName(), conf.get(key));
                 }
@@ -644,41 +701,60 @@ public class ConfigValidation {
             throw new RuntimeException(e);
         }
     }
-
+    
     /**
      * Validate all confs in map
      *
      * @param conf map of configs
      */
-    public static void validateFields(Map conf) {
-        validateFields(conf, CONFIG_CLASS);
-    }
-
+    public static void validateFields(Map<String, Object> conf) {
+        validateFields(conf, getConfigClasses());
+    }
+
+    //The following come from the JVm Specification table 4.4
+    // https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.5
+    private static final int ACC_PUBLIC = 0x0001;
+    private static final int ACC_STATIC = 0x0008;
+    private static final int ACC_FINAL  = 0x0010;
+    private static final int DESIRED_FIELD_ACC = ACC_PUBLIC | ACC_STATIC | ACC_FINAL;
+    public static boolean isFieldAllowed(Field field) {
+        return field.getAnnotation(NotConf.class) == null &&
+                String.class.equals(field.getType()) &&
+                ((field.getModifiers() & DESIRED_FIELD_ACC) == DESIRED_FIELD_ACC) &&
+                !field.isSynthetic();
+    }
+    
     /**
      * Validate all confs in map
      *
      * @param conf        map of configs
-     * @param configClass config class
+     * @param classes config class
      */
-    public static void validateFields(Map conf, Class configClass) {
-        for (Field field : configClass.getFields()) {
-            Object keyObj = null;
-            try {
-                keyObj = field.get(null);
-            } catch (IllegalAccessException e) {
-                throw new RuntimeException(e);
-            }
-            //make sure that defined key is string in case wrong stuff got put into Config.java
-            if (keyObj instanceof String) {
-                String confKey = (String) keyObj;
-                if (conf.containsKey(confKey)) {
-                    validateField(field, conf);
+    public static void validateFields(Map<String, Object> conf, List<Class<?>> classes) {
+        for (Class<?> clazz: classes) {
+            for (Field field : clazz.getDeclaredFields()) {
+                if (!isFieldAllowed(field)) {
+                    continue;
+                }
+                Object keyObj = null;
+                try {
+                    keyObj = field.get(null);
+                } catch (IllegalAccessException e) {
+                    //This should not happen because we checked for PUBLIC in isFieldAllowed
+                    throw new RuntimeException(e);
+                }
+                //make sure that defined key is string in case wrong stuff got put into Config.java
+                if (keyObj instanceof String) {
+                    String confKey = (String) keyObj;
+                    if (conf.containsKey(confKey)) {
+                        validateField(field, conf);
+                    }
                 }
             }
         }
     }
 
-    private static Map<String,Object> getParamsFromAnnotation(Class validatorClass, Object v) throws InvocationTargetException, IllegalAccessException {
+    private static Map<String,Object> getParamsFromAnnotation(Class<?> validatorClass, Object v) throws InvocationTargetException, IllegalAccessException {
         Map<String, Object> params = new HashMap<String, Object>();
         for(Method method : validatorClass.getDeclaredMethods()) {
 
@@ -695,8 +771,8 @@ public class ConfigValidation {
         return params;
     }
 
-    private static boolean hasConstructor(Class clazz, Class paramClass) {
-        Class[] classes = {paramClass};
+    private static boolean hasConstructor(Class<?> clazz, Class<?> paramClass) {
+        Class<?>[] classes = {paramClass};
         try {
             clazz.getConstructor(classes);
         } catch (NoSuchMethodException e) {
@@ -704,13 +780,4 @@ public class ConfigValidation {
         }
         return true;
     }
-
-    private static boolean hasMethod(Class clazz, String method) {
-        try {
-            clazz.getMethod(method);
-        } catch (NoSuchMethodException ex) {
-            return false;
-        }
-        return true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
index b770f34..56dc574 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
@@ -55,17 +55,17 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isType {
-        Class validatorClass() default ConfigValidation.SimpleTypeValidator.class;
+        Class<?> validatorClass() default ConfigValidation.SimpleTypeValidator.class;
 
-        Class type();
+        Class<?> type();
     }
 
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isStringList {
-        Class validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
+        Class<?> validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
 
-        Class type() default String.class;
+        Class<?> type() default String.class;
     }
 
     /**
@@ -74,9 +74,9 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isListEntryType {
-        Class validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
+        Class<?> validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
 
-        Class type();
+        Class<?> type();
     }
 
     /**
@@ -85,26 +85,26 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isString {
-        Class validatorClass() default ConfigValidation.StringValidator.class;
+        Class<?> validatorClass() default ConfigValidation.StringValidator.class;
         String[] acceptedValues() default "";
     }
 
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isNumber {
-        Class validatorClass() default ConfigValidation.NumberValidator.class;
+        Class<?> validatorClass() default ConfigValidation.NumberValidator.class;
     }
 
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isBoolean {
-        Class validatorClass() default ConfigValidation.BooleanValidator.class;
+        Class<?> validatorClass() default ConfigValidation.BooleanValidator.class;
     }
 
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isInteger {
-        Class validatorClass() default ConfigValidation.IntegerValidator.class;
+        Class<?> validatorClass() default ConfigValidation.IntegerValidator.class;
     }
 
     /**
@@ -113,7 +113,7 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface NotNull {
-        Class validatorClass() default ConfigValidation.NotNullValidator.class;
+        Class<?> validatorClass() default ConfigValidation.NotNullValidator.class;
     }
 
     /**
@@ -122,7 +122,7 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isNoDuplicateInList {
-        Class validatorClass() default ConfigValidation.NoDuplicateInListValidator.class;
+        Class<?> validatorClass() default ConfigValidation.NoDuplicateInListValidator.class;
     }
 
     /**
@@ -132,9 +132,9 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isListEntryCustom {
-        Class validatorClass() default ConfigValidation.ListEntryCustomValidator.class;
+        Class<?> validatorClass() default ConfigValidation.ListEntryCustomValidator.class;
 
-        Class[] entryValidatorClasses();
+        Class<?>[] entryValidatorClasses();
     }
 
     /**
@@ -144,11 +144,11 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isMapEntryType {
-        Class validatorClass() default ConfigValidation.MapEntryTypeValidator.class;
+        Class<?> validatorClass() default ConfigValidation.MapEntryTypeValidator.class;
 
-        Class keyType();
+        Class<?> keyType();
 
-        Class valueType();
+        Class<?> valueType();
     }
 
     /**
@@ -158,11 +158,11 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isMapEntryCustom {
-        Class validatorClass() default ConfigValidation.MapEntryCustomValidator.class;
+        Class<?> validatorClass() default ConfigValidation.MapEntryCustomValidator.class;
 
-        Class[] keyValidatorClasses();
+        Class<?>[] keyValidatorClasses();
 
-        Class[] valueValidatorClasses();
+        Class<?>[] valueValidatorClasses();
     }
 
     /**
@@ -172,7 +172,7 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isPositiveNumber {
-        Class validatorClass() default ConfigValidation.PositiveNumberValidator.class;
+        Class<?> validatorClass() default ConfigValidation.PositiveNumberValidator.class;
 
         boolean includeZero() default false;
     }
@@ -180,9 +180,9 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isImplementationOfClass {
-        Class validatorClass() default ConfigValidation.ImplementsClassValidator.class;
+        Class<?> validatorClass() default ConfigValidation.ImplementsClassValidator.class;
 
-        Class implementsClass();
+        Class<?> implementsClass();
     }
 
     /**
@@ -191,19 +191,19 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isStringOrStringList {
-        Class validatorClass() default ConfigValidation.StringOrStringListValidator.class;
+        Class<?> validatorClass() default ConfigValidation.StringOrStringListValidator.class;
     }
 
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isKryoReg {
-        Class validatorClass() default ConfigValidation.KryoRegValidator.class;
+        Class<?> validatorClass() default ConfigValidation.KryoRegValidator.class;
     }
 
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isPowerOf2 {
-        Class validatorClass() default ConfigValidation.PowerOf2Validator.class;
+        Class<?> validatorClass() default ConfigValidation.PowerOf2Validator.class;
     }
 
     /**
@@ -212,7 +212,7 @@ public class ConfigValidationAnnotations {
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface CustomValidator {
-        Class validatorClass();
+        Class<?> validatorClass();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/src/jvm/org/apache/storm/validation/NotConf.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/NotConf.java b/storm-client/src/jvm/org/apache/storm/validation/NotConf.java
new file mode 100644
index 0000000..0040740
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/validation/NotConf.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.validation;
+
+/**
+ * Annotation that can be used to explicitly call out
+ * public static final String fields that are not configs
+ */
+public @interface NotConf {
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/src/jvm/org/apache/storm/validation/Validated.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/Validated.java b/storm-client/src/jvm/org/apache/storm/validation/Validated.java
new file mode 100644
index 0000000..b5e6c88
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/validation/Validated.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.validation;
+
+/**
+ * An interface that is used to inform config validation what to look at
+ */
+public interface Validated {
+    //Empty
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
index 2f15749..ee4d143 100644
--- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -401,7 +401,7 @@ public class TestConfigValidate {
 
         for (Object value : passCases) {
             config.put(TestConfig.TEST_MAP_CONFIG, value);
-            ConfigValidation.validateFields(config, TestConfig.class);
+            ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
         }
 
         Map<Object, Object> failCase1 = new HashMap<Object, Object>();
@@ -418,7 +418,7 @@ public class TestConfigValidate {
         for (Object value : failCases) {
             try {
                 config.put(TestConfig.TEST_MAP_CONFIG, value);
-                ConfigValidation.validateFields(config, TestConfig.class);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
@@ -439,7 +439,7 @@ public class TestConfigValidate {
 
         for (Object value : passCases) {
             config.put(TestConfig.TEST_MAP_CONFIG_2, value);
-            ConfigValidation.validateFields(config, TestConfig.class);
+            ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
         }
 
         Map<Object, Object> failCase1 = new HashMap<Object, Object>();
@@ -466,7 +466,7 @@ public class TestConfigValidate {
         for (Object value : failCases) {
             try {
                 config.put(TestConfig.TEST_MAP_CONFIG_2, value);
-                ConfigValidation.validateFields(config, TestConfig.class);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
@@ -485,7 +485,7 @@ public class TestConfigValidate {
 
         for (Object value : passCases) {
             config.put(TestConfig.TEST_MAP_CONFIG_3, value);
-            ConfigValidation.validateFields(config, TestConfig.class);
+            ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
         }
 
         Object[] failCase1 = {1, 5.0, -0.01, 0, "aaa"};
@@ -498,7 +498,7 @@ public class TestConfigValidate {
         for (Object value : failCases) {
             try {
                 config.put(TestConfig.TEST_MAP_CONFIG_3, value);
-                ConfigValidation.validateFields(config, TestConfig.class);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
@@ -517,7 +517,7 @@ public class TestConfigValidate {
 
         for (Object value : passCases) {
             config.put(TestConfig.TEST_MAP_CONFIG_4, value);
-            ConfigValidation.validateFields(config, TestConfig.class);
+            ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
         }
 
         Object[] failCase1 = {1, 5.0, -0.01, 3.0};
@@ -536,7 +536,7 @@ public class TestConfigValidate {
         for (Object value : failCases) {
             try {
                 config.put(TestConfig.TEST_MAP_CONFIG_4, value);
-                ConfigValidation.validateFields(config, TestConfig.class);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
@@ -550,14 +550,14 @@ public class TestConfigValidate {
 
         for (Object value : passCases) {
             config.put(TestConfig.TEST_MAP_CONFIG_5, value);
-            ConfigValidation.validateFields(config, TestConfig.class);
+            ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
         }
 
         String[] failCases = {"aa", "bb", "cc", "abc", "a", "b", "c", ""};
         for (Object value : failCases) {
             try {
                 config.put(TestConfig.TEST_MAP_CONFIG_5, value);
-                ConfigValidation.validateFields(config, TestConfig.class);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
@@ -581,7 +581,7 @@ public class TestConfigValidate {
 
         for (Object value : passCases) {
             config.put(TestConfig.TEST_MAP_CONFIG_6, value);
-            ConfigValidation.validateFields(config, TestConfig.class);
+            ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
         }
 
         Map<String, Map<String, List<String>>> failCase1 = new HashMap<String, Map<String, List<String>>>();
@@ -605,7 +605,7 @@ public class TestConfigValidate {
         for (Object value : failCases) {
             try {
                 config.put(TestConfig.TEST_MAP_CONFIG_6, value);
-                ConfigValidation.validateFields(config, TestConfig.class);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
@@ -630,7 +630,7 @@ public class TestConfigValidate {
         passCase1.get("derek").put("memory", 60148);
 
         config.put(TestConfig.TEST_MAP_CONFIG_7, (Object) passCase1);
-        ConfigValidation.validateFields(config, TestConfig.class);
+        ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
 
         Map<String, Map<String, Integer>> failCase1 = new HashMap<String, Map<String, Integer>>();
         failCase1.put("jerry", new HashMap<String, Integer>());
@@ -660,7 +660,7 @@ public class TestConfigValidate {
         for (Object value : failCases) {
             try {
                 config.put(TestConfig.TEST_MAP_CONFIG_7, value);
-                ConfigValidation.validateFields(config, TestConfig.class);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }
@@ -677,7 +677,7 @@ public class TestConfigValidate {
 
         for (Object value : passCases) {
             config.put(TestConfig.TEST_MAP_CONFIG_8, value);
-            ConfigValidation.validateFields(config, TestConfig.class);
+            ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
         }
         //will fail since org.apache.storm.nimbus.NimbusInfo doesn't implement or extend org.apache.storm.networktopography.DNSToSwitchMapping
         failCases.add("org.apache.storm.nimbus.NimbusInfo");
@@ -685,7 +685,7 @@ public class TestConfigValidate {
         for (Object value : failCases) {
             try {
                 config.put(TestConfig.TEST_MAP_CONFIG_8, value);
-                ConfigValidation.validateFields(config, TestConfig.class);
+                ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
                 Assert.fail("Expected Exception not Thrown for value: " + value);
             } catch (IllegalArgumentException Ex) {
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-clojure/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/config.clj b/storm-clojure/src/clj/org/apache/storm/config.clj
index bfe47ed..e7d3281 100644
--- a/storm-clojure/src/clj/org/apache/storm/config.clj
+++ b/storm-clojure/src/clj/org/apache/storm/config.clj
@@ -15,14 +15,16 @@
 ;; limitations under the License.
 
 (ns org.apache.storm.config
-  (:import [org.apache.storm Config]))
+  (:import [org.apache.storm Config])
+  (:import [org.apache.storm.validation ConfigValidation]))
 
 (defn- clojure-config-name [name]
   (.replace (.toUpperCase name) "_" "-"))
 
 ; define clojure constants for every configuration parameter
-(doseq [f (seq (.getFields Config))]
-  (let [name (.getName f)
-        new-name (clojure-config-name name)]
-    (eval
-      `(def ~(symbol new-name) (. Config ~(symbol name))))))
+(doseq [f (seq (.getDeclaredFields Config))]
+  (when (ConfigValidation/isFieldAllowed f)
+    (let [name (.getName f)
+          new-name (clojure-config-name name)]
+      (eval
+        `(def ~(symbol new-name) (. Config ~(symbol name)))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index 3bf6372..bf7cd12 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -15,14 +15,23 @@
 ;; limitations under the License.
 
 (ns org.apache.storm.config
-  (:import [org.apache.storm DaemonConfig]))
+  (:import [org.apache.storm DaemonConfig Config])
+  (:import [org.apache.storm.validation ConfigValidation]))
 
 (defn- clojure-config-name [name]
   (.replace (.toUpperCase name) "_" "-"))
 
 ; define clojure constants for every configuration parameter
-(doseq [f (seq (.getFields DaemonConfig))]
-  (let [name (.getName f)
-        new-name (clojure-config-name name)]
-    (eval
-      `(def ~(symbol new-name) (. DaemonConfig ~(symbol name))))))
+(doseq [f (seq (.getDeclaredFields DaemonConfig))]
+  (when (ConfigValidation/isFieldAllowed f)
+    (let [name (.getName f)
+          new-name (clojure-config-name name)]
+      (eval
+        `(def ~(symbol new-name) (. DaemonConfig ~(symbol name)))))))
+
+(doseq [f (seq (.getDeclaredFields Config))]
+  (when (ConfigValidation/isFieldAllowed f)
+    (let [name (.getName f)
+          new-name (clojure-config-name name)]
+      (eval
+        `(def ~(symbol new-name) (. Config ~(symbol name)))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index b0f9846..02ee449 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -46,7 +46,7 @@
   (:import [org.apache.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
   (:import [org.apache.storm.security.auth AuthUtils])
   (:import [org.apache.storm.utils VersionInfo ConfigUtils Utils WebAppUtils])
-  (:import [org.apache.storm Config DaemonConfig])
+  (:import [org.apache.storm Config])
   (:import [java.io File])
   (:import [java.net URLEncoder URLDecoder])
   (:import [org.json.simple JSONValue])
@@ -390,8 +390,8 @@
            resourceSummary (if (> (.size sups) 0)
                              (reduce #(map + %1 %2)
                                (for [^SupervisorSummary s sups
-                                     :let [sup-total-mem (get (.get_total_resources s) DaemonConfig/SUPERVISOR_MEMORY_CAPACITY_MB)
-                                           sup-total-cpu (get (.get_total_resources s) DaemonConfig/SUPERVISOR_CPU_CAPACITY)
+                                     :let [sup-total-mem (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
+                                           sup-total-cpu (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY)
                                            sup-avail-mem (max (- sup-total-mem (.get_used_mem s)) 0.0)
                                            sup-avail-cpu (max (- sup-total-cpu (.get_used_cpu s)) 0.0)]]
                                  [sup-total-mem sup-total-cpu sup-avail-mem sup-avail-cpu]))
@@ -477,8 +477,8 @@
   (let [slotsTotal (.get_num_workers summary)
         slotsUsed (.get_num_used_workers summary)
         slotsFree (max (- slotsTotal slotsUsed) 0)
-        totalMem (get (.get_total_resources summary) DaemonConfig/SUPERVISOR_MEMORY_CAPACITY_MB)
-        totalCpu (get (.get_total_resources summary) DaemonConfig/SUPERVISOR_CPU_CAPACITY)
+        totalMem (get (.get_total_resources summary) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
+        totalCpu (get (.get_total_resources summary) Config/SUPERVISOR_CPU_CAPACITY)
         usedMem (.get_used_mem summary)
         usedCpu (.get_used_cpu summary)
         availMem (max (- totalMem usedMem) 0)
@@ -616,9 +616,9 @@
    "transferred" (.get_transferred common-stats)
    "acked" (.get_acked common-stats)
    "failed" (.get_failed common-stats)
-   "requestedMemOnHeap" (.get (.get_resources_map common-stats) DaemonConfig/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
-   "requestedMemOffHeap" (.get (.get_resources_map common-stats) DaemonConfig/TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
-   "requestedCpu" (.get (.get_resources_map common-stats) DaemonConfig/TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)})
+   "requestedMemOnHeap" (.get (.get_resources_map common-stats) Config/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
+   "requestedMemOffHeap" (.get (.get_resources_map common-stats) Config/TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+   "requestedCpu" (.get (.get_resources_map common-stats) Config/TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)})
 
 (defmulti comp-agg-stats-json
   "Returns a JSON representation of aggregated statistics."
@@ -1049,9 +1049,9 @@
        "name" (.get_topology_name comp-page-info)
        "executors" (.get_num_executors comp-page-info)
        "tasks" (.get_num_tasks comp-page-info)
-       "requestedMemOnHeap" (.get (.get_resources_map comp-page-info) DaemonConfig/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
-       "requestedMemOffHeap" (.get (.get_resources_map comp-page-info) DaemonConfig/TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
-       "requestedCpu" (.get (.get_resources_map comp-page-info) DaemonConfig/TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)
+       "requestedMemOnHeap" (.get (.get_resources_map comp-page-info) Config/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
+       "requestedMemOffHeap" (.get (.get_resources_map comp-page-info) Config/TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+       "requestedCpu" (.get (.get_resources_map comp-page-info) Config/TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)
        "schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)
        "topologyId" topology-id
        "topologyStatus" (.get_topology_status comp-page-info)

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
index 18ec82e..a052f8d 100644
--- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
+++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
@@ -17,10 +17,15 @@
  */
 package org.apache.storm.command;
 
-import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.Config;
-import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.KeyFilter;
 import org.apache.storm.blobstore.LocalFsBlobStore;
@@ -39,14 +44,14 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import com.google.common.collect.Sets;
 
 public class AdminCommands {
 
     private static final Logger LOG = LoggerFactory.getLogger(AdminCommands.class);
     private static BlobStore nimbusBlobStore;
     private static IStormClusterState stormClusterState;
-    private static Map conf;
+    private static Map<String, Object> conf;
 
     public static void main(String [] args) throws Exception {
 
@@ -54,7 +59,7 @@ public class AdminCommands {
             throw new IllegalArgumentException("Missing command. Supported command is remove_corrupt_topologies");
         }
         String command = args[0];
-        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+        //String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
         switch (command) {
             case "remove_corrupt_topologies":
                 initialize();
@@ -68,7 +73,7 @@ public class AdminCommands {
     }
 
     private static void initialize() {
-        conf = ServerConfigUtils.readStormConfig();
+        conf = Utils.readStormConfig();
         nimbusBlobStore = ServerUtils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf));
         List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java b/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java
index 10c7852..6074291 100644
--- a/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java
+++ b/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java
@@ -19,12 +19,12 @@ package org.apache.storm.command;
 
 import java.util.Map;
 
-import org.apache.storm.utils.ServerConfigUtils;
+import org.apache.storm.utils.Utils;
 
 public class ConfigValue {
     public static void main(String [] args) {
         String name = args[0];
-        Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+        Map<String, Object> conf = Utils.readStormConfig();
         System.out.println("VALUE: " + conf.get(name));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java b/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java
index 6850ae7..ef53170 100644
--- a/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java
@@ -17,18 +17,17 @@
  */
 package org.apache.storm.command;
 
+import java.util.Map;
+
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
-import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.zookeeper.Zookeeper;
 
-import java.util.Map;
-
 public class DevZookeeper {
     public static void main(String[] args) throws Exception {
-        Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+        Map<String, Object> conf = Utils.readStormConfig();
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         String localPath = (String) conf.get(DaemonConfig.DEV_ZOOKEEPER_PATH);
         Utils.forceDelete(localPath);

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index 49c29d5..dc7c18c 100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -17,15 +17,15 @@
  */
 package org.apache.storm.command;
 
-import org.apache.storm.healthcheck.HealthChecker;
-import org.apache.storm.utils.ServerConfigUtils;
-
 import java.util.Map;
 
+import org.apache.storm.healthcheck.HealthChecker;
+import org.apache.storm.utils.Utils;
+
 public class HealthCheck {
 
     public static void main(String[] args) {
-        Map conf = ServerConfigUtils.readStormConfig();
+        Map<String, Object> conf = Utils.readStormConfig();
         System.exit(HealthChecker.healthCheck(conf));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
index 27e996d..668f019 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.command;
 
-import com.google.common.base.Joiner;
-import org.apache.storm.utils.ServerConfigUtils;
+import java.util.Map;
+
 import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.IStateStorage;
@@ -29,7 +29,7 @@ import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
+import com.google.common.base.Joiner;
 
 public class Heartbeats {
     private static final Logger LOG = LoggerFactory.getLogger(Heartbeats.class);
@@ -42,7 +42,7 @@ public class Heartbeats {
         String command = args[0];
         String path = args[1];
 
-        Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+        Map<String, Object> conf = Utils.readStormConfig();
         IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, null, new ClusterStateContext());
 
         LOG.info("Command: [{}]", command);

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
index e12e4d8..5dc8402 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
@@ -21,13 +21,13 @@ import java.io.File;
 import java.util.Map;
 
 import org.apache.storm.Config;
-import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
 import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.utils.Utils;
 
 public class KillWorkers {
     public static void main(String [] args) throws Exception {
-        Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+        Map<String, Object> conf = Utils.readStormConfig();
         conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath());
         try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor())) {
             supervisor.shutdownAllWorkers(null, null);

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index dbb6993..fc95097 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -345,7 +345,7 @@
                   "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                               {STORM-THRIFT-TRANSPORT-PLUGIN         "org.apache.storm.security.auth.SimpleTransportPlugin"
-                               DaemonConfig/NIMBUS_THRIFT_PORT       a-port
+                               Config/NIMBUS_THRIFT_PORT       a-port
                                DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
             client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
             nimbus_client (.getClient client)]
@@ -414,7 +414,7 @@
 
 (deftest test-GetTransportPlugin-throws-RuntimeException
   (let [conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                    {DaemonConfig/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
+                    {Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
     (is (thrown-cause? RuntimeException (AuthUtils/GetTransportPlugin conf nil nil)))))
 
 (defn mk-impersonating-req-context [impersonating-user user-being-impersonated remote-address]
@@ -433,7 +433,7 @@
         _ (.prepare groups (clojurify-structure (ConfigUtils/readStormConfig)))
         groups (.getGroups groups user-being-impersonated)
         cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                       {DaemonConfig/NIMBUS_IMPERSONATION_ACL {impersonating-user {"hosts" [ (.getHostName (InetAddress/getLocalHost))]
+                       {Config/NIMBUS_IMPERSONATION_ACL {impersonating-user {"hosts" [ (.getHostName (InetAddress/getLocalHost))]
                                                                             "groups" groups}}})
         authorizer (ImpersonationAuthorizer. )
         unauthorized-host (com.google.common.net.InetAddresses/forString "10.10.10.10")

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index 8c0ee31..ecdc337 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -18,7 +18,7 @@
   (:require [org.apache.storm.security.auth [auth-test :refer [nimbus-timeout]]])
   (:import [java.nio ByteBuffer])
   (:import [java.util Optional])
-  (:import [org.apache.storm LocalCluster$Builder DaemonConfig])
+  (:import [org.apache.storm LocalCluster$Builder DaemonConfig Config])
   (:import [org.apache.storm.blobstore BlobStore])
   (:import [org.apache.storm.utils NimbusClient])
   (:import [org.apache.storm.generated NotAliveException StormBase])
@@ -106,7 +106,7 @@
                                 STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                               {STORM-THRIFT-TRANSPORT-PLUGIN   "org.apache.storm.security.auth.SimpleTransportPlugin"
-                               DaemonConfig/NIMBUS_THRIFT_PORT port
+                               Config/NIMBUS_THRIFT_PORT port
                                STORM-NIMBUS-RETRY-TIMES        0})
             client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
             nimbus_client (.getClient client)
@@ -142,7 +142,7 @@
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                               {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
                                "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                               DaemonConfig/NIMBUS_THRIFT_PORT port
+                               Config/NIMBUS_THRIFT_PORT port
                                STORM-NIMBUS-RETRY-TIMES 0})
             client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
             nimbus_client (.getClient client)]
@@ -172,7 +172,7 @@
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                                {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
                                "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                               DaemonConfig/NIMBUS_THRIFT_PORT port
+                               Config/NIMBUS_THRIFT_PORT port
                                STORM-NIMBUS-RETRY-TIMES 0})
             client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
             nimbus_client (.getClient client)

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 99e7958..33194ec 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -154,4 +154,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index d249313..356a347 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -23,6 +23,7 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.validation.Validated;
 
 import java.util.ArrayList;
 import java.util.Map;
@@ -37,7 +38,7 @@ import static org.apache.storm.validation.ConfigValidationAnnotations.*;
  *
  * This class extends {@link org.apache.storm.Config} for supporting Storm Daemons.
  */
-public class DaemonConfig extends Config {
+public class DaemonConfig implements Validated {
 
     /**
      * We check with this interval that whether the Netty channel is writable and try to write pending messages

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 4cd24eb..1330594 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -406,8 +406,8 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
             conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
             conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
             conf.put(Config.STORM_CLUSTER_MODE, "local");
-            conf.put(DaemonConfig.BLOBSTORE_SUPERUSER, System.getProperty("user.name"));
-            conf.put(DaemonConfig.BLOBSTORE_DIR, nimbusTmp.getPath());
+            conf.put(Config.BLOBSTORE_SUPERUSER, System.getProperty("user.name"));
+            conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
         
             InProcessZookeeper zookeeper = null;
             if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalDRPC.java b/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
index 9e25d16..bd60b62 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
@@ -24,8 +24,8 @@ import org.apache.storm.daemon.drpc.DRPCThrift;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.DRPCExecutionException;
 import org.apache.storm.generated.DRPCRequest;
-import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServiceRegistry;
+import org.apache.storm.utils.Utils;
 import org.apache.thrift.TException;
 
 /**
@@ -41,7 +41,7 @@ public class LocalDRPC implements ILocalDRPC {
     private final String serviceId;
 
     public LocalDRPC() {
-        Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+        Map<String, Object> conf = Utils.readStormConfig();
         drpc = new DRPC(conf);
         serviceId = ServiceRegistry.registerService(new DRPCThrift(drpc));
     }