You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/15 20:18:07 UTC
[2/5] storm git commit: [STORM-1084] - Improve Storm config
validation process to use java annotations instead of *_SCHEMA format
[STORM-1084] - Improve Storm config validation process to use java annotations instead of *_SCHEMA format
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7f0882f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7f0882f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7f0882f
Branch: refs/heads/master
Commit: c7f0882f79fa76cfc94a913bf28eb905471f3fc3
Parents: 9fe97b6
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Oct 15 10:26:53 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Oct 15 10:26:53 2015 -0500
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/config.clj | 53 +-
storm-core/src/jvm/backtype/storm/Config.java | 521 ++++++++---------
.../storm/validation/ConfigValidation.java | 523 +++++++++++++++++
.../validation/ConfigValidationAnnotations.java | 216 +++++++
.../storm/validation/ConfigValidationUtils.java | 175 ++++++
.../test/clj/backtype/storm/config_test.clj | 186 ------
.../clj/backtype/storm/serialization_test.clj | 14 +-
.../jvm/backtype/storm/TestConfigValidate.java | 565 +++++++++++++++++++
8 files changed, 1756 insertions(+), 497 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 57471f4..0d4f1e6 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -17,8 +17,9 @@
(ns backtype.storm.config
(:import [java.io FileReader File IOException]
[backtype.storm.generated StormTopology])
- (:import [backtype.storm Config ConfigValidation$FieldValidator])
+ (:import [backtype.storm Config])
(:import [backtype.storm.utils Utils LocalState])
+ (:import [backtype.storm.validation ConfigValidation])
(:import [org.apache.commons.io FileUtils])
(:require [clojure [string :as str]])
(:use [backtype.storm log util]))
@@ -28,7 +29,7 @@
(defn- clojure-config-name [name]
(.replace (.toUpperCase name) "_" "-"))
-;; define clojure constants for every configuration parameter
+; define clojure constants for every configuration parameter
(doseq [f (seq (.getFields Config))]
(let [name (.getName f)
new-name (clojure-config-name name)]
@@ -39,35 +40,6 @@
(dofor [f (seq (.getFields Config))]
(.get f nil)))
-(defmulti get-FieldValidator class-selector)
-
-(defmethod get-FieldValidator nil [_]
- (throw (IllegalArgumentException. "Cannot validate a nil field.")))
-
-(defmethod get-FieldValidator
- ConfigValidation$FieldValidator [validator] validator)
-
-(defmethod get-FieldValidator Object
- [klass]
- {:pre [(not (nil? klass))]}
- (reify ConfigValidation$FieldValidator
- (validateField [this name v]
- (if (and (not (nil? v))
- (not (instance? klass v)))
- (throw (IllegalArgumentException.
- (str "field " name " '" v "' must be a '" (.getName klass) "'")))))))
-
-;; Create a mapping of config-string -> validator
-;; Config fields must have a _SCHEMA field defined
-(def CONFIG-SCHEMA-MAP
- (->> (.getFields Config)
- (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
- (map (fn [f] [(.get f nil)
- (get-FieldValidator
- (-> Config
- (.getField (str (.getName f) "_SCHEMA"))
- (.get nil)))]))
- (into {})))
(defn cluster-mode
[conf & args]
@@ -92,30 +64,13 @@
[conf]
(even-sampler (sampling-rate conf)))
-; storm.zookeeper.servers:
-; - "server1"
-; - "server2"
-; - "server3"
-; nimbus.host: "master"
-;
-; ########### These all have default values as shown
-;
-; ### storm.* configs are general configurations
-; # the local dir is where jars are kept
-; storm.local.dir: "/mnt/storm"
-; storm.zookeeper.port: 2181
-; storm.zookeeper.root: "/storm"
-
(defn read-default-config
[]
(clojurify-structure (Utils/readDefaultConfig)))
(defn validate-configs-with-schemas
[conf]
- (doseq [[k v] conf
- :let [schema (CONFIG-SCHEMA-MAP k)]]
- (if (not (nil? schema))
- (.validateField schema k v))))
+ (ConfigValidation/validateFields conf))
(defn read-storm-config
[]
http://git-wip-us.apache.org/repos/asf/storm/blob/c7f0882f/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index fcdc8ad..a521b10 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -19,7 +19,10 @@ package backtype.storm;
import backtype.storm.serialization.IKryoDecorator;
import backtype.storm.serialization.IKryoFactory;
+import backtype.storm.validation.ConfigValidationAnnotations.*;
+import backtype.storm.validation.ConfigValidation.*;
import com.esotericsoftware.kryo.Serializer;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -32,7 +35,7 @@ import java.util.Map;
* serializations.
*
* <p>This class also provides constants for all the configurations possible on
- * a Storm cluster and Storm topology. Each constant is paired with a schema
+ * a Storm cluster and Storm topology. Each constant is paired with an annotation
* that defines the validity criterion of the corresponding field. Default
* values for these configs can be found in defaults.yaml.</p>
*
@@ -50,125 +53,125 @@ public class Config extends HashMap<String, Object> {
* This is part of a temporary workaround to a ZK bug, it is the 'scheme:acl' for
* the user Nimbus and Supervisors use to authenticate with ZK.
*/
+ @isString
public static final String STORM_ZOOKEEPER_SUPERACL = "storm.zookeeper.superACL";
- public static final Object STORM_ZOOKEEPER_SUPERACL_SCHEMA = String.class;
/**
* The transporter for communication among Storm tasks
*/
+ @isString
public static final String STORM_MESSAGING_TRANSPORT = "storm.messaging.transport";
- public static final Object STORM_MESSAGING_TRANSPORT_SCHEMA = String.class;
/**
* Netty based messaging: The buffer size for send/recv buffer
*/
+ @isInteger
public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
- public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Netty based messaging: Sets the backlog value to specify when the channel binds to a local address
*/
+ @isInteger
public static final String STORM_MESSAGING_NETTY_SOCKET_BACKLOG = "storm.messaging.netty.socket.backlog";
- public static final Object STORM_MESSAGING_NETTY_SOCKET_BACKLOG_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
* @deprecated Since netty clients should never stop reconnecting, this does not make sense anymore.
*/
@Deprecated
+ @isInteger
public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
- public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Netty based messaging: The min # of milliseconds that a peer will wait.
*/
+ @isInteger
public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
- public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Netty based messaging: The max # of milliseconds that a peer will wait.
*/
+ @isInteger
public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
- public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Netty based messaging: The # of worker threads for the server.
*/
+ @isInteger
public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";
- public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Netty based messaging: The # of worker threads for the client.
*/
+ @isInteger
public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";
- public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* If the Netty messaging layer is busy, the Netty client will try to batch as many messages as possible, up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
*/
+ @isInteger
public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";
- public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
/**
* At this interval we check whether the Netty channel is writable and try to write pending messages
*/
+ @isInteger
public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
- public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Netty based messaging: Is authentication required for Netty messaging from client worker process to server worker process.
*/
+ @isBoolean
public static final String STORM_MESSAGING_NETTY_AUTHENTICATION = "storm.messaging.netty.authentication";
- public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = Boolean.class;
/**
* The delegate for serializing metadata, should be used for serialized objects stored in zookeeper and on disk.
* This is NOT used for compressing serialized tuples sent between topologies.
*/
+ @isString
public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
- public static final Object STORM_META_SERIALIZATION_DELEGATE_SCHEMA = String.class;
/**
* A list of hosts of ZooKeeper servers used to manage the cluster.
*/
+ @isStringList
public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
- public static final Object STORM_ZOOKEEPER_SERVERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* The port Storm will use to connect to each of the ZooKeeper servers.
*/
+ @isInteger
public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
- public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* A directory on the local filesystem used by Storm for any local
* filesystem usage it needs. The directory must exist and the Storm daemons must
* have permission to read/write from this location.
*/
+ @isString
public static final String STORM_LOCAL_DIR = "storm.local.dir";
- public static final Object STORM_LOCAL_DIR_SCHEMA = String.class;
/**
* A directory that holds configuration files for log4j2.
* It can be either a relative or an absolute directory.
* If relative, it is relative to the storm's home directory.
*/
+ @isString
public static final String STORM_LOG4J2_CONF_DIR = "storm.log4j2.conf.dir";
- public static final Object STORM_LOG4J2_CONF_DIR_SCHEMA = String.class;
/**
* A global task scheduler used to assign topologies' tasks to supervisors' workers.
*
* If this is not set, a default system scheduler will be used.
*/
+ @isString
public static final String STORM_SCHEDULER = "storm.scheduler";
- public static final Object STORM_SCHEDULER_SCHEMA = String.class;
/**
* The mode this Storm cluster is running in. Either "distributed" or "local".
*/
+ @isString
public static final String STORM_CLUSTER_MODE = "storm.cluster.mode";
- public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
/**
* What Network Topography detection classes should we use.
@@ -176,8 +179,8 @@ public class Config extends HashMap<String, Object> {
* rack names that correspond to the supervisors. This information is stored in Cluster.java, and
* is used in the resource aware scheduler.
*/
+ @isString
public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
- public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
/**
* The hostname the supervisors/workers should report to nimbus. If unset, Storm will
@@ -187,54 +190,54 @@ public class Config extends HashMap<String, Object> {
* can utilize to find each other based on hostname got from calls to
* <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
*/
+ @isString
public static final String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
- public static final Object STORM_LOCAL_HOSTNAME_SCHEMA = String.class;
/**
* The plugin that will convert a principal to a local user.
*/
+ @isString
public static final String STORM_PRINCIPAL_TO_LOCAL_PLUGIN = "storm.principal.tolocal";
- public static final Object STORM_PRINCIPAL_TO_LOCAL_PLUGIN_SCHEMA = String.class;
/**
* The plugin that will provide user groups service
*/
+ @isString
public static final String STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN = "storm.group.mapping.service";
- public static final Object STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN_SCHEMA = String.class;
/**
* Max no.of seconds group mapping service will cache user groups
*/
+ @isNumber
public static final String STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS = "storm.group.mapping.service.cache.duration.secs";
- public static final Object STORM_GROUP_MAPPING_SERVICE_CACHE_DURATION_SECS_SCHEMA = Number.class;
/**
* Initialization parameters for the group mapping service plugin.
* Provides a way for a {@link #STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN}
* implementation to access optional settings.
*/
+ @isType(type=Map.class)
public static final String STORM_GROUP_MAPPING_SERVICE_PARAMS = "storm.group.mapping.service.params";
- public static final Object STORM_GROUP_MAPPING_SERVICE_PARAMS_SCHEMA = Map.class;
/**
* The default transport plug-in for Thrift client/server communication
*/
+ @isString
public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
- public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
/**
* The serializer class for ListDelegate (tuple payload).
* The default serializer will be ListDelegateSerializer
*/
+ @isString
public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
- public static final Object TOPOLOGY_TUPLE_SERIALIZER_SCHEMA = String.class;
/**
* Try to serialize all tuples, even for local transfers. This should only be used
* for testing, as a sanity check that all of your tuples are setup properly.
*/
+ @isBoolean
public static final String TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE = "topology.testing.always.try.serialize";
- public static final Object TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE_SCHEMA = Boolean.class;
/**
* Whether or not to use ZeroMQ for messaging in local mode. If this is set
@@ -244,50 +247,50 @@ public class Config extends HashMap<String, Object> {
*
* Defaults to false.
*/
+ @isBoolean
public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq";
- public static final Object STORM_LOCAL_MODE_ZMQ_SCHEMA = Boolean.class;
/**
* The root location at which Storm stores data in ZooKeeper.
*/
+ @isString
public static final String STORM_ZOOKEEPER_ROOT = "storm.zookeeper.root";
- public static final Object STORM_ZOOKEEPER_ROOT_SCHEMA = String.class;
/**
* The session timeout for clients to ZooKeeper.
*/
+ @isInteger
public static final String STORM_ZOOKEEPER_SESSION_TIMEOUT = "storm.zookeeper.session.timeout";
- public static final Object STORM_ZOOKEEPER_SESSION_TIMEOUT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The connection timeout for clients to ZooKeeper.
*/
+ @isInteger
public static final String STORM_ZOOKEEPER_CONNECTION_TIMEOUT = "storm.zookeeper.connection.timeout";
- public static final Object STORM_ZOOKEEPER_CONNECTION_TIMEOUT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The number of times to retry a Zookeeper operation.
*/
+ @isInteger
public static final String STORM_ZOOKEEPER_RETRY_TIMES="storm.zookeeper.retry.times";
- public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The interval between retries of a Zookeeper operation.
*/
+ @isInteger
public static final String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";
- public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The ceiling of the interval between retries of a Zookeeper operation.
*/
+ @isInteger
public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";
- public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The cluster Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
*/
+ @isString
public static final String STORM_ZOOKEEPER_AUTH_SCHEME="storm.zookeeper.auth.scheme";
- public static final Object STORM_ZOOKEEPER_AUTH_SCHEME_SCHEMA = String.class;
/**
* A string representing the payload for cluster Zookeeper authentication.
@@ -298,127 +301,127 @@ public class Config extends HashMap<String, Object> {
* This file storm-cluster-auth.yaml should then be protected with
* appropriate permissions that deny access from workers.
*/
+ @isString
public static final String STORM_ZOOKEEPER_AUTH_PAYLOAD="storm.zookeeper.auth.payload";
- public static final Object STORM_ZOOKEEPER_AUTH_PAYLOAD_SCHEMA = String.class;
/**
* The topology Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
*/
+ @isString
public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME="storm.zookeeper.topology.auth.scheme";
- public static final Object STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME_SCHEMA = String.class;
/**
* A string representing the payload for topology Zookeeper authentication. It gets serialized using UTF-8 encoding during authentication.
*/
+ @isString
public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD="storm.zookeeper.topology.auth.payload";
- public static final Object STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD_SCHEMA = String.class;
/**
* The id assigned to a running topology. The id is the storm name with a unique nonce appended.
*/
+ @isString
public static final String STORM_ID = "storm.id";
- public static final Object STORM_ID_SCHEMA = String.class;
/**
* The number of times to retry a Nimbus operation.
*/
+ @isNumber
public static final String STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
- public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
/**
* The starting interval between exponential backoff retries of a Nimbus operation.
*/
+ @isNumber
public static final String STORM_NIMBUS_RETRY_INTERVAL="storm.nimbus.retry.interval.millis";
- public static final Object STORM_NIMBUS_RETRY_INTERVAL_SCHEMA = Number.class;
/**
* The ceiling of the interval between retries of a client connect to Nimbus operation.
*/
+ @isNumber
public static final String STORM_NIMBUS_RETRY_INTERVAL_CEILING="storm.nimbus.retry.intervalceiling.millis";
- public static final Object STORM_NIMBUS_RETRY_INTERVAL_CEILING_SCHEMA = Number.class;
/**
* The Nimbus transport plug-in for Thrift client/server communication
*/
+ @isString
public static final String NIMBUS_THRIFT_TRANSPORT_PLUGIN = "nimbus.thrift.transport";
- public static final Object NIMBUS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
/**
* The host that the master server is running on, added only for backward compatibility,
* the usage deprecated in favor of nimbus.seeds config.
*/
@Deprecated
+ @isString
public static final String NIMBUS_HOST = "nimbus.host";
- public static final Object NIMBUS_HOST_SCHEMA = String.class;
/**
* List of seed nimbus hosts to use for leader nimbus discovery.
*/
+ @isStringList
public static final String NIMBUS_SEEDS = "nimbus.seeds";
- public static final Object NIMBUS_SEEDS_SCHEMA = ConfigValidation.StringsValidator;
/**
* Which port the Thrift interface of Nimbus should run on. Clients should
* connect to this port to upload jars and submit topologies.
*/
+ @isInteger
public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
- public static final Object NIMBUS_THRIFT_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The number of threads that should be used by the nimbus thrift server.
*/
+ @isNumber
public static final String NIMBUS_THRIFT_THREADS = "nimbus.thrift.threads";
- public static final Object NIMBUS_THRIFT_THREADS_SCHEMA = Number.class;
/**
* A list of users that are cluster admins and can run any command. To use this set
* nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
+ @isStringList
public static final String NIMBUS_ADMINS = "nimbus.admins";
- public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator;
/**
* A list of users that are the only ones allowed to run user operation on storm cluster.
* To use this set nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
+ @isStringList
public static final String NIMBUS_USERS = "nimbus.users";
- public static final Object NIMBUS_USERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* A list of groups , users belong to these groups are the only ones allowed to run user operation on storm cluster.
* To use this set nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
+ @isStringList
public static final String NIMBUS_GROUPS = "nimbus.groups";
- public static final Object NIMBUS_GROUPS_SCHEMA = ConfigValidation.StringsValidator;
/**
* A list of users that run the supervisors and should be authorized to interact with
* nimbus as a supervisor would. To use this set
* nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
+ @isStringList
public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
- public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* The maximum buffer size thrift should use when reading messages.
*/
+ @isInteger
public static final String NIMBUS_THRIFT_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
- public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
/**
* This parameter is used by the storm-deploy project to configure the
* jvm options for the nimbus daemon.
*/
+ @isString
public static final String NIMBUS_CHILDOPTS = "nimbus.childopts";
- public static final Object NIMBUS_CHILDOPTS_SCHEMA = String.class;
/**
* How long without heartbeating a task can go before nimbus will consider the
* task dead and reassign it to another location.
*/
+ @isInteger
public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
- public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
@@ -427,15 +430,15 @@ public class Config extends HashMap<String, Object> {
* This parameter is for checking for failures when there's no explicit event like that
* occurring.
*/
+ @isInteger
public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
- public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How often nimbus should wake the cleanup thread to clean the inbox.
* @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
*/
+ @isInteger
public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
- public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
@@ -446,15 +449,15 @@ public class Config extends HashMap<String, Object> {
* is set to).
* @see NIMBUS_CLEANUP_INBOX_FREQ_SECS
*/
+ @isInteger
public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
- public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How long before a supervisor can go without heartbeating before nimbus considers it dead
* and stops assigning new work to it.
*/
+ @isInteger
public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs";
- public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* A special timeout used when a task is initially launched. During launch, this is the timeout
@@ -463,369 +466,368 @@ public class Config extends HashMap<String, Object> {
* <p>A separate timeout exists for launch because there can be quite a bit of overhead
* to launching new JVM's and configuring them.</p>
*/
+ @isInteger
public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
- public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Whether or not nimbus should reassign tasks if it detects that a task goes down.
* Defaults to true, and it's not recommended to change this value.
*/
+ @isBoolean
public static final String NIMBUS_REASSIGN = "nimbus.reassign";
- public static final Object NIMBUS_REASSIGN_SCHEMA = Boolean.class;
/**
* During upload/download with the master, how long an upload or download connection is idle
* before nimbus considers it dead and drops the connection.
*/
+ @isInteger
public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = "nimbus.file.copy.expiration.secs";
- public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* A custom class that implements ITopologyValidator that is run whenever a
* topology is submitted. Can be used to provide business-specific logic for
* whether topologies are allowed to run or not.
*/
+ @isString
public static final String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";
- public static final Object NIMBUS_TOPOLOGY_VALIDATOR_SCHEMA = String.class;
/**
* Class name for authorization plugin for Nimbus
*/
+ @isString
public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
- public static final Object NIMBUS_AUTHORIZER_SCHEMA = String.class;
-
/**
* Impersonation user ACL config entries.
*/
+ @isString
public static final String NIMBUS_IMPERSONATION_AUTHORIZER = "nimbus.impersonation.authorizer";
- public static final Object NIMBUS_IMPERSONATION_AUTHORIZER_SCHEMA = String.class;
-
/**
* Impersonation user ACL config entries.
*/
+ @isImpersonationAcl
public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl";
- public static final Object NIMBUS_IMPERSONATION_ACL_SCHEMA = ConfigValidation.MapOfStringToMapValidator;
/**
* How often nimbus should wake up to renew credentials if needed.
*/
+ @isNumber
public static final String NIMBUS_CREDENTIAL_RENEW_FREQ_SECS = "nimbus.credential.renewers.freq.secs";
- public static final Object NIMBUS_CREDENTIAL_RENEW_FREQ_SECS_SCHEMA = Number.class;
/**
* A list of credential renewers that nimbus should load.
*/
+ @isStringList
public static final String NIMBUS_CREDENTIAL_RENEWERS = "nimbus.credential.renewers.classes";
- public static final Object NIMBUS_CREDENTIAL_RENEWERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* A list of plugins that nimbus should load during submit topology to populate
* credentials on user's behalf.
*/
+ @isStringList
public static final String NIMBUS_AUTO_CRED_PLUGINS = "nimbus.autocredential.plugins.classes";
- public static final Object NIMBUS_AUTO_CRED_PLUGINS_SCHEMA = ConfigValidation.StringsValidator;
/**
* Storm UI binds to this host/interface.
*/
+ @isString
public static final String UI_HOST = "ui.host";
- public static final Object UI_HOST_SCHEMA = String.class;
/**
* Storm UI binds to this port.
*/
+ @isInteger
public static final String UI_PORT = "ui.port";
- public static final Object UI_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* HTTP UI port for log viewer
*/
+ @isInteger
public static final String LOGVIEWER_PORT = "logviewer.port";
- public static final Object LOGVIEWER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Childopts for log viewer java process.
*/
+ @isString
public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts";
- public static final Object LOGVIEWER_CHILDOPTS_SCHEMA = String.class;
/**
* How often to clean up old log files
*/
+ @isInteger
+ @isPositiveNumber
public static final String LOGVIEWER_CLEANUP_INTERVAL_SECS = "logviewer.cleanup.interval.secs";
- public static final Object LOGVIEWER_CLEANUP_INTERVAL_SECS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
/**
* How many minutes since a log was last modified for the log to be considered for clean-up
*/
+ @isInteger
+ @isPositiveNumber
public static final String LOGVIEWER_CLEANUP_AGE_MINS = "logviewer.cleanup.age.mins";
- public static final Object LOGVIEWER_CLEANUP_AGE_MINS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
/**
* Storm Logviewer HTTPS port
*/
+ @isNumber
public static final String LOGVIEWER_HTTPS_PORT = "logviewer.https.port";
- public static final Object LOGVIEWER_HTTPS_PORT_SCHEMA = Number.class;
/**
* Path to the keystore containing the certs used by Storm Logviewer for HTTPS communications
*/
+ @isString
public static final String LOGVIEWER_HTTPS_KEYSTORE_PATH = "logviewer.https.keystore.path";
- public static final Object LOGVIEWER_HTTPS_KEYSTORE_PATH_SCHEMA = String.class;
/**
* Password for the keystore for HTTPS for Storm Logviewer
*/
+ @isString
public static final String LOGVIEWER_HTTPS_KEYSTORE_PASSWORD = "logviewer.https.keystore.password";
- public static final Object LOGVIEWER_HTTPS_KEYSTORE_PASSWORD_SCHEMA = String.class;
/**
* Type of the keystore for HTTPS for Storm Logviewer.
* see http://docs.oracle.com/javase/8/docs/api/java/security/KeyStore.html for more details.
*/
+ @isString
public static final String LOGVIEWER_HTTPS_KEYSTORE_TYPE = "logviewer.https.keystore.type";
- public static final Object LOGVIEWER_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class;
/**
* Password to the private key in the keystore for setting up HTTPS (SSL).
*/
+ @isString
public static final String LOGVIEWER_HTTPS_KEY_PASSWORD = "logviewer.https.key.password";
- public static final Object LOGVIEWER_HTTPS_KEY_PASSWORD_SCHEMA = String.class;
/**
* Path to the truststore containing the certs used by Storm Logviewer for HTTPS communications
*/
+ @isString
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PATH = "logviewer.https.truststore.path";
- public static final Object LOGVIEWER_HTTPS_TRUSTSTORE_PATH_SCHEMA = String.class;
/**
* Password for the truststore for HTTPS for Storm Logviewer
*/
+ @isString
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD = "logviewer.https.truststore.password";
- public static final Object LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = String.class;
/**
* Type of the truststore for HTTPS for Storm Logviewer.
* see http://docs.oracle.com/javase/8/docs/api/java/security/Truststore.html for more details.
*/
+ @isString
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_TYPE = "logviewer.https.truststore.type";
- public static final Object LOGVIEWER_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class;
/**
* Whether Storm Logviewer wants client authentication when setting up HTTPS (SSL).
*/
+ @isBoolean
public static final String LOGVIEWER_HTTPS_WANT_CLIENT_AUTH = "logviewer.https.want.client.auth";
- public static final Object LOGVIEWER_HTTPS_WANT_CLIENT_AUTH_SCHEMA = Boolean.class;
+ @isBoolean
public static final String LOGVIEWER_HTTPS_NEED_CLIENT_AUTH = "logviewer.https.need.client.auth";
- public static final Object LOGVIEWER_HTTPS_NEED_CLIENT_AUTH_SCHEMA = Boolean.class;
/**
* A list of users allowed to view logs via the Log Viewer
*/
+ @isStringList
public static final String LOGS_USERS = "logs.users";
- public static final Object LOGS_USERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* A list of groups allowed to view logs via the Log Viewer
*/
+ @isStringList
public static final String LOGS_GROUPS = "logs.groups";
- public static final Object LOGS_GROUPS_SCHEMA = ConfigValidation.StringsValidator;
/**
* Appender name used by log viewer to determine log directory.
*/
+ @isString
public static final String LOGVIEWER_APPENDER_NAME = "logviewer.appender.name";
- public static final Object LOGVIEWER_APPENDER_NAME_SCHEMA = String.class;
/**
* Childopts for Storm UI Java process.
*/
+ @isString
public static final String UI_CHILDOPTS = "ui.childopts";
- public static final Object UI_CHILDOPTS_SCHEMA = String.class;
/**
* A class implementing javax.servlet.Filter for authenticating/filtering UI requests
*/
+ @isString
public static final String UI_FILTER = "ui.filter";
- public static final Object UI_FILTER_SCHEMA = String.class;
/**
* Initialization parameters for the javax.servlet.Filter
*/
+ @isType(type=Map.class)
public static final String UI_FILTER_PARAMS = "ui.filter.params";
- public static final Object UI_FILTER_PARAMS_SCHEMA = Map.class;
/**
* The size of the header buffer for the UI in bytes
*/
+ @isNumber
public static final String UI_HEADER_BUFFER_BYTES = "ui.header.buffer.bytes";
- public static final Object UI_HEADER_BUFFER_BYTES_SCHEMA = Number.class;
/**
* This port is used by Storm UI for receiving HTTPS (SSL) requests from clients.
*/
+ @isNumber
public static final String UI_HTTPS_PORT = "ui.https.port";
- public static final Object UI_HTTPS_PORT_SCHEMA = Number.class;
/**
* Path to the keystore used by Storm UI for setting up HTTPS (SSL).
*/
+ @isString
public static final String UI_HTTPS_KEYSTORE_PATH = "ui.https.keystore.path";
- public static final Object UI_HTTPS_KEYSTORE_PATH_SCHEMA = String.class;
/**
* Password to the keystore used by Storm UI for setting up HTTPS (SSL).
*/
+ @isString
public static final String UI_HTTPS_KEYSTORE_PASSWORD = "ui.https.keystore.password";
- public static final Object UI_HTTPS_KEYSTORE_PASSWORD_SCHEMA = String.class;
/**
* Type of keystore used by Storm UI for setting up HTTPS (SSL).
* see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details.
*/
+ @isString
public static final String UI_HTTPS_KEYSTORE_TYPE = "ui.https.keystore.type";
- public static final Object UI_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class;
/**
* Password to the private key in the keystore for setting up HTTPS (SSL).
*/
+ @isString
public static final String UI_HTTPS_KEY_PASSWORD = "ui.https.key.password";
- public static final Object UI_HTTPS_KEY_PASSWORD_SCHEMA = String.class;
/**
* Path to the truststore used by Storm UI for setting up HTTPS (SSL).
*/
+ @isString
public static final String UI_HTTPS_TRUSTSTORE_PATH = "ui.https.truststore.path";
- public static final Object UI_HTTPS_TRUSTSTORE_PATH_SCHEMA = String.class;
/**
* Password to the truststore used by Storm UI for setting up HTTPS (SSL).
*/
+ @isString
public static final String UI_HTTPS_TRUSTSTORE_PASSWORD = "ui.https.truststore.password";
- public static final Object UI_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = String.class;
/**
* Type of truststore used by Storm UI for setting up HTTPS (SSL).
* see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details.
*/
+ @isString
public static final String UI_HTTPS_TRUSTSTORE_TYPE = "ui.https.truststore.type";
- public static final Object UI_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class;
/**
* Whether Storm UI wants / needs client authentication when setting up HTTPS (SSL).
*/
+ @isBoolean
public static final String UI_HTTPS_WANT_CLIENT_AUTH = "ui.https.want.client.auth";
- public static final Object UI_HTTPS_WANT_CLIENT_AUTH_SCHEMA = Boolean.class;
+ @isBoolean
public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth";
- public static final Object UI_HTTPS_NEED_CLIENT_AUTH_SCHEMA = Boolean.class;
-
/**
* List of DRPC servers so that the DRPCSpout knows who to talk to.
*/
+ @isStringList
public static final String DRPC_SERVERS = "drpc.servers";
- public static final Object DRPC_SERVERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* This port is used by Storm DRPC for receiving HTTP DRPC requests from clients.
*/
+ @isNumber
public static final String DRPC_HTTP_PORT = "drpc.http.port";
- public static final Object DRPC_HTTP_PORT_SCHEMA = Number.class;
/**
* This port is used by Storm DRPC for receiving HTTPS (SSL) DRPC requests from clients.
*/
+ @isNumber
public static final String DRPC_HTTPS_PORT = "drpc.https.port";
- public static final Object DRPC_HTTPS_PORT_SCHEMA = Number.class;
/**
* Path to the keystore used by Storm DRPC for setting up HTTPS (SSL).
*/
+ @isString
public static final String DRPC_HTTPS_KEYSTORE_PATH = "drpc.https.keystore.path";
- public static final Object DRPC_HTTPS_KEYSTORE_PATH_SCHEMA = String.class;
/**
* Password to the keystore used by Storm DRPC for setting up HTTPS (SSL).
*/
+ @isString
public static final String DRPC_HTTPS_KEYSTORE_PASSWORD = "drpc.https.keystore.password";
- public static final Object DRPC_HTTPS_KEYSTORE_PASSWORD_SCHEMA = String.class;
/**
* Type of keystore used by Storm DRPC for setting up HTTPS (SSL).
* see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details.
*/
+ @isString
public static final String DRPC_HTTPS_KEYSTORE_TYPE = "drpc.https.keystore.type";
- public static final Object DRPC_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class;
/**
* Password to the private key in the keystore for setting up HTTPS (SSL).
*/
+ @isString
public static final String DRPC_HTTPS_KEY_PASSWORD = "drpc.https.key.password";
- public static final Object DRPC_HTTPS_KEY_PASSWORD_SCHEMA = String.class;
/**
* Path to the truststore used by Storm DRPC for setting up HTTPS (SSL).
*/
+ @isString
public static final String DRPC_HTTPS_TRUSTSTORE_PATH = "drpc.https.truststore.path";
- public static final Object DRPC_HTTPS_TRUSTSTORE_PATH_SCHEMA = String.class;
/**
* Password to the truststore used by Storm DRPC for setting up HTTPS (SSL).
*/
+ @isString
public static final String DRPC_HTTPS_TRUSTSTORE_PASSWORD = "drpc.https.truststore.password";
- public static final Object DRPC_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = String.class;
/**
* Type of truststore used by Storm DRPC for setting up HTTPS (SSL).
* see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details.
*/
+ @isString
public static final String DRPC_HTTPS_TRUSTSTORE_TYPE = "drpc.https.truststore.type";
- public static final Object DRPC_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class;
/**
* Whether Storm DRPC wants / needs client authentication when setting up HTTPS (SSL).
*/
+ @isBoolean
public static final String DRPC_HTTPS_WANT_CLIENT_AUTH = "drpc.https.want.client.auth";
- public static final Object DRPC_HTTPS_WANT_CLIENT_AUTH_SCHEMA = Boolean.class;
+ @isBoolean
public static final String DRPC_HTTPS_NEED_CLIENT_AUTH = "drpc.https.need.client.auth";
- public static final Object DRPC_HTTPS_NEED_CLIENT_AUTH_SCHEMA = Boolean.class;
/**
* The DRPC transport plug-in for Thrift client/server communication
*/
+ @isString
public static final String DRPC_THRIFT_TRANSPORT_PLUGIN = "drpc.thrift.transport";
- public static final Object DRPC_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
/**
* This port is used by Storm DRPC for receiving DRPC requests from clients.
*/
+ @isInteger
public static final String DRPC_PORT = "drpc.port";
- public static final Object DRPC_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Class name for authorization plugin for DRPC client
*/
+ @isString
public static final String DRPC_AUTHORIZER = "drpc.authorizer";
- public static final Object DRPC_AUTHORIZER_SCHEMA = String.class;
/**
* The Access Control List for the DRPC Authorizer.
* @see DRPCSimpleAclAuthorizer
*/
+ @isType(type=Map.class)
public static final String DRPC_AUTHORIZER_ACL = "drpc.authorizer.acl";
- public static final Object DRPC_AUTHORIZER_ACL_SCHEMA = Map.class;
/**
* File name of the DRPC Authorizer ACL.
* @see DRPCSimpleAclAuthorizer
*/
+ @isString
public static final String DRPC_AUTHORIZER_ACL_FILENAME = "drpc.authorizer.acl.filename";
- public static final Object DRPC_AUTHORIZER_ACL_FILENAME_SCHEMA = String.class;
/**
* Whether the DRPCSimpleAclAuthorizer should deny requests for operations
@@ -836,128 +838,136 @@ public class Config extends HashMap<String, Object> {
* any request for functions will be denied.
* @see DRPCSimpleAclAuthorizer
*/
+ @isBoolean
public static final String DRPC_AUTHORIZER_ACL_STRICT = "drpc.authorizer.acl.strict";
- public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class;
/**
* DRPC thrift server worker threads
*/
+ @isInteger
public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
- public static final Object DRPC_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The maximum buffer size thrift should use when reading messages for DRPC.
*/
+ @isNumber
public static final String DRPC_MAX_BUFFER_SIZE = "drpc.max_buffer_size";
- public static final Object DRPC_MAX_BUFFER_SIZE_SCHEMA = Number.class;
/**
* DRPC thrift server queue size
*/
+ @isInteger
public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
- public static final Object DRPC_QUEUE_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The DRPC invocations transport plug-in for Thrift client/server communication
*/
+ @isString
public static final String DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN = "drpc.invocations.thrift.transport";
- public static final Object DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
/**
* This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
*/
+ @isInteger
public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
- public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* DRPC invocations thrift server worker threads
*/
+ @isNumber
public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
- public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
/**
* The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also
* timeout based on the socket timeout on the DRPC client, and separately based on the topology message
* timeout for the topology implementing the DRPC function.
*/
+
+ @isInteger
+ @isPositiveNumber
+ @NotNull
public static final String DRPC_REQUEST_TIMEOUT_SECS = "drpc.request.timeout.secs";
- public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
/**
* Childopts for Storm DRPC Java process.
*/
+ @isString
public static final String DRPC_CHILDOPTS = "drpc.childopts";
- public static final Object DRPC_CHILDOPTS_SCHEMA = String.class;
/**
* Class name of the HTTP credentials plugin for the UI.
*/
+ @isString
public static final String UI_HTTP_CREDS_PLUGIN = "ui.http.creds.plugin";
- public static final Object UI_HTTP_CREDS_PLUGIN_SCHEMA = String.class;
/**
* Class name of the HTTP credentials plugin for DRPC.
*/
+ @isString
public static final String DRPC_HTTP_CREDS_PLUGIN = "drpc.http.creds.plugin";
- public static final Object DRPC_HTTP_CREDS_PLUGIN_SCHEMA = String.class;
/**
* the metadata configured on the supervisor
*/
+ @isType(type=Map.class)
public static final String SUPERVISOR_SCHEDULER_META = "supervisor.scheduler.meta";
- public static final Object SUPERVISOR_SCHEDULER_META_SCHEMA = Map.class;
+
/**
* A list of ports that can run workers on this supervisor. Each worker uses one port, and
* the supervisor will only run one worker per port. Use this configuration to tune
* how many workers run on each machine.
*/
+ @isNoDuplicateInList
+ @NotNull
+ @isListEntryCustom(entryValidatorClasses={IntegerValidator.class,PositiveNumberValidator.class})
public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
- public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NoDuplicateIntegersValidator;
/**
* A number representing the maximum number of workers any single topology can acquire.
*/
+ @isNumber
public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology";
- public static final Object NIMBUS_SLOTS_PER_TOPOLOGY_SCHEMA = Number.class;
/**
* A class implementing javax.servlet.Filter for DRPC HTTP requests
*/
+ @isString
public static final String DRPC_HTTP_FILTER = "drpc.http.filter";
- public static final Object DRPC_HTTP_FILTER_SCHEMA = String.class;
/**
* Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
* service
*/
+ @isType(type=Map.class)
public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params";
- public static final Object DRPC_HTTP_FILTER_PARAMS_SCHEMA = Map.class;
/**
* A number representing the maximum number of executors any single topology can acquire.
*/
+ @isNumber
public static final String NIMBUS_EXECUTORS_PER_TOPOLOGY = "nimbus.executors.perTopology";
- public static final Object NIMBUS_EXECUTORS_PER_TOPOLOGY_SCHEMA = Number.class;
/**
* This parameter is used by the storm-deploy project to configure the
* jvm options for the supervisor daemon.
*/
+ @isString
public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";
- public static final Object SUPERVISOR_CHILDOPTS_SCHEMA = String.class;
/**
* How long a worker can go without heartbeating before the supervisor tries to
* restart the worker process.
*/
+ @isInteger
+ @isPositiveNumber
+ @NotNull
public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
- public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
/**
* How many seconds to sleep for before shutting down threads on worker
*/
+ @isInteger
public static final String SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS = "supervisor.worker.shutdown.sleep.secs";
- public static final Object SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How long a worker can go without heartbeating during the initial launch before
@@ -965,50 +975,52 @@ public class Config extends HashMap<String, Object> {
* supervisor.worker.timeout.secs during launch because there is additional
* overhead to starting and configuring the JVM on launch.
*/
+ @isInteger
+ @isPositiveNumber
+ @NotNull
public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs";
- public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
/**
* Whether or not the supervisor should launch workers assigned to it. Defaults
* to true -- and you should probably never change this value. This configuration
* is used in the Storm unit tests.
*/
+ @isBoolean
public static final String SUPERVISOR_ENABLE = "supervisor.enable";
- public static final Object SUPERVISOR_ENABLE_SCHEMA = Boolean.class;
/**
* how often the supervisor sends a heartbeat to the master.
*/
+ @isInteger
public static final String SUPERVISOR_HEARTBEAT_FREQUENCY_SECS = "supervisor.heartbeat.frequency.secs";
- public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How often the supervisor checks the worker heartbeats to see if any of them
* need to be restarted.
*/
+ @isInteger
public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
- public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Should the supervisor try to run the worker as the launching user or not. Defaults to false.
*/
+ @isBoolean
public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
- public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
/**
* Full path to the worker-launcher executable that will be used to launch workers when
* SUPERVISOR_RUN_WORKER_AS_USER is set to true.
*/
+ @isString
public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
- public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
/**
* The total amount of memory (in MiB) a supervisor is allowed to give to its workers.
* A default value will be set for this config if user does not override
*/
+ @isPositiveNumber
public static final String SUPERVISOR_MEMORY_CAPACITY_MB = "supervisor.memory.capacity.mb";
- public static final Object SUPERVISOR_MEMORY_CAPACITY_MB_SCHEMA = ConfigValidation.PositiveNumberValidator;
/**
* The total amount of CPU resources a supervisor is allowed to give to its workers.
@@ -1016,8 +1028,8 @@ public class Config extends HashMap<String, Object> {
* using 100 makes it simple to set the desired value to the capacity measurement
* for single threaded bolts. A default value will be set for this config if user does not override
*/
+ @isPositiveNumber
public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
- public static final Object SUPERVISOR_CPU_CAPACITY_SCHEMA = ConfigValidation.PositiveNumberValidator;
/**
* The jvm opts provided to workers launched by this supervisor. All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%"
@@ -1027,35 +1039,34 @@ public class Config extends HashMap<String, Object> {
* %TOPOLOGY-ID% -> topology-id,
* %WORKER-PORT% -> port.
*/
+ @isStringOrStringList
public static final String WORKER_CHILDOPTS = "worker.childopts";
- public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
* The jvm opts provided to workers launched by this supervisor for GC. All "%ID%" substrings are replaced
* with an identifier for this worker. Because the JVM complains about multiple GC opts the topology
* can override this default value by setting topology.worker.gc.childopts.
*/
+ @isStringOrStringList
public static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts";
- public static final Object WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
* control how many worker receiver threads we need per worker
*/
+ @isInteger
public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
- public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How often this worker should heartbeat to the supervisor.
*/
+ @isInteger
public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
- public static final Object WORKER_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How often a task should heartbeat its status to the master.
*/
+ @isInteger
public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
- public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
-
/**
* How often a task should sync its connections with other tasks (if a task is
@@ -1064,29 +1075,29 @@ public class Config extends HashMap<String, Object> {
* almost immediately. This configuration is here just in case that notification doesn't
* come through.
*/
+ @isInteger
public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
- public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How often a worker should check dynamic log level timeouts for expiration.
* For expired logger settings, the clean up polling task will reset the log levels
* to the original levels (detected at startup), and will clean up the timeout map
*/
+ @isInteger
+ @isPositiveNumber
public static final String WORKER_LOG_LEVEL_RESET_POLL_SECS = "worker.log.level.reset.poll.secs";
- public static final Object WORKER_LOG_LEVEL_RESET_POLL_SECS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
/**
* How often a task should sync credentials, worst case.
*/
+ @isNumber
public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
- public static final Object TASK_CREDENTIALS_POLL_SECS_SCHEMA = Number.class;
-
/**
* Whether to enable backpressure for a certain topology
*/
+ @isBoolean
public static final String TOPOLOGY_BACKPRESSURE_ENABLE = "topology.backpressure.enable";
- public static final Object TOPOLOGY_BACKPRESSURE_ENABLE_SCHEMA = Boolean.class;
/**
* This signifies the tuple congestion in a disruptor queue.
@@ -1094,50 +1105,50 @@ public class Config extends HashMap<String, Object> {
* the backpressure scheme, if enabled, should slow down the tuple sending speed of
* the spouts until reaching the low watermark.
*/
+ @isPositiveNumber
public static final String BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK="backpressure.disruptor.high.watermark";
- public static final Object BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
/**
* This signifies a state that a disruptor queue has left the congestion.
* If the used ratio of a disruptor queue is lower than the low watermark,
* it will unset the backpressure flag.
*/
+ @isPositiveNumber
public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
- public static final Object BACKPRESSURE_DISRUPTOR_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
/**
* A list of users that are allowed to interact with the topology. To use this set
* nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
+ @isStringList
public static final String TOPOLOGY_USERS = "topology.users";
- public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* A list of groups that are allowed to interact with the topology. To use this set
* nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
+ @isStringList
public static final String TOPOLOGY_GROUPS = "topology.groups";
- public static final Object TOPOLOGY_GROUPS_SCHEMA = ConfigValidation.StringsValidator;
/**
* True if Storm should timeout messages or not. Defaults to true. This is meant to be used
* in unit tests to prevent tuples from being accidentally timed out during the test.
*/
+ @isBoolean
public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";
- public static final Object TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS_SCHEMA = Boolean.class;
/**
* When set to true, Storm will log every message that's emitted.
*/
+ @isBoolean
public static final String TOPOLOGY_DEBUG = "topology.debug";
- public static final Object TOPOLOGY_DEBUG_SCHEMA = Boolean.class;
/**
* The serializer for communication between shell components and non-JVM
* processes
*/
+ @isString
public static final String TOPOLOGY_MULTILANG_SERIALIZER = "topology.multilang.serializer";
- public static final Object TOPOLOGY_MULTILANG_SERIALIZER_SCHEMA = String.class;
/**
* How many processes should be spawned around the cluster to execute this
@@ -1145,8 +1156,8 @@ public class Config extends HashMap<String, Object> {
* them. This parameter should be used in conjunction with the parallelism hints
* on each component in the topology to tune the performance of a topology.
*/
+ @isInteger
public static final String TOPOLOGY_WORKERS = "topology.workers";
- public static final Object TOPOLOGY_WORKERS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How many instances to create for a spout/bolt. A task runs on a thread with zero or more
@@ -1156,36 +1167,36 @@ public class Config extends HashMap<String, Object> {
* without redeploying the topology or violating the constraints of Storm (such as a fields grouping
* guaranteeing that the same value goes to the same task).
*/
+ @isInteger
public static final String TOPOLOGY_TASKS = "topology.tasks";
- public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler
* to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override
*/
+ @isPositiveNumber(includeZero = true)
public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb";
- public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
/**
* The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler
* to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override
*/
+ @isPositiveNumber(includeZero = true)
public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb";
- public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
/**
* The config indicates the percentage of cpu for a core an instance(executor) of a component will use.
* Assuming a core value of 100, a value of 10 indicates 10% of the core.
* The P in PCORE represents the term "physical". A default value will be set for this config if user does not override
*/
+ @isPositiveNumber(includeZero = true)
public static final String TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT = "topology.component.cpu.pcore.percent";
- public static final Object TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
/**
* A per topology config that specifies the maximum amount of memory a worker can use for that specific topology
*/
+ @isPositiveNumber
public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb";
- public static final Object TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB_SCHEMA = ConfigValidation.PositiveNumberValidator;
/**
* How many executors to spawn for ackers.
@@ -1194,8 +1205,8 @@ public class Config extends HashMap<String, Object> {
* to be equal to the number of workers configured for this topology. If this variable is set to 0,
* then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.</p>
*/
+ @isInteger
public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
- public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How many executors to spawn for event logger.
@@ -1204,8 +1215,8 @@ public class Config extends HashMap<String, Object> {
* to be equal to the number of workers configured for this topology. If this variable is set to 0,
* event logging will be disabled.</p>
*/
+ @isInteger
public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
- public static final Object TOPOLOGY_EVENTLOGGER_EXECUTORS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The maximum amount of time given to the topology to fully process a message
@@ -1213,8 +1224,10 @@ public class Config extends HashMap<String, Object> {
* will fail the message on the spout. Some spouts implementations will then replay
* the message at a later time.
*/
+ @isInteger
+ @isPositiveNumber
+ @NotNull
public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
- public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
/**
* A list of serialization registrations for Kryo ( http://code.google.com/p/kryo/ ),
@@ -1224,8 +1237,8 @@ public class Config extends HashMap<String, Object> {
*
* See Kryo's documentation for more information about writing custom serializers.
*/
+ @isKryoReg
public static final String TOPOLOGY_KRYO_REGISTER = "topology.kryo.register";
- public static final Object TOPOLOGY_KRYO_REGISTER_SCHEMA = ConfigValidation.KryoRegValidator;
/**
* A list of classes that customize storm's kryo instance during start-up.
@@ -1233,17 +1246,16 @@ public class Config extends HashMap<String, Object> {
* listed class is instantiated with 0 arguments, then its 'decorate' method
* is called with storm's kryo instance as the only argument.
*/
+ @isStringList
public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators";
- public static final Object TOPOLOGY_KRYO_DECORATORS_SCHEMA = ConfigValidation.StringsValidator;
/**
* Class that specifies how to create a Kryo instance for serialization. Storm will then apply
* topology.kryo.register and topology.kryo.decorators on top of this. The default implementation
* implements topology.fall.back.on.java.serialization and turns references off.
*/
+ @isString
public static final String TOPOLOGY_KRYO_FACTORY = "topology.kryo.factory";
- public static final Object TOPOLOGY_KRYO_FACTORY_SCHEMA = String.class;
-
/**
* Whether or not Storm should skip the loading of kryo registrations for which it
@@ -1255,36 +1267,36 @@ public class Config extends HashMap<String, Object> {
* By setting this config to true, Storm will ignore that it doesn't have those other serializations
* rather than throw an error.
*/
+ @isBoolean
public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations";
- public static final Object TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS_SCHEMA = Boolean.class;
/**
* A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
* Each listed class will be routed all the metrics data generated by the storm metrics API.
* Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and its parallelism is configurable.
*/
+
+ @isListEntryType(type=Map.class)
public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
- public static final Object TOPOLOGY_METRICS_CONSUMER_REGISTER_SCHEMA = ConfigValidation.MapsValidator;
/**
* A map of metric name to class name implementing IMetric that will be created once per worker JVM
*/
+ @isType(type=Map.class)
public static final String TOPOLOGY_WORKER_METRICS = "topology.worker.metrics";
- public static final Object TOPOLOGY_WORKER_METRICS_SCHEMA = Map.class;
/**
* A map of metric name to class name implementing IMetric that will be created once per worker JVM
*/
+ @isType(type=Map.class)
public static final String WORKER_METRICS = "worker.metrics";
- public static final Object WORKER_METRICS_SCHEMA = Map.class;
/**
* The maximum parallelism allowed for a component in this topology. This configuration is
* typically used in testing to limit the number of threads spawned in local mode.
*/
+ @isInteger
public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
- public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = ConfigValidation.IntegerValidator;
-
/**
* The maximum number of tuples that can be pending on a spout task at any given time.
@@ -1294,8 +1306,8 @@ public class Config extends HashMap<String, Object> {
* Note that this config parameter has no effect for unreliable spouts that don't tag
* their tuples with a message id.
*/
+ @isInteger
public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
- public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
/**
* A class that implements a strategy for what to do when a spout needs to wait. Waiting is
@@ -1304,70 +1316,72 @@ public class Config extends HashMap<String, Object> {
* 1. nextTuple emits no tuples
* 2. The spout has hit maxSpoutPending and can't emit any more tuples
*/
+ @isString
public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
- public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class;
/**
* The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
*/
+ @isInteger
public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
- public static final Object TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The maximum amount of time a component gives a source of state to synchronize before it requests
* synchronization again.
*/
+ @isInteger
+ @isPositiveNumber
+ @NotNull
public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
- public static final Object TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
/**
* The percentage of tuples to sample to produce stats for a task.
*/
+ @isPositiveNumber
public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
- public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA =ConfigValidation.PositiveNumberValidator;
/**
* The time period that builtin metrics data is bucketed into.
*/
+ @isInteger
public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
- public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Whether or not to use Java serialization in a topology.
*/
+ @isBoolean
public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization";
- public static final Object TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION_SCHEMA = Boolean.class;
/**
* Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS.
*/
+ @isStringOrStringList
public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts";
- public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
* Topology-specific GC options for the worker child process. This overrides WORKER_GC_CHILDOPTS.
*/
+ @isStringOrStringList
public static final String TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts";
- public static final Object TOPOLOGY_WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
* Topology-specific options for the logwriter process of a worker.
*/
+ @isStringOrStringList
public static final String TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS="topology.worker.logwriter.childopts";
- public static final Object TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
* Topology-specific classpath for the worker child process. This is combined with the usual classpath.
*/
+ @isStringOrStringList
public static final String TOPOLOGY_CLASSPATH="topology.classpath";
- public static final Object TOPOLOGY_CLASSPATH_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
* Topology-specific environment variables for the worker child process.
* This is added to the existing environment (that of the supervisor)
*/
- public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
- public static final Object TOPOLOGY_ENVIRONMENT_SCHEMA = Map.class;
+ @isType(type=Map.class)
+ public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
/*
* Topology-specific option to disable/enable bolt's outgoing overflow buffer.
@@ -1376,122 +1390,119 @@ public class Config extends HashMap<String, Object> {
* The overflow buffer can fill degrading the performance gradually,
* eventually running out of memory.
*/
+ @isBoolean
public static final String TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE="topology.bolts.outgoing.overflow.buffer.enable";
- public static final Object TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE_SCHEMA = Boolean.class;
/**
* This config is available for TransactionalSpouts, and contains the id (a String) for
* the transactional topology. This id is used to store the state of the transactional
* topology in Zookeeper.
*/
+ @isString
public static final String TOPOLOGY_TRANSACTIONAL_ID="topology.transactional.id";
- public static final Object TOPOLOGY_TRANSACTIONAL_ID_SCHEMA = String.class;
/**
* A list of task hooks that are automatically added to every spout and bolt in the topology. An example
* of when you'd do this is to add a hook that integrates with your internal
* monitoring system. These hooks are instantiated using the zero-arg constructor.
*/
+ @isStringList
public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
- public static final Object TOPOLOGY_AUTO_TASK_HOOKS_SCHEMA = ConfigValidation.StringsValidator;
-
/**
* The size of the Disruptor receive queue for each executor. Must be a power of 2.
*/
+ @isPowerOf2
public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
- public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
/**
* The size of the Disruptor send queue for each executor. Must be a power of 2.
*/
+ @isPowerOf2
public static final String TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE="topology.executor.send.buffer.size";
- public static final Object TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
/**
* The size of the Disruptor transfer queue for each worker.
*/
+ @isInteger
public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
- public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
- /**
- * How often a tick tuple from the "__system" component and "__tick" stream should be sent
- * to tasks. Meant to be used as a component-specific configuration.
- */
+ /**
+ * How often a tick tuple from the "__system" component and "__tick" stream should be sent
+ * to tasks. Meant to be used as a component-specific configuration.
+ */
+ @isInteger
public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
- public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
-
- /**
- * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency
- * vs. throughput
- */
+ /**
+ * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency
+ * vs. throughput
+ */
+ @isString
public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy";
- public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class;
- /**
- * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
- * via the TopologyContext.
- */
+ /**
+ * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
+ * via the TopologyContext.
+ */
+ @isInteger
public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
- public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The interval in seconds to use for determining whether to throttle errors reported to Zookeeper. For example,
* an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
* reported to Zookeeper per task for every 10 second interval of time.
*/
+ @isInteger
public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS="topology.error.throttle.interval.secs";
- public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
*/
+ @isInteger
public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";
- public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator;
-
/**
* How often a batch can be emitted in a Trident topology.
*/
+ @isInteger
public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
- public static final Object TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Name of the topology. This config is automatically set by Storm when the topology is submitted.
*/
+ @isString
public final static String TOPOLOGY_NAME="topology.name";
- public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
/**
* The principal who submitted a topology
*/
+ @isString
public final static String TOPOLOGY_SUBMITTER_PRINCIPAL = "topology.submitter.principal";
- public static final Object TOPOLOGY_SUBMITTER_PRINCIPAL_SCHEMA = String.class;
/**
* The local user name of the user who submitted a topology.
*/
+ @isString
public static final String TOPOLOGY_SUBMITTER_USER = "topology.submitter.user";
- public static final Object TOPOLOGY_SUBMITTER_USER_SCHEMA = String.class;
/**
* Array of components that scheduler should try to place on separate hosts.
*/
+ @isStringList
public static final String TOPOLOGY_SPREAD_COMPONENTS = "topology.spread.components";
- public static final Object TOPOLOGY_SPREAD_COMPONENTS_SCHEMA = ConfigValidation.StringsValidator;
/**
* A list of IAutoCredentials that the topology should load and use.
*/
+ @isStringList
public static final String TOPOLOGY_AUTO_CREDENTIALS = "topology.auto-credentials";
- public static final Object TOPOLOGY_AUTO_CREDENTIALS_SCHEMA = ConfigValidation.StringsValidator;
/**
* Max pending tuples in one ShellBolt
*/
+ @isInteger
public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
- public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Topology central logging sensitivity to determine who has access to logs in central logging system.
@@ -1501,55 +1512,55 @@ public class Config extends HashMap<String, Object> {
* S2 - Confidential
* S3 - Secret (default.)
*/
+ @isString
public static final String TOPOLOGY_LOGGING_SENSITIVITY="topology.logging.sensitivity";
- public static final Object TOPOLOGY_LOGGING_SENSITIVITY_SCHEMA = String.class;
/**
* The root directory in ZooKeeper for metadata about TransactionalSpouts.
*/
+ @isString
public static final String TRANSACTIONAL_ZOOKEEPER_ROOT="transactional.zookeeper.root";
- public static final Object TRANSACTIONAL_ZOOKEEPER_ROOT_SCHEMA = String.class;
/**
* The list of zookeeper servers in which to keep the transactional state. If null (which is default),
* will use storm.zookeeper.servers
*/
+ @isStringList
public static final String TRANSACTIONAL_ZOOKEEPER_SERVERS="transactional.zookeeper.servers";
- public static final Object TRANSACTIONAL_ZOOKEEPER_SERVERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* The port to use to connect to the transactional zookeeper servers. If null (which is default),
* will use storm.zookeeper.port
*/
+ @isInteger
public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
- public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The user as which the nimbus client should be acquired to perform the operation.
*/
+ @isString
public static final String STORM_DO_AS_USER="storm.doAsUser";
- public static final Object STORM_DO_AS_USER_SCHEMA = String.class;
/**
* The number of threads that should be used by the zeromq context in each worker process.
*/
+ @isInteger
public static final String ZMQ_THREADS = "zmq.threads";
- public static final Object ZMQ_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* How long a connection should retry sending messages to a target host when
* the connection is closed. This is an advanced configuration and can almost
* certainly be ignored.
*/
+ @isInteger
public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis";
- public static final Object ZMQ_LINGER_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* The high-water mark for the ZeroMQ push sockets used for networking. Use this config to prevent buffer explosion
* on the networking layer.
*/
+ @isInteger
public static final String ZMQ_HWM = "zmq.hwm";
- public static final Object ZMQ_HWM_SCHEMA = ConfigValidation.IntegerValidator;
/**
* This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)
@@ -1557,59 +1568,61 @@ public class Config extends HashMap<String, Object> {
* to look for native libraries. It is necessary to set this config correctly since
* Storm uses the ZeroMQ and JZMQ native libs.
*/
+ @isString
public static final String JAVA_LIBRARY_PATH = "java.library.path";
- public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;
/**
* The path to use as the zookeeper dir when running a zookeeper server via
* "storm dev-zookeeper". This zookeeper instance is only intended for development;
* it is not a production grade zookeeper setup.
*/
+ @isString
public static final String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
- public static final Object DEV_ZOOKEEPER_PATH_SCHEMA = String.class;
/**
* A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler
* to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
*/
+ @isMapEntryType(keyType = String.class, valueType = Number.class)
public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
- public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
/**
* A map from the user name to the number of machines that user is allowed to use. Set storm.scheduler
* to backtype.storm.scheduler.multitenant.MultitenantScheduler
*/
+ @isMapEntryType(keyType = String.class, valueType = Number.class)
public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools";
- public static final Object MULTITENANT_SCHEDULER_USER_POOLS_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
/**
* The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
* to backtype.storm.scheduler.multitenant.MultitenantScheduler
*/
+ @isNumber
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
- public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
/**
* Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
* vs. CPU usage
*/
+ @isInteger
+ @isPositiveNumber
+ @NotNull
public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
- public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
/**
* Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
* distribution.
*/
+ @isString
public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";
- public static final Object STORM_CODE_DISTRIBUTOR_CLASS_SCHEMA = String.class;
/**
* Minimum number of nimbus hosts where the code must be replicated before leader nimbus
* is allowed to perform topology activation tasks like setting up heartbeats/assignments
* and marking the topology as active. default is 0.
*/
+ @isNumber
public static final String TOPOLOGY_MIN_REPLICATION_COUNT = "topology.min.replication.count";
- public static final Object TOPOLOGY_MIN_REPLICATION_COUNT_SCHEMA = Number.class;
/**
* Maximum wait time for the nimbus host replication to achieve the nimbus.min.replication.count.
@@ -1617,14 +1630,14 @@ public class Config extends HashMap<String, Object> {
* if required nimbus.min.replication.count is not achieved. The default is 0 seconds, a value of
* -1 indicates to wait forever.
*/
+ @isNumber
public static final String TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC = "topology.max.replication.wait.time.sec";
- public static final Object TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC_SCHEMA = Number.class;
/**
* How often nimbus's background thread to sync code for missing topologies should run.
*/
+ @isInteger
public static final String NIMBUS_CODE_SYNC_FREQ_SECS = "nimbus.code.sync.freq.secs";
- public static final Object NIMBUS_CODE_SYNC_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
public static void setClasspath(Map conf, String cp) {
conf.put(Config.TOPOLOGY_CLASSPATH, cp);
@@ -1714,7 +1727,7 @@ public class Config extends HashMap<String, Object> {
}
public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
- registerMetricsConsumer(this, klass, argument, parallelismHint);
+ registerMetricsConsumer(this, klass, argument, parallelismHint);
}
public static void registerMetricsConsumer(Map conf, Class klass, long parallelismHint) {
@@ -1754,7 +1767,7 @@ public class Config extends HashMap<String, Object> {
}
public void setSkipMissingKryoRegistrations(boolean skip) {
- setSkipMissingKryoRegistrations(this, skip);
+ setSkipMissingKryoRegistrations(this, skip);
}
public static void setMaxTaskParallelism(Map conf, int max) {