You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/14 06:28:29 UTC
[2/4] storm git commit: STORM-832: Allow config validation to be used
by plugins/etc.
STORM-832: Allow config validation to be used by plugins/etc.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aec187f4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aec187f4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aec187f4
Branch: refs/heads/master
Commit: aec187f4302ac81f0bc308278beed6e9c56b7d3e
Parents: d8368b0
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Apr 12 13:31:20 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Apr 12 20:36:48 2017 -0500
----------------------------------------------------------------------
docs/Configuration.md | 12 +-
.../org/apache/storm/hdfs/spout/Configs.java | 129 ++++++++--
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 9 +-
.../org.apache.storm.validation.Validated | 17 ++
.../apache/storm/hdfs/spout/ConfigsTest.java | 88 +++++++
.../storm/validation/ConfigValidation.java | 245 ++++++++++++-------
.../validation/ConfigValidationAnnotations.java | 54 ++--
.../org/apache/storm/validation/NotConf.java | 27 ++
.../org/apache/storm/validation/Validated.java | 25 ++
.../org/apache/storm/TestConfigValidate.java | 32 +--
.../src/clj/org/apache/storm/config.clj | 14 +-
storm-core/src/clj/org/apache/storm/config.clj | 21 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 22 +-
.../org/apache/storm/command/AdminCommands.java | 17 +-
.../org/apache/storm/command/ConfigValue.java | 4 +-
.../org/apache/storm/command/DevZookeeper.java | 9 +-
.../org/apache/storm/command/HealthCheck.java | 8 +-
.../org/apache/storm/command/Heartbeats.java | 8 +-
.../org/apache/storm/command/KillWorkers.java | 4 +-
.../apache/storm/security/auth/auth_test.clj | 6 +-
.../storm/security/auth/nimbus_auth_test.clj | 8 +-
storm-server/pom.xml | 2 +-
.../java/org/apache/storm/DaemonConfig.java | 3 +-
.../java/org/apache/storm/LocalCluster.java | 4 +-
.../main/java/org/apache/storm/LocalDRPC.java | 4 +-
.../storm/blobstore/FileBlobStoreImpl.java | 3 +-
.../storm/blobstore/LocalFsBlobStore.java | 3 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 10 +-
.../org/apache/storm/pacemaker/Pacemaker.java | 27 +-
.../apache/storm/utils/ServerConfigUtils.java | 43 ++--
.../org/apache/storm/utils/ServerUtils.java | 3 +-
.../org.apache.storm.validation.Validated | 17 ++
.../apache/storm/TestDaemonConfigValidate.java | 12 +-
.../apache/storm/daemon/drpc/DRPCServer.java | 5 +-
34 files changed, 618 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/docs/Configuration.md
----------------------------------------------------------------------
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 979ac9a..83cb28a 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -21,10 +21,20 @@ The Java API lets you specify component specific configurations in two ways:
The preference order for configuration values is defaults.yaml < storm.yaml < topology specific configuration < internal component specific configuration < external component specific configuration.
+# Bolts, Spouts, and Plugins
+In almost all cases configuration for a bolt or a spout should be done through setters on the bolt or spout implementation and not the topology conf. In some rare cases it may make sense to
+expose topology wide configurations that are not currently a part of [Config](javadocs/org/apache/storm/Config.html) or [DaemonConfig](javadocs/org/apache/storm/DaemonConfig.html) such as
+when writing a custom scheduler or a plugin to some part of storm. In those
+cases you can create your own class, like Config, that implements [Validated](javadocs/org/apache/storm/validation/Validated.html). Any `public static final String` field declared in this
+class will be treated as a config and annotations from the `org.apache.storm.validation.ConfigValidationAnnotations` class can be used to enforce what is stored in that config.
+To let the validator know about this class you need to treat the class
+like a service that will be loaded through a ServiceLoader for the Validated class and include a `META-INF/services/org.apache.storm.validation.Validated` file in your jar that holds
+the name of your Config class.
**Resources:**
-* [Config](javadocs/org/apache/storm/Config.html): a listing of all configurations as well as a helper class for creating topology specific configurations
+* [Config](javadocs/org/apache/storm/Config.html): a listing of client configurations as well as a helper class for creating topology specific configurations
+* [DaemonConfig](javadocs/org/apache/storm/DaemonConfig.html): a listing of Storm Daemon configurations.
* [defaults.yaml]({{page.git-blob-base}}/conf/defaults.yaml): the default values for all configurations
* [Setting up a Storm cluster](Setting-up-a-Storm-cluster.html): explains how to create and configure a Storm cluster
* [Running topologies on a production cluster](Running-topologies-on-a-production-cluster.html): lists useful configurations when running topologies on a cluster
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index 8911b3c..cb2607a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -18,29 +18,112 @@
package org.apache.storm.hdfs.spout;
-public class Configs {
- public static final String READER_TYPE = "hdfsspout.reader.type"; // Required - chose the file type being consumed
- public static final String TEXT = "text";
- public static final String SEQ = "seq";
-
- public static final String HDFS_URI = "hdfsspout.hdfs"; // Required - HDFS name node
- public static final String SOURCE_DIR = "hdfsspout.source.dir"; // Required - dir from which to read files
- public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // Required - completed files will be moved here
- public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // Required - unparsable files will be moved here
- public static final String LOCK_DIR = "hdfsspout.lock.dir"; // dir in which lock files will be created
- public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records. 0 disables this.
- public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; // commit after N secs. cannot be disabled.
- public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding";
- public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; // inactivity duration after which locks are considered candidates for being reassigned to another spout
- public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync
- public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix"; // filenames with this suffix in archive dir will be ignored by the Spout
-
- public static final String DEFAULT_LOCK_DIR = ".lock";
- public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
- public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
- public static final int DEFAULT_MAX_OUTSTANDING = 10000;
- public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min
- public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
+import org.apache.storm.validation.ConfigValidation.Validator;
+import org.apache.storm.validation.ConfigValidationAnnotations.CustomValidator;
+import org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
+import org.apache.storm.validation.ConfigValidationAnnotations.isInteger;
+import org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType;
+import org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
+import org.apache.storm.validation.ConfigValidationAnnotations.isString;
+import org.apache.storm.validation.NotConf;
+import org.apache.storm.validation.Validated;
+public class Configs implements Validated {
+ public static class ReaderTypeValidator extends Validator {
+ @Override
+ public void validateField(String name, Object o) {
+ HdfsSpout.checkValidReader((String)o);
+ }
+ }
+
+ /**
+ * @deprecated please use {@link HdfsSpout#setReaderType(String)}
+ */
+ @Deprecated
+ @isString
+ @CustomValidator(validatorClass = ReaderTypeValidator.class)
+ public static final String READER_TYPE = "hdfsspout.reader.type"; // Required - chose the file type being consumed
+ public static final String TEXT = "text";
+ public static final String SEQ = "seq";
+
+ /**
+ * @deprecated please use {@link HdfsSpout#setHdfsUri(String)}
+ */
+ @Deprecated
+ @isString
+ public static final String HDFS_URI = "hdfsspout.hdfs"; // Required - HDFS name node
+ /**
+ * @deprecated please use {@link HdfsSpout#setSourceDir(String)}
+ */
+ @Deprecated
+ @isString
+ public static final String SOURCE_DIR = "hdfsspout.source.dir"; // Required - dir from which to read files
+ /**
+ * @deprecated please use {@link HdfsSpout#setArchiveDir(String)}
+ */
+ @Deprecated
+ @isString
+ public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // Required - completed files will be moved here
+ /**
+ * @deprecated please use {@link HdfsSpout#setBadFilesDir(String)}
+ */
+ @Deprecated
+ @isString
+ public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // Required - unparsable files will be moved here
+ /**
+ * @deprecated please use {@link HdfsSpout#setLockDir(String)}
+ */
+ @Deprecated
+ @isString
+ public static final String LOCK_DIR = "hdfsspout.lock.dir"; // dir in which lock files will be created
+ /**
+ * @deprecated please use {@link HdfsSpout#setCommitFrequencyCount(int)}
+ */
+ @Deprecated
+ @isInteger
+ @isPositiveNumber(includeZero=true)
+ public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records. 0 disables this.
+ /**
+ * @deprecated please use {@link HdfsSpout#setCommitFrequencySec(int)}
+ */
+ @Deprecated
+ @isInteger
+ @isPositiveNumber
+ public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; // commit after N secs. cannot be disabled.
+ /**
+ * @deprecated please use {@link HdfsSpout#setMaxOutstanding(int)}
+ */
+ @Deprecated
+ @isInteger
+ @isPositiveNumber(includeZero=true)
+ public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding";
+ /**
+ * @deprecated please use {@link HdfsSpout#setLockTimeoutSec(int)}
+ */
+ @Deprecated
+ @isInteger
+ @isPositiveNumber
+ public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; // inactivity duration after which locks are considered candidates for being reassigned to another spout
+ /**
+ * @deprecated please use {@link HdfsSpout#setClocksInSync(boolean)}
+ */
+ @Deprecated
+ @isBoolean
+ public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync
+ /**
+ * @deprecated please use {@link HdfsSpout#setIgnoreSuffix(String)}
+ */
+ @Deprecated
+ @isString
+ public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix"; // filenames with this suffix in archive dir will be ignored by the Spout
+ @NotConf
+ public static final String DEFAULT_LOCK_DIR = ".lock";
+ public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
+ public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
+ public static final int DEFAULT_MAX_OUTSTANDING = 10000;
+ public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min
+
+ @isMapEntryType(keyType = String.class, valueType = String.class)
+ public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
} // class Configs
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 89776be..fe72610 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -364,7 +364,8 @@ public class HdfsSpout extends BaseRichSpout {
inflight.put(id, tuple);
}
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ @SuppressWarnings("deprecation")
+public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
LOG.info("Opening HDFS Spout");
this.conf = conf;
this.commitTimer = new Timer();
@@ -528,12 +529,16 @@ public class HdfsSpout extends BaseRichSpout {
return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR;
}
- private static void checkValidReader(String readerType) {
+ static void checkValidReader(String readerType) {
if ( readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ) )
return;
try {
Class<?> classType = Class.forName(readerType);
classType.getConstructor(FileSystem.class, Path.class, Map.class);
+ if (!FileReader.class.isAssignableFrom(classType)) {
+ LOG.error(readerType + " not a FileReader");
+ throw new IllegalArgumentException(readerType + " not a FileReader.");
+ }
return;
} catch (ClassNotFoundException e) {
LOG.error(readerType + " not found in classpath.", e);
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/external/storm-hdfs/src/main/resources/META-INF/services/org.apache.storm.validation.Validated
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/resources/META-INF/services/org.apache.storm.validation.Validated b/external/storm-hdfs/src/main/resources/META-INF/services/org.apache.storm.validation.Validated
new file mode 100644
index 0000000..18c242e
--- /dev/null
+++ b/external/storm-hdfs/src/main/resources/META-INF/services/org.apache.storm.validation.Validated
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.hdfs.spout.Configs
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java
new file mode 100644
index 0000000..6244466
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.validation.ConfigValidation;
+import org.junit.Test;
+
+public class ConfigsTest {
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testGood() {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Configs.READER_TYPE, Configs.TEXT);
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.READER_TYPE, Configs.SEQ);
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.READER_TYPE, TextFileReader.class.getName());
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.HDFS_URI, "hdfs://namenode/");
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.SOURCE_DIR, "/input/source");
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.ARCHIVE_DIR, "/input/done");
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.BAD_DIR, "/input/bad");
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.LOCK_DIR, "/topology/lock");
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.COMMIT_FREQ_COUNT, 0);
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.COMMIT_FREQ_COUNT, 100);
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.COMMIT_FREQ_SEC, 100);
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.MAX_OUTSTANDING, 500);
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.LOCK_TIMEOUT, 100);
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.CLOCKS_INSYNC, true);
+ ConfigValidation.validateFields(conf);
+ conf.put(Configs.IGNORE_SUFFIX, ".writing");
+ ConfigValidation.validateFields(conf);
+ Map<String, String> hdfsConf = new HashMap<>();
+ hdfsConf.put("A", "B");
+ conf.put(Configs.DEFAULT_HDFS_CONFIG_KEY, hdfsConf);
+ ConfigValidation.validateFields(conf);
+ }
+
+ public static void verifyBad(String key, Object value) {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(key, value);
+ try {
+ ConfigValidation.validateFields(conf);
+ fail("Expected "+key+" = "+ value + " to throw Exception, but it didn't");
+ } catch (IllegalArgumentException e) {
+ //good
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testBad() {
+ verifyBad(Configs.READER_TYPE, "SomeString");
+ verifyBad(Configs.HDFS_URI, 100);
+ verifyBad(Configs.COMMIT_FREQ_COUNT, -10);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index f4500b4..c78af22 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -18,26 +18,31 @@
package org.apache.storm.validation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Provides functionality for validating configuration fields.
*/
public class ConfigValidation {
-
- private static final Class CONFIG_CLASS = org.apache.storm.Config.class;
-
private static final Logger LOG = LoggerFactory.getLogger(ConfigValidation.class);
public static abstract class Validator {
@@ -69,10 +74,10 @@ public class ConfigValidation {
*/
public static class SimpleTypeValidator extends Validator {
- private Class type;
+ private Class<?> type;
public SimpleTypeValidator(Map<String, Object> params) {
- this.type = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+ this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
}
@Override
@@ -80,7 +85,7 @@ public class ConfigValidation {
validateField(name, this.type, o);
}
- public static void validateField(String name, Class type, Object o) {
+ public static void validateField(String name, Class<?> type, Object o) {
if (o == null) {
return;
}
@@ -179,6 +184,7 @@ public class ConfigValidation {
ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
ConfigValidationUtils.listFv(String.class, false), false);
validator.validateField(name, o);
+ @SuppressWarnings("unchecked")
Map<String, List<String>> mapObject = (Map<String, List<String>>) o;
if (!mapObject.containsKey("hosts")) {
throw new IllegalArgumentException(name + " should contain Map entry with key: hosts");
@@ -202,7 +208,7 @@ public class ConfigValidation {
//check if iterable
SimpleTypeValidator.validateField(name, Iterable.class, field);
HashSet<Object> objectSet = new HashSet<Object>();
- for (Object o : (Iterable) field) {
+ for (Object o : (Iterable<?>) field) {
if (objectSet.contains(o)) {
throw new IllegalArgumentException(name + " should contain no duplicate elements. Duplicated element: " + o);
}
@@ -234,13 +240,14 @@ public class ConfigValidation {
*/
public static class KryoRegValidator extends Validator {
+ @SuppressWarnings("unchecked")
@Override
public void validateField(String name, Object o) {
if (o == null) {
return;
}
if (o instanceof Iterable) {
- for (Object e : (Iterable) o) {
+ for (Object e : (Iterable<?>) o) {
if (e instanceof Map) {
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) e).entrySet()) {
if (!(entry.getKey() instanceof String) ||
@@ -288,10 +295,10 @@ public class ConfigValidation {
*/
public static class ListEntryTypeValidator extends Validator {
- private Class type;
+ private Class<?> type;
public ListEntryTypeValidator(Map<String, Object> params) {
- this.type = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
+ this.type = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.TYPE);
}
@Override
@@ -299,7 +306,7 @@ public class ConfigValidation {
validateField(name, this.type, o);
}
- public static void validateField(String name, Class type, Object o) {
+ public static void validateField(String name, Class<?> type, Object o) {
ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.listFv(type, false);
validator.validateField(name, o);
}
@@ -311,10 +318,10 @@ public class ConfigValidation {
*/
public static class ListEntryCustomValidator extends Validator{
- private Class[] entryValidators;
+ private Class<?>[] entryValidators;
public ListEntryCustomValidator(Map<String, Object> params) {
- this.entryValidators = (Class[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
+ this.entryValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.ENTRY_VALIDATOR_CLASSES);
}
@Override
@@ -326,14 +333,14 @@ public class ConfigValidation {
}
}
- public static void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+ public static void validateField(String name, Class<?>[] validators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
if (o == null) {
return;
}
//check if iterable
SimpleTypeValidator.validateField(name, Iterable.class, o);
- for (Object entry : (Iterable) o) {
- for (Class validator : validators) {
+ for (Object entry : (Iterable<?>) o) {
+ for (Class<?> validator : validators) {
Object v = validator.getConstructor().newInstance();
if (v instanceof Validator) {
((Validator) v).validateField(name + " list entry", entry);
@@ -350,12 +357,12 @@ public class ConfigValidation {
*/
public static class MapEntryTypeValidator extends Validator{
- private Class keyType;
- private Class valueType;
+ private Class<?> keyType;
+ private Class<?> valueType;
public MapEntryTypeValidator(Map<String, Object> params) {
- this.keyType = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
- this.valueType = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
+ this.keyType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_TYPE);
+ this.valueType = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_TYPE);
}
@Override
@@ -363,7 +370,7 @@ public class ConfigValidation {
validateField(name, this.keyType, this.valueType, o);
}
- public static void validateField(String name, Class keyType, Class valueType, Object o) {
+ public static void validateField(String name, Class<?> keyType, Class<?> valueType, Object o) {
ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(keyType, valueType, false);
validator.validateField(name, o);
}
@@ -374,12 +381,12 @@ public class ConfigValidation {
*/
public static class MapEntryCustomValidator extends Validator{
- private Class[] keyValidators;
- private Class[] valueValidators;
+ private Class<?>[] keyValidators;
+ private Class<?>[] valueValidators;
public MapEntryCustomValidator(Map<String, Object> params) {
- this.keyValidators = (Class []) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
- this.valueValidators = (Class []) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
+ this.keyValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.KEY_VALIDATOR_CLASSES);
+ this.valueValidators = (Class<?>[]) params.get(ConfigValidationAnnotations.ValidatorParams.VALUE_VALIDATOR_CLASSES);
}
@Override
@@ -391,14 +398,15 @@ public class ConfigValidation {
}
}
- public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+ @SuppressWarnings("unchecked")
+ public static void validateField(String name, Class<?>[] keyValidators, Class<?>[] valueValidators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
if (o == null) {
return;
}
//check if Map
SimpleTypeValidator.validateField(name, Map.class, o);
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) {
- for (Class kv : keyValidators) {
+ for (Class<?> kv : keyValidators) {
Object keyValidator = kv.getConstructor().newInstance();
if (keyValidator instanceof Validator) {
((Validator) keyValidator).validateField(name + " Map key", entry.getKey());
@@ -406,7 +414,7 @@ public class ConfigValidation {
LOG.warn("validator: {} cannot be used in MapEntryCustomValidator to validate keys. Individual entry validators must a instance of Validator class", kv.getName());
}
}
- for (Class vv : valueValidators) {
+ for (Class<?> vv : valueValidators) {
Object valueValidator = vv.getConstructor().newInstance();
if (valueValidator instanceof Validator) {
((Validator) valueValidator).validateField(name + " Map value", entry.getValue());
@@ -465,11 +473,11 @@ public class ConfigValidation {
return;
}
SimpleTypeValidator.validateField(name, Map.class, o);
- if (!((Map) o).containsKey("class")) {
+ if (!((Map<?, ?>) o).containsKey("class")) {
throw new IllegalArgumentException("Field " + name + " must have map entry with key: class");
}
- SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
+ SimpleTypeValidator.validateField(name, String.class, ((Map<?, ?>) o).get("class"));
}
}
@@ -481,15 +489,15 @@ public class ConfigValidation {
return;
}
SimpleTypeValidator.validateField(name, Map.class, o);
- if(!((Map) o).containsKey("class") ) {
+ if(!((Map<?, ?>) o).containsKey("class") ) {
throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class");
}
- if(!((Map) o).containsKey("parallelism.hint") ) {
+ if(!((Map<?, ?>) o).containsKey("parallelism.hint") ) {
throw new IllegalArgumentException("Field " + name + " must have map entry with key: parallelism.hint");
}
- SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
- new IntegerValidator().validateField(name, ((Map) o).get("parallelism.hint"));
+ SimpleTypeValidator.validateField(name, String.class, ((Map<?, ?>) o).get("class"));
+ new IntegerValidator().validateField(name, ((Map<?, ?>) o).get("parallelism.hint"));
}
}
@@ -527,24 +535,25 @@ public class ConfigValidation {
return;
}
SimpleTypeValidator.validateField(name, Map.class, o);
- if (!((Map) o).containsKey("cpu")) {
+ Map<?, ?> m = (Map<?, ?>) o;
+ if (!m.containsKey("cpu")) {
throw new IllegalArgumentException("Field " + name + " must have map entry with key: cpu");
}
- if (!((Map) o).containsKey("memory")) {
+ if (!m.containsKey("memory")) {
throw new IllegalArgumentException("Field " + name + " must have map entry with key: memory");
}
- SimpleTypeValidator.validateField(name, Number.class, ((Map) o).get("cpu"));
- SimpleTypeValidator.validateField(name, Number.class, ((Map) o).get("memory"));
+ SimpleTypeValidator.validateField(name, Number.class, m.get("cpu"));
+ SimpleTypeValidator.validateField(name, Number.class, m.get("memory"));
}
}
public static class ImplementsClassValidator extends Validator {
- Class classImplements;
+ Class<?> classImplements;
public ImplementsClassValidator(Map<String, Object> params) {
- this.classImplements = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
+ this.classImplements = (Class<?>) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
}
@Override
@@ -554,7 +563,7 @@ public class ConfigValidation {
}
SimpleTypeValidator.validateField(name, String.class, o);
try {
- Class objectClass = Class.forName((String) o);
+ Class<?> objectClass = Class.forName((String) o);
if (!this.classImplements.isAssignableFrom(objectClass)) {
throw new IllegalArgumentException("Field " + name + " with value " + o
+ " does not implement " + this.classImplements.getName());
@@ -575,8 +584,45 @@ public class ConfigValidation {
* @param fieldName provided as a string
* @param conf map of confs
*/
- public static void validateField(String fieldName, Map conf) {
- validateField(fieldName, conf, CONFIG_CLASS);
+ public static void validateField(String fieldName, Map<String, Object> conf) {
+ validateField(fieldName, conf, getConfigClasses());
+ }
+
+ private static List<Class<?>> configClasses = null;
+ //We follow the model of service loaders (Even though it is not a service).
+ private static final String CONFIG_CLASSES_NAME = "META-INF/services/"+Validated.class.getName();
+
+ private synchronized static List<Class<?>> getConfigClasses() {
+ if (configClasses == null) {
+ List<Class<?>> ret = new ArrayList<>();
+ Set<String> classesToScan = new HashSet<>();
+ classesToScan.add(Config.class.getName());
+ for (URL url: Utils.findResources(CONFIG_CLASSES_NAME)) {
+ try {
+ try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
+ String line;
+ while((line = in.readLine()) != null) {
+ line = line.replaceAll("#.*$", "").trim();
+ if (!line.isEmpty()) {
+ classesToScan.add(line);
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error trying to read " + url, e);
+ }
+ }
+ for (String clazz: classesToScan) {
+ try {
+ ret.add(Class.forName(clazz));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ LOG.debug("Will use {} for validation", ret);
+ configClasses = ret;
+ }
+ return configClasses;
}
/**
@@ -584,14 +630,19 @@ public class ConfigValidation {
*
* @param fieldName provided as a string
* @param conf map of confs
- * @param configClass config class
+ * @param configs list of config classes to search for the field
*/
- public static void validateField(String fieldName, Map conf, Class configClass) {
+ public static void validateField(String fieldName, Map<String, Object> conf, List<Class<?>> configs) {
Field field = null;
- try {
- field = configClass.getField(fieldName);
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
+ for (Class<?> clazz: configs) {
+ try {
+ field = clazz.getField(fieldName);
+ } catch (NoSuchFieldException e) {
+ //Ignored
+ }
+ }
+ if (field == null) {
+ throw new RuntimeException("Could not find " + fieldName + " in any of " + configs);
}
validateField(field, conf);
}
@@ -603,18 +654,23 @@ public class ConfigValidation {
* @param field field that needs to be validated
* @param conf map of confs
*/
- public static void validateField(Field field, Map conf) {
+ public static void validateField(Field field, Map<String, Object> conf) {
Annotation[] annotations = field.getAnnotations();
if (annotations.length == 0) {
LOG.warn("Field {} does not have validator annotation", field);
}
try {
for (Annotation annotation : annotations) {
+ if (annotation.annotationType().equals(Deprecated.class)) {
+ LOG.warn("{} is a deprecated config please see {}.{} for more information.",
+ field.get(null), field.getDeclaringClass(), field.getName());
+ continue;
+ }
String type = annotation.annotationType().getName();
- Class validatorClass = null;
+ Class<?> validatorClass = null;
Class<?>[] classes = ConfigValidationAnnotations.class.getDeclaredClasses();
//check if annotation is one of our
- for (Class clazz : classes) {
+ for (Class<?> clazz : classes) {
if (clazz.getName().equals(type)) {
validatorClass = clazz;
break;
@@ -623,7 +679,8 @@ public class ConfigValidation {
if (validatorClass != null) {
Object v = validatorClass.cast(annotation);
String key = (String) field.get(null);
- Class clazz = (Class) validatorClass
+ @SuppressWarnings("unchecked")
+ Class<Validator> clazz = (Class<Validator>) validatorClass
.getMethod(ConfigValidationAnnotations.ValidatorParams.VALIDATOR_CLASS).invoke(v);
Validator o = null;
Map<String, Object> params = getParamsFromAnnotation(validatorClass, v);
@@ -631,11 +688,11 @@ public class ConfigValidation {
//One constructor takes input a Map of arguments, the other doesn't take any arguments (default constructor)
//If validator has a constructor that takes a Map as an argument call that constructor
if (hasConstructor(clazz, Map.class)) {
- o = (Validator) clazz.getConstructor(Map.class).newInstance(params);
+ o = clazz.getConstructor(Map.class).newInstance(params);
}
//If not call default constructor
else {
- o = (((Class<Validator>) clazz).newInstance());
+ o = clazz.newInstance();
}
o.validateField(field.getName(), conf.get(key));
}
@@ -644,41 +701,60 @@ public class ConfigValidation {
throw new RuntimeException(e);
}
}
-
+
/**
* Validate all confs in map
*
* @param conf map of configs
*/
- public static void validateFields(Map conf) {
- validateFields(conf, CONFIG_CLASS);
- }
-
+ public static void validateFields(Map<String, Object> conf) {
+ validateFields(conf, getConfigClasses());
+ }
+
+ //The following come from the JVM Specification table 4.4
+ // https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.5
+ private static final int ACC_PUBLIC = 0x0001;
+ private static final int ACC_STATIC = 0x0008;
+ private static final int ACC_FINAL = 0x0010;
+ private static final int DESIRED_FIELD_ACC = ACC_PUBLIC | ACC_STATIC | ACC_FINAL;
+ public static boolean isFieldAllowed(Field field) {
+ return field.getAnnotation(NotConf.class) == null &&
+ String.class.equals(field.getType()) &&
+ ((field.getModifiers() & DESIRED_FIELD_ACC) == DESIRED_FIELD_ACC) &&
+ !field.isSynthetic();
+ }
+
/**
* Validate all confs in map
*
* @param conf map of configs
- * @param configClass config class
+ * @param classes list of config classes whose fields are validated
*/
- public static void validateFields(Map conf, Class configClass) {
- for (Field field : configClass.getFields()) {
- Object keyObj = null;
- try {
- keyObj = field.get(null);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- //make sure that defined key is string in case wrong stuff got put into Config.java
- if (keyObj instanceof String) {
- String confKey = (String) keyObj;
- if (conf.containsKey(confKey)) {
- validateField(field, conf);
+ public static void validateFields(Map<String, Object> conf, List<Class<?>> classes) {
+ for (Class<?> clazz: classes) {
+ for (Field field : clazz.getDeclaredFields()) {
+ if (!isFieldAllowed(field)) {
+ continue;
+ }
+ Object keyObj = null;
+ try {
+ keyObj = field.get(null);
+ } catch (IllegalAccessException e) {
+ //This should not happen because we checked for PUBLIC in isFieldAllowed
+ throw new RuntimeException(e);
+ }
+ //make sure that defined key is string in case wrong stuff got put into Config.java
+ if (keyObj instanceof String) {
+ String confKey = (String) keyObj;
+ if (conf.containsKey(confKey)) {
+ validateField(field, conf);
+ }
}
}
}
}
- private static Map<String,Object> getParamsFromAnnotation(Class validatorClass, Object v) throws InvocationTargetException, IllegalAccessException {
+ private static Map<String,Object> getParamsFromAnnotation(Class<?> validatorClass, Object v) throws InvocationTargetException, IllegalAccessException {
Map<String, Object> params = new HashMap<String, Object>();
for(Method method : validatorClass.getDeclaredMethods()) {
@@ -695,8 +771,8 @@ public class ConfigValidation {
return params;
}
- private static boolean hasConstructor(Class clazz, Class paramClass) {
- Class[] classes = {paramClass};
+ private static boolean hasConstructor(Class<?> clazz, Class<?> paramClass) {
+ Class<?>[] classes = {paramClass};
try {
clazz.getConstructor(classes);
} catch (NoSuchMethodException e) {
@@ -704,13 +780,4 @@ public class ConfigValidation {
}
return true;
}
-
- private static boolean hasMethod(Class clazz, String method) {
- try {
- clazz.getMethod(method);
- } catch (NoSuchMethodException ex) {
- return false;
- }
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
index b770f34..56dc574 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
@@ -55,17 +55,17 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isType {
- Class validatorClass() default ConfigValidation.SimpleTypeValidator.class;
+ Class<?> validatorClass() default ConfigValidation.SimpleTypeValidator.class;
- Class type();
+ Class<?> type();
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isStringList {
- Class validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
+ Class<?> validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
- Class type() default String.class;
+ Class<?> type() default String.class;
}
/**
@@ -74,9 +74,9 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isListEntryType {
- Class validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
+ Class<?> validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
- Class type();
+ Class<?> type();
}
/**
@@ -85,26 +85,26 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isString {
- Class validatorClass() default ConfigValidation.StringValidator.class;
+ Class<?> validatorClass() default ConfigValidation.StringValidator.class;
String[] acceptedValues() default "";
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isNumber {
- Class validatorClass() default ConfigValidation.NumberValidator.class;
+ Class<?> validatorClass() default ConfigValidation.NumberValidator.class;
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isBoolean {
- Class validatorClass() default ConfigValidation.BooleanValidator.class;
+ Class<?> validatorClass() default ConfigValidation.BooleanValidator.class;
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isInteger {
- Class validatorClass() default ConfigValidation.IntegerValidator.class;
+ Class<?> validatorClass() default ConfigValidation.IntegerValidator.class;
}
/**
@@ -113,7 +113,7 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface NotNull {
- Class validatorClass() default ConfigValidation.NotNullValidator.class;
+ Class<?> validatorClass() default ConfigValidation.NotNullValidator.class;
}
/**
@@ -122,7 +122,7 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isNoDuplicateInList {
- Class validatorClass() default ConfigValidation.NoDuplicateInListValidator.class;
+ Class<?> validatorClass() default ConfigValidation.NoDuplicateInListValidator.class;
}
/**
@@ -132,9 +132,9 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isListEntryCustom {
- Class validatorClass() default ConfigValidation.ListEntryCustomValidator.class;
+ Class<?> validatorClass() default ConfigValidation.ListEntryCustomValidator.class;
- Class[] entryValidatorClasses();
+ Class<?>[] entryValidatorClasses();
}
/**
@@ -144,11 +144,11 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isMapEntryType {
- Class validatorClass() default ConfigValidation.MapEntryTypeValidator.class;
+ Class<?> validatorClass() default ConfigValidation.MapEntryTypeValidator.class;
- Class keyType();
+ Class<?> keyType();
- Class valueType();
+ Class<?> valueType();
}
/**
@@ -158,11 +158,11 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isMapEntryCustom {
- Class validatorClass() default ConfigValidation.MapEntryCustomValidator.class;
+ Class<?> validatorClass() default ConfigValidation.MapEntryCustomValidator.class;
- Class[] keyValidatorClasses();
+ Class<?>[] keyValidatorClasses();
- Class[] valueValidatorClasses();
+ Class<?>[] valueValidatorClasses();
}
/**
@@ -172,7 +172,7 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isPositiveNumber {
- Class validatorClass() default ConfigValidation.PositiveNumberValidator.class;
+ Class<?> validatorClass() default ConfigValidation.PositiveNumberValidator.class;
boolean includeZero() default false;
}
@@ -180,9 +180,9 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isImplementationOfClass {
- Class validatorClass() default ConfigValidation.ImplementsClassValidator.class;
+ Class<?> validatorClass() default ConfigValidation.ImplementsClassValidator.class;
- Class implementsClass();
+ Class<?> implementsClass();
}
/**
@@ -191,19 +191,19 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isStringOrStringList {
- Class validatorClass() default ConfigValidation.StringOrStringListValidator.class;
+ Class<?> validatorClass() default ConfigValidation.StringOrStringListValidator.class;
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isKryoReg {
- Class validatorClass() default ConfigValidation.KryoRegValidator.class;
+ Class<?> validatorClass() default ConfigValidation.KryoRegValidator.class;
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isPowerOf2 {
- Class validatorClass() default ConfigValidation.PowerOf2Validator.class;
+ Class<?> validatorClass() default ConfigValidation.PowerOf2Validator.class;
}
/**
@@ -212,7 +212,7 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface CustomValidator {
- Class validatorClass();
+ Class<?> validatorClass();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/src/jvm/org/apache/storm/validation/NotConf.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/NotConf.java b/storm-client/src/jvm/org/apache/storm/validation/NotConf.java
new file mode 100644
index 0000000..0040740
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/validation/NotConf.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.validation;
+
+/**
+ * Annotation that can be used to explicitly call out
+ * public static final String fields that are not configs
+ */
+public @interface NotConf {
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/src/jvm/org/apache/storm/validation/Validated.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/Validated.java b/storm-client/src/jvm/org/apache/storm/validation/Validated.java
new file mode 100644
index 0000000..b5e6c88
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/validation/Validated.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.validation;
+
+/**
+ * An interface that is used to inform config validation what to look at
+ */
+public interface Validated {
+ //Empty
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
index 2f15749..ee4d143 100644
--- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -401,7 +401,7 @@ public class TestConfigValidate {
for (Object value : passCases) {
config.put(TestConfig.TEST_MAP_CONFIG, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
}
Map<Object, Object> failCase1 = new HashMap<Object, Object>();
@@ -418,7 +418,7 @@ public class TestConfigValidate {
for (Object value : failCases) {
try {
config.put(TestConfig.TEST_MAP_CONFIG, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Assert.fail("Expected Exception not Thrown for value: " + value);
} catch (IllegalArgumentException Ex) {
}
@@ -439,7 +439,7 @@ public class TestConfigValidate {
for (Object value : passCases) {
config.put(TestConfig.TEST_MAP_CONFIG_2, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
}
Map<Object, Object> failCase1 = new HashMap<Object, Object>();
@@ -466,7 +466,7 @@ public class TestConfigValidate {
for (Object value : failCases) {
try {
config.put(TestConfig.TEST_MAP_CONFIG_2, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Assert.fail("Expected Exception not Thrown for value: " + value);
} catch (IllegalArgumentException Ex) {
}
@@ -485,7 +485,7 @@ public class TestConfigValidate {
for (Object value : passCases) {
config.put(TestConfig.TEST_MAP_CONFIG_3, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
}
Object[] failCase1 = {1, 5.0, -0.01, 0, "aaa"};
@@ -498,7 +498,7 @@ public class TestConfigValidate {
for (Object value : failCases) {
try {
config.put(TestConfig.TEST_MAP_CONFIG_3, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Assert.fail("Expected Exception not Thrown for value: " + value);
} catch (IllegalArgumentException Ex) {
}
@@ -517,7 +517,7 @@ public class TestConfigValidate {
for (Object value : passCases) {
config.put(TestConfig.TEST_MAP_CONFIG_4, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
}
Object[] failCase1 = {1, 5.0, -0.01, 3.0};
@@ -536,7 +536,7 @@ public class TestConfigValidate {
for (Object value : failCases) {
try {
config.put(TestConfig.TEST_MAP_CONFIG_4, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Assert.fail("Expected Exception not Thrown for value: " + value);
} catch (IllegalArgumentException Ex) {
}
@@ -550,14 +550,14 @@ public class TestConfigValidate {
for (Object value : passCases) {
config.put(TestConfig.TEST_MAP_CONFIG_5, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
}
String[] failCases = {"aa", "bb", "cc", "abc", "a", "b", "c", ""};
for (Object value : failCases) {
try {
config.put(TestConfig.TEST_MAP_CONFIG_5, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Assert.fail("Expected Exception not Thrown for value: " + value);
} catch (IllegalArgumentException Ex) {
}
@@ -581,7 +581,7 @@ public class TestConfigValidate {
for (Object value : passCases) {
config.put(TestConfig.TEST_MAP_CONFIG_6, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
}
Map<String, Map<String, List<String>>> failCase1 = new HashMap<String, Map<String, List<String>>>();
@@ -605,7 +605,7 @@ public class TestConfigValidate {
for (Object value : failCases) {
try {
config.put(TestConfig.TEST_MAP_CONFIG_6, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Assert.fail("Expected Exception not Thrown for value: " + value);
} catch (IllegalArgumentException Ex) {
}
@@ -630,7 +630,7 @@ public class TestConfigValidate {
passCase1.get("derek").put("memory", 60148);
config.put(TestConfig.TEST_MAP_CONFIG_7, (Object) passCase1);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Map<String, Map<String, Integer>> failCase1 = new HashMap<String, Map<String, Integer>>();
failCase1.put("jerry", new HashMap<String, Integer>());
@@ -660,7 +660,7 @@ public class TestConfigValidate {
for (Object value : failCases) {
try {
config.put(TestConfig.TEST_MAP_CONFIG_7, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Assert.fail("Expected Exception not Thrown for value: " + value);
} catch (IllegalArgumentException Ex) {
}
@@ -677,7 +677,7 @@ public class TestConfigValidate {
for (Object value : passCases) {
config.put(TestConfig.TEST_MAP_CONFIG_8, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
}
//will fail since org.apache.storm.nimbus.NimbusInfo doesn't implement or extend org.apache.storm.networktopography.DNSToSwitchMapping
failCases.add("org.apache.storm.nimbus.NimbusInfo");
@@ -685,7 +685,7 @@ public class TestConfigValidate {
for (Object value : failCases) {
try {
config.put(TestConfig.TEST_MAP_CONFIG_8, value);
- ConfigValidation.validateFields(config, TestConfig.class);
+ ConfigValidation.validateFields(config, Arrays.asList(TestConfig.class));
Assert.fail("Expected Exception not Thrown for value: " + value);
} catch (IllegalArgumentException Ex) {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-clojure/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/config.clj b/storm-clojure/src/clj/org/apache/storm/config.clj
index bfe47ed..e7d3281 100644
--- a/storm-clojure/src/clj/org/apache/storm/config.clj
+++ b/storm-clojure/src/clj/org/apache/storm/config.clj
@@ -15,14 +15,16 @@
;; limitations under the License.
(ns org.apache.storm.config
- (:import [org.apache.storm Config]))
+ (:import [org.apache.storm Config])
+ (:import [org.apache.storm.validation ConfigValidation]))
(defn- clojure-config-name [name]
(.replace (.toUpperCase name) "_" "-"))
; define clojure constants for every configuration parameter
-(doseq [f (seq (.getFields Config))]
- (let [name (.getName f)
- new-name (clojure-config-name name)]
- (eval
- `(def ~(symbol new-name) (. Config ~(symbol name))))))
+(doseq [f (seq (.getDeclaredFields Config))]
+ (when (ConfigValidation/isFieldAllowed f)
+ (let [name (.getName f)
+ new-name (clojure-config-name name)]
+ (eval
+ `(def ~(symbol new-name) (. Config ~(symbol name)))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index 3bf6372..bf7cd12 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -15,14 +15,23 @@
;; limitations under the License.
(ns org.apache.storm.config
- (:import [org.apache.storm DaemonConfig]))
+ (:import [org.apache.storm DaemonConfig Config])
+ (:import [org.apache.storm.validation ConfigValidation]))
(defn- clojure-config-name [name]
(.replace (.toUpperCase name) "_" "-"))
; define clojure constants for every configuration parameter
-(doseq [f (seq (.getFields DaemonConfig))]
- (let [name (.getName f)
- new-name (clojure-config-name name)]
- (eval
- `(def ~(symbol new-name) (. DaemonConfig ~(symbol name))))))
+(doseq [f (seq (.getDeclaredFields DaemonConfig))]
+ (when (ConfigValidation/isFieldAllowed f)
+ (let [name (.getName f)
+ new-name (clojure-config-name name)]
+ (eval
+ `(def ~(symbol new-name) (. DaemonConfig ~(symbol name)))))))
+
+(doseq [f (seq (.getDeclaredFields Config))]
+ (when (ConfigValidation/isFieldAllowed f)
+ (let [name (.getName f)
+ new-name (clojure-config-name name)]
+ (eval
+ `(def ~(symbol new-name) (. Config ~(symbol name)))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index b0f9846..02ee449 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -46,7 +46,7 @@
(:import [org.apache.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
(:import [org.apache.storm.security.auth AuthUtils])
(:import [org.apache.storm.utils VersionInfo ConfigUtils Utils WebAppUtils])
- (:import [org.apache.storm Config DaemonConfig])
+ (:import [org.apache.storm Config])
(:import [java.io File])
(:import [java.net URLEncoder URLDecoder])
(:import [org.json.simple JSONValue])
@@ -390,8 +390,8 @@
resourceSummary (if (> (.size sups) 0)
(reduce #(map + %1 %2)
(for [^SupervisorSummary s sups
- :let [sup-total-mem (get (.get_total_resources s) DaemonConfig/SUPERVISOR_MEMORY_CAPACITY_MB)
- sup-total-cpu (get (.get_total_resources s) DaemonConfig/SUPERVISOR_CPU_CAPACITY)
+ :let [sup-total-mem (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
+ sup-total-cpu (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY)
sup-avail-mem (max (- sup-total-mem (.get_used_mem s)) 0.0)
sup-avail-cpu (max (- sup-total-cpu (.get_used_cpu s)) 0.0)]]
[sup-total-mem sup-total-cpu sup-avail-mem sup-avail-cpu]))
@@ -477,8 +477,8 @@
(let [slotsTotal (.get_num_workers summary)
slotsUsed (.get_num_used_workers summary)
slotsFree (max (- slotsTotal slotsUsed) 0)
- totalMem (get (.get_total_resources summary) DaemonConfig/SUPERVISOR_MEMORY_CAPACITY_MB)
- totalCpu (get (.get_total_resources summary) DaemonConfig/SUPERVISOR_CPU_CAPACITY)
+ totalMem (get (.get_total_resources summary) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
+ totalCpu (get (.get_total_resources summary) Config/SUPERVISOR_CPU_CAPACITY)
usedMem (.get_used_mem summary)
usedCpu (.get_used_cpu summary)
availMem (max (- totalMem usedMem) 0)
@@ -616,9 +616,9 @@
"transferred" (.get_transferred common-stats)
"acked" (.get_acked common-stats)
"failed" (.get_failed common-stats)
- "requestedMemOnHeap" (.get (.get_resources_map common-stats) DaemonConfig/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
- "requestedMemOffHeap" (.get (.get_resources_map common-stats) DaemonConfig/TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
- "requestedCpu" (.get (.get_resources_map common-stats) DaemonConfig/TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)})
+ "requestedMemOnHeap" (.get (.get_resources_map common-stats) Config/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
+ "requestedMemOffHeap" (.get (.get_resources_map common-stats) Config/TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+ "requestedCpu" (.get (.get_resources_map common-stats) Config/TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)})
(defmulti comp-agg-stats-json
"Returns a JSON representation of aggregated statistics."
@@ -1049,9 +1049,9 @@
"name" (.get_topology_name comp-page-info)
"executors" (.get_num_executors comp-page-info)
"tasks" (.get_num_tasks comp-page-info)
- "requestedMemOnHeap" (.get (.get_resources_map comp-page-info) DaemonConfig/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
- "requestedMemOffHeap" (.get (.get_resources_map comp-page-info) DaemonConfig/TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
- "requestedCpu" (.get (.get_resources_map comp-page-info) DaemonConfig/TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)
+ "requestedMemOnHeap" (.get (.get_resources_map comp-page-info) Config/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
+ "requestedMemOffHeap" (.get (.get_resources_map comp-page-info) Config/TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+ "requestedCpu" (.get (.get_resources_map comp-page-info) Config/TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)
"schedulerDisplayResource" (*STORM-CONF* SCHEDULER-DISPLAY-RESOURCE)
"topologyId" topology-id
"topologyStatus" (.get_topology_status comp-page-info)
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
index 18ec82e..a052f8d 100644
--- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
+++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
@@ -17,10 +17,15 @@
*/
package org.apache.storm.command;
-import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.storm.Config;
-import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.KeyFilter;
import org.apache.storm.blobstore.LocalFsBlobStore;
@@ -39,14 +44,14 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import com.google.common.collect.Sets;
public class AdminCommands {
private static final Logger LOG = LoggerFactory.getLogger(AdminCommands.class);
private static BlobStore nimbusBlobStore;
private static IStormClusterState stormClusterState;
- private static Map conf;
+ private static Map<String, Object> conf;
public static void main(String [] args) throws Exception {
@@ -54,7 +59,7 @@ public class AdminCommands {
throw new IllegalArgumentException("Missing command. Supported command is remove_corrupt_topologies");
}
String command = args[0];
- String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+ //String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
switch (command) {
case "remove_corrupt_topologies":
initialize();
@@ -68,7 +73,7 @@ public class AdminCommands {
}
private static void initialize() {
- conf = ServerConfigUtils.readStormConfig();
+ conf = Utils.readStormConfig();
nimbusBlobStore = ServerUtils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf));
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java b/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java
index 10c7852..6074291 100644
--- a/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java
+++ b/storm-core/src/jvm/org/apache/storm/command/ConfigValue.java
@@ -19,12 +19,12 @@ package org.apache.storm.command;
import java.util.Map;
-import org.apache.storm.utils.ServerConfigUtils;
+import org.apache.storm.utils.Utils;
public class ConfigValue {
public static void main(String [] args) {
String name = args[0];
- Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+ Map<String, Object> conf = Utils.readStormConfig();
System.out.println("VALUE: " + conf.get(name));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java b/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java
index 6850ae7..ef53170 100644
--- a/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/command/DevZookeeper.java
@@ -17,18 +17,17 @@
*/
package org.apache.storm.command;
+import java.util.Map;
+
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
-import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.Zookeeper;
-import java.util.Map;
-
public class DevZookeeper {
public static void main(String[] args) throws Exception {
- Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+ Map<String, Object> conf = Utils.readStormConfig();
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
String localPath = (String) conf.get(DaemonConfig.DEV_ZOOKEEPER_PATH);
Utils.forceDelete(localPath);
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index 49c29d5..dc7c18c 100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -17,15 +17,15 @@
*/
package org.apache.storm.command;
-import org.apache.storm.healthcheck.HealthChecker;
-import org.apache.storm.utils.ServerConfigUtils;
-
import java.util.Map;
+import org.apache.storm.healthcheck.HealthChecker;
+import org.apache.storm.utils.Utils;
+
public class HealthCheck {
public static void main(String[] args) {
- Map conf = ServerConfigUtils.readStormConfig();
+ Map<String, Object> conf = Utils.readStormConfig();
System.exit(HealthChecker.healthCheck(conf));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
index 27e996d..668f019 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
@@ -17,8 +17,8 @@
*/
package org.apache.storm.command;
-import com.google.common.base.Joiner;
-import org.apache.storm.utils.ServerConfigUtils;
+import java.util.Map;
+
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.IStateStorage;
@@ -29,7 +29,7 @@ import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
+import com.google.common.base.Joiner;
public class Heartbeats {
private static final Logger LOG = LoggerFactory.getLogger(Heartbeats.class);
@@ -42,7 +42,7 @@ public class Heartbeats {
String command = args[0];
String path = args[1];
- Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+ Map<String, Object> conf = Utils.readStormConfig();
IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, null, new ClusterStateContext());
LOG.info("Command: [{}]", command);
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
index e12e4d8..5dc8402 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
@@ -21,13 +21,13 @@ import java.io.File;
import java.util.Map;
import org.apache.storm.Config;
-import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.utils.Utils;
public class KillWorkers {
public static void main(String [] args) throws Exception {
- Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+ Map<String, Object> conf = Utils.readStormConfig();
conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath());
try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor())) {
supervisor.shutdownAllWorkers(null, null);
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index dbb6993..fc95097 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -345,7 +345,7 @@
"org.apache.storm.security.auth.SimpleTransportPlugin" nil]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
- DaemonConfig/NIMBUS_THRIFT_PORT a-port
+ Config/NIMBUS_THRIFT_PORT a-port
DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
nimbus_client (.getClient client)]
@@ -414,7 +414,7 @@
(deftest test-GetTransportPlugin-throws-RuntimeException
(let [conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {DaemonConfig/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
+ {Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
(is (thrown-cause? RuntimeException (AuthUtils/GetTransportPlugin conf nil nil)))))
(defn mk-impersonating-req-context [impersonating-user user-being-impersonated remote-address]
@@ -433,7 +433,7 @@
_ (.prepare groups (clojurify-structure (ConfigUtils/readStormConfig)))
groups (.getGroups groups user-being-impersonated)
cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {DaemonConfig/NIMBUS_IMPERSONATION_ACL {impersonating-user {"hosts" [ (.getHostName (InetAddress/getLocalHost))]
+ {Config/NIMBUS_IMPERSONATION_ACL {impersonating-user {"hosts" [ (.getHostName (InetAddress/getLocalHost))]
"groups" groups}}})
authorizer (ImpersonationAuthorizer. )
unauthorized-host (com.google.common.net.InetAddresses/forString "10.10.10.10")
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index 8c0ee31..ecdc337 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -18,7 +18,7 @@
(:require [org.apache.storm.security.auth [auth-test :refer [nimbus-timeout]]])
(:import [java.nio ByteBuffer])
(:import [java.util Optional])
- (:import [org.apache.storm LocalCluster$Builder DaemonConfig])
+ (:import [org.apache.storm LocalCluster$Builder DaemonConfig Config])
(:import [org.apache.storm.blobstore BlobStore])
(:import [org.apache.storm.utils NimbusClient])
(:import [org.apache.storm.generated NotAliveException StormBase])
@@ -106,7 +106,7 @@
STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
- DaemonConfig/NIMBUS_THRIFT_PORT port
+ Config/NIMBUS_THRIFT_PORT port
STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
nimbus_client (.getClient client)
@@ -142,7 +142,7 @@
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- DaemonConfig/NIMBUS_THRIFT_PORT port
+ Config/NIMBUS_THRIFT_PORT port
STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
nimbus_client (.getClient client)]
@@ -172,7 +172,7 @@
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- DaemonConfig/NIMBUS_THRIFT_PORT port
+ Config/NIMBUS_THRIFT_PORT port
STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
nimbus_client (.getClient client)
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 99e7958..33194ec 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -154,4 +154,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index d249313..356a347 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -23,6 +23,7 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.validation.Validated;
import java.util.ArrayList;
import java.util.Map;
@@ -37,7 +38,7 @@ import static org.apache.storm.validation.ConfigValidationAnnotations.*;
*
* This class extends {@link org.apache.storm.Config} for supporting Storm Daemons.
*/
-public class DaemonConfig extends Config {
+public class DaemonConfig implements Validated {
/**
* We check with this interval that whether the Netty channel is writable and try to write pending messages
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 4cd24eb..1330594 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -406,8 +406,8 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
conf.put(Config.STORM_CLUSTER_MODE, "local");
- conf.put(DaemonConfig.BLOBSTORE_SUPERUSER, System.getProperty("user.name"));
- conf.put(DaemonConfig.BLOBSTORE_DIR, nimbusTmp.getPath());
+ conf.put(Config.BLOBSTORE_SUPERUSER, System.getProperty("user.name"));
+ conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
InProcessZookeeper zookeeper = null;
if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/aec187f4/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalDRPC.java b/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
index 9e25d16..bd60b62 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
@@ -24,8 +24,8 @@ import org.apache.storm.daemon.drpc.DRPCThrift;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.generated.DRPCRequest;
-import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServiceRegistry;
+import org.apache.storm.utils.Utils;
import org.apache.thrift.TException;
/**
@@ -41,7 +41,7 @@ public class LocalDRPC implements ILocalDRPC {
private final String serviceId;
public LocalDRPC() {
- Map<String, Object> conf = ServerConfigUtils.readStormConfig();
+ Map<String, Object> conf = Utils.readStormConfig();
drpc = new DRPC(conf);
serviceId = ServiceRegistry.registerService(new DRPCThrift(drpc));
}