You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/15 16:21:00 UTC

[1/4] git commit: STORM-328 More restrictive to Config, strict range check within Utils.getInt

Repository: incubator-storm
Updated Branches:
  refs/heads/master 9e8c769a6 -> e5226dbc5


STORM-328 More restrictive to Config, strict range check within Utils.getInt


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/214ee745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/214ee745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/214ee745

Branch: refs/heads/master
Commit: 214ee7454548b884c591991b1faea770d1478cec
Parents: 18a0721
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jul 12 00:59:01 2014 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jul 12 00:59:01 2014 +0900

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java   | 121 +++++++++----------
 .../jvm/backtype/storm/ConfigValidation.java    |  70 +++++++++++
 .../src/jvm/backtype/storm/utils/Utils.java     |  23 ++--
 .../test/clj/backtype/storm/config_test.clj     |  36 +++++-
 4 files changed, 175 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/214ee745/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index eefaee7..ac8b6b6 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm;
 
-import backtype.storm.ConfigValidation;
 import backtype.storm.serialization.IKryoDecorator;
 import backtype.storm.serialization.IKryoFactory;
 import com.esotericsoftware.kryo.Serializer;
@@ -53,49 +52,49 @@ public class Config extends HashMap<String, Object> {
      * Netty based messaging: The buffer size for send/recv buffer
      */
     public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size"; 
-    public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
      */
     public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries"; 
-    public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The min # of milliseconds that a peer will wait. 
      */
     public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms"; 
-    public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The max # of milliseconds that a peer will wait. 
      */
     public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms"; 
-    public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The # of worker threads for the server.
      */
     public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads"; 
-    public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The # of worker threads for the client.
      */
     public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads"; 
-    public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
     
     /**
      * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
      */
     public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";
-    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
+    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * We check with this interval that whether the Netty channel is writable and try to write pending messages
      */
     public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
-    public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
+    public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
     
     
     /**
@@ -108,7 +107,7 @@ public class Config extends HashMap<String, Object> {
      * The port Storm will use to connect to each of the ZooKeeper servers.
      */
     public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
-    public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A directory on the local filesystem used by Storm for any local
@@ -177,31 +176,31 @@ public class Config extends HashMap<String, Object> {
      * The session timeout for clients to ZooKeeper.
      */
     public static final String STORM_ZOOKEEPER_SESSION_TIMEOUT = "storm.zookeeper.session.timeout";
-    public static final Object STORM_ZOOKEEPER_SESSION_TIMEOUT_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_SESSION_TIMEOUT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The connection timeout for clients to ZooKeeper.
      */
     public static final String STORM_ZOOKEEPER_CONNECTION_TIMEOUT = "storm.zookeeper.connection.timeout";
-    public static final Object STORM_ZOOKEEPER_CONNECTION_TIMEOUT_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_CONNECTION_TIMEOUT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The number of times to retry a Zookeeper operation.
      */
     public static final String STORM_ZOOKEEPER_RETRY_TIMES="storm.zookeeper.retry.times";
-    public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The interval between retries of a Zookeeper operation.
      */
     public static final String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";
-    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The ceiling of the interval between retries of a Zookeeper operation.
      */
     public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";
-    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
@@ -232,13 +231,13 @@ public class Config extends HashMap<String, Object> {
      * connect to this port to upload jars and submit topologies.
      */
     public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
-    public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class;
+    public static final Object NIMBUS_THRIFT_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The maximum buffer size thrift should use when reading messages.
      */
     public static final String NIMBUS_THRIFT_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
-    public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = Number.class;
+    public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -254,7 +253,7 @@ public class Config extends HashMap<String, Object> {
      * task dead and reassign it to another location.
      */
     public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
-    public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -264,14 +263,14 @@ public class Config extends HashMap<String, Object> {
      * occuring.
      */
     public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
-    public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How often nimbus should wake the cleanup thread to clean the inbox.
      * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
      */
     public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
-    public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
@@ -283,14 +282,14 @@ public class Config extends HashMap<String, Object> {
      * @see NIMBUS_CLEANUP_FREQ_SECS
      */
     public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
-    public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How long before a supervisor can go without heartbeating before nimbus considers it dead
      * and stops assigning new work to it.
      */
     public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs";
-    public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A special timeout used when a task is initially launched. During launch, this is the timeout
@@ -300,7 +299,7 @@ public class Config extends HashMap<String, Object> {
      * to launching new JVM's and configuring them.</p>
      */
     public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
-    public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Whether or not nimbus should reassign tasks if it detects that a task goes down. 
@@ -314,7 +313,7 @@ public class Config extends HashMap<String, Object> {
      * before nimbus considers it dead and drops the connection.
      */
     public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = "nimbus.file.copy.expiration.secs";
-    public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A custom class that implements ITopologyValidator that is run whenever a
@@ -334,13 +333,13 @@ public class Config extends HashMap<String, Object> {
      * Storm UI binds to this port.
      */
     public static final String UI_PORT = "ui.port";
-    public static final Object UI_PORT_SCHEMA = Number.class;
+    public static final Object UI_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * HTTP UI port for log viewer
      */
     public static final String LOGVIEWER_PORT = "logviewer.port";
-    public static final Object LOGVIEWER_PORT_SCHEMA = Number.class;
+    public static final Object LOGVIEWER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Childopts for log viewer java process.
@@ -370,25 +369,25 @@ public class Config extends HashMap<String, Object> {
      * This port is used by Storm DRPC for receiving DPRC requests from clients.
      */
     public static final String DRPC_PORT = "drpc.port";
-    public static final Object DRPC_PORT_SCHEMA = Number.class;
+    public static final Object DRPC_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * DRPC thrift server worker threads 
      */
     public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
-    public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
+    public static final Object DRPC_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * DRPC thrift server queue size 
      */
     public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
-    public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
+    public static final Object DRPC_QUEUE_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back. 
      */
     public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
-    public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
+    public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also
@@ -396,7 +395,7 @@ public class Config extends HashMap<String, Object> {
      * timeout for the topology implementing the DRPC function.
      */
     public static final String DRPC_REQUEST_TIMEOUT_SECS  = "drpc.request.timeout.secs";
-    public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Childopts for Storm DRPC Java process.
@@ -415,7 +414,7 @@ public class Config extends HashMap<String, Object> {
      * how many workers run on each machine.
      */
     public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
-    public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
+    public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.IntegersValidator;
 
 
     /**
@@ -431,7 +430,7 @@ public class Config extends HashMap<String, Object> {
      * restart the worker process.
      */
     public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
-    public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -441,7 +440,7 @@ public class Config extends HashMap<String, Object> {
      * overhead to starting and configuring the JVM on launch.
      */
     public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs";
-    public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -457,7 +456,7 @@ public class Config extends HashMap<String, Object> {
      * how often the supervisor sends a heartbeat to the master.
      */
     public static final String SUPERVISOR_HEARTBEAT_FREQUENCY_SECS = "supervisor.heartbeat.frequency.secs";
-    public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
+    public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -465,7 +464,7 @@ public class Config extends HashMap<String, Object> {
      * need to be restarted.
      */
     public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
-    public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
+    public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
@@ -478,19 +477,19 @@ public class Config extends HashMap<String, Object> {
      * control how many worker receiver threads we need per worker
      */
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
-    public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
+    public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = ConfigValidation.IntegerValidator;
     
     /**
      * How often this worker should heartbeat to the supervisor.
      */
     public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
-    public static final Object WORKER_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
+    public static final Object WORKER_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How often a task should heartbeat its status to the master.
      */
     public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
-    public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
+    public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -501,7 +500,7 @@ public class Config extends HashMap<String, Object> {
      * come through.
      */
     public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
-    public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = Number.class;
+    public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
 
@@ -532,7 +531,7 @@ public class Config extends HashMap<String, Object> {
      * on each component in the topology to tune the performance of a topology.
      */
     public static final String TOPOLOGY_WORKERS = "topology.workers";
-    public static final Object TOPOLOGY_WORKERS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_WORKERS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How many instances to create for a spout/bolt. A task runs on a thread with zero or more
@@ -543,7 +542,7 @@ public class Config extends HashMap<String, Object> {
      * guaranteeing that the same value goes to the same task).
      */
     public static final String TOPOLOGY_TASKS = "topology.tasks";
-    public static final Object TOPOLOGY_TASKS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How many executors to spawn for ackers.
@@ -552,7 +551,7 @@ public class Config extends HashMap<String, Object> {
      * as they come off the spout, effectively disabling reliability.</p>
      */
     public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
-    public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -562,7 +561,7 @@ public class Config extends HashMap<String, Object> {
      * the message at a later time.
      */
     public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
-    public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A list of serialization registrations for Kryo ( http://code.google.com/p/kryo/ ),
@@ -620,7 +619,7 @@ public class Config extends HashMap<String, Object> {
      * typically used in testing to limit the number of threads spawned in local mode.
      */
     public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
-    public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -632,7 +631,7 @@ public class Config extends HashMap<String, Object> {
      * their tuples with a message id.
      */
     public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
-    public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
@@ -648,26 +647,26 @@ public class Config extends HashMap<String, Object> {
      * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
      */
     public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
-    public static final Object TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The maximum amount of time a component gives a source of state to synchronize before it requests
      * synchronization again.
      */
     public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
-    public static final Object TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The percentage of tuples to sample to produce stats for a task.
      */
     public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
-    public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = ConfigValidation.DoubleValidator;
 
     /**
      * The time period that builtin metrics data in bucketed into. 
      */
     public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
-    public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Whether or not to use Java serialization in a topology.
@@ -734,14 +733,14 @@ public class Config extends HashMap<String, Object> {
      * The size of the Disruptor transfer queue for each worker.
      */
     public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
-    public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
    /**
     * How often a tick tuple from the "__system" component and "__tick" stream should be sent
     * to tasks. Meant to be used as a component-specific configuration.
     */
     public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
-    public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
    /**
@@ -756,7 +755,7 @@ public class Config extends HashMap<String, Object> {
     * via the TopologyContext.
     */
     public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
-    public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example, 
@@ -764,20 +763,20 @@ public class Config extends HashMap<String, Object> {
      * reported to Zookeeper per task for every 10 second interval of time.
      */
     public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS="topology.error.throttle.interval.secs";
-    public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
      */
     public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";
-    public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
      * How often a batch can be emitted in a Trident topology.
      */
     public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
-    public static final Object TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Name of the topology. This config is automatically set by Storm when the topology is submitted.
@@ -789,7 +788,7 @@ public class Config extends HashMap<String, Object> {
      * Max pending tuples in one ShellBolt
      */
     public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
-    public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The root directory in ZooKeeper for metadata about TransactionalSpouts.
@@ -809,13 +808,13 @@ public class Config extends HashMap<String, Object> {
      * will use storm.zookeeper.port
      */
     public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
-    public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = Number.class;
+    public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The number of threads that should be used by the zeromq context in each worker process.
      */
     public static final String ZMQ_THREADS = "zmq.threads";
-    public static final Object ZMQ_THREADS_SCHEMA = Number.class;
+    public static final Object ZMQ_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How long a connection should retry sending messages to a target host when
@@ -823,14 +822,14 @@ public class Config extends HashMap<String, Object> {
      * certainly be ignored.
      */
     public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis";
-    public static final Object ZMQ_LINGER_MILLIS_SCHEMA = Number.class;
+    public static final Object ZMQ_LINGER_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The high water for the ZeroMQ push sockets used for networking. Use this config to prevent buffer explosion
      * on the networking layer.
      */
     public static final String ZMQ_HWM = "zmq.hwm";
-    public static final Object ZMQ_HWM_SCHEMA = Number.class;
+    public static final Object ZMQ_HWM_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/214ee745/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index 3accb82..156ccf8 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -82,6 +82,76 @@ public class ConfigValidation {
     public static Object MapsValidator = FieldListValidatorFactory(Map.class);
 
     /**
+     * Validates an Integer: accepts null, or a Number with no fractional part that fits in the int range.
+     */
+    public static Object IntegerValidator = new FieldValidator() {
+        @Override
+        public void validateField(String name, Object o) throws IllegalArgumentException {
+            if (o == null) {
+                // A null value is acceptable.
+                return;
+            }
+            final long i;
+            if (o instanceof Number &&
+                    (i = ((Number)o).longValue()) == ((Number)o).doubleValue()) { // equal only if there is no fractional part
+                if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) { // strict int range check
+                    return;
+                }
+            }
+
+            throw new IllegalArgumentException("Field " + name + " must be an Integer within type range.");
+        }
+    };
+
+    /**
+     * Validates a list of Integers: null, or an Iterable whose elements are all integral Numbers in the int range.
+     */
+    public static Object IntegersValidator = new FieldValidator() {
+        @Override
+        public void validateField(String name, Object field)
+                throws IllegalArgumentException {
+            if (field == null) {
+                // A null value is acceptable.
+                return;
+            }
+            if (!(field instanceof Iterable)) { // reject scalars/strings instead of silently accepting them
+                throw new IllegalArgumentException("Field " + name + " must be an Iterable of Integers.");
+            }
+            for (Object o : (Iterable)field) {
+                final long i;
+                if (o instanceof Number &&
+                        ((i = ((Number)o).longValue()) == ((Number)o).doubleValue()) && // no fractional part
+                        (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE)) {
+                    // pass the test
+                } else {
+                    throw new IllegalArgumentException(
+                            "Each element of the list " + name + " must be an Integer within type range.");
+                }
+            }
+        }
+    };
+
+    /**
+     * Validates a Double: accepts null or any Number instance; every other type is rejected.
+     */
+    public static Object DoubleValidator = new FieldValidator() {
+        @Override
+        public void validateField(String name, Object o) throws IllegalArgumentException {
+            if (o == null) {
+                // A null value is acceptable.
+                return;
+            }
+
+            // Lenient on purpose: any Number passes, since int/long can be converted to double (possibly losing precision).
+            if (o instanceof Number) {
+                return;
+            }
+
+            throw new IllegalArgumentException("Field " + name + " must be an Double.");
+        }
+    };
+
+    /**
      * Validates a power of 2.
      */
     public static Object PowerOf2Validator = new FieldValidator() {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/214ee745/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 6a0a447..364b53f 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -305,7 +305,7 @@ public class Utils {
     public static Integer getInt(Object o) {
       Integer result = getInt(o, null);
       if (null == result) {
-        throw new IllegalArgumentException("Don't know how to convert null + to int");
+        throw new IllegalArgumentException("Don't know how to convert null to int");
       }
       return result;
     }
@@ -314,16 +314,19 @@ public class Utils {
       if (null == o) {
         return defaultValue;
       }
-      
-      if(o instanceof Long) {
-          return ((Long) o ).intValue();
-      } else if (o instanceof Integer) {
-          return (Integer) o;
-      } else if (o instanceof Short) {
-          return ((Short) o).intValue();
-      } else {
-          throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
+
+      if (o instanceof Integer ||
+          o instanceof Short ||
+          o instanceof Byte) {
+          return ((Number) o).intValue();
+      } else if (o instanceof Long) {
+          final long l = (Long) o;
+          if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
+              return (int) l;
+          }
       }
+
+      throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
     }
 
     public static boolean getBoolean(Object o, boolean defaultValue) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/214ee745/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/config_test.clj b/storm-core/test/clj/backtype/storm/config_test.clj
index 01f788b..50970b6 100644
--- a/storm-core/test/clj/backtype/storm/config_test.clj
+++ b/storm-core/test/clj/backtype/storm/config_test.clj
@@ -65,13 +65,41 @@
                 (.validateField validator "test" x)
                 (catch Exception e e)))))))
 
-(deftest test-topology-workers-is-number
+(deftest test-integer-validator
+   (let [validator ConfigValidation/IntegerValidator]
+        (.validateField validator "test" 1000)
+        (is (thrown-cause? java.lang.IllegalArgumentException
+           (.validateField validator "test" 1.34)))
+        (is (thrown-cause? java.lang.IllegalArgumentException
+           (.validateField validator "test" 9223372036854775807)))))
+
+(deftest test-integers-validator
+  (let [validator ConfigValidation/IntegersValidator]
+    (.validateField validator "test" [1000 0 -1000])
+    (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" [0 10 1.34])))
+    (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" [-100 9223372036854775807])))))
+
+(deftest test-double-validator
+  (let [validator ConfigValidation/DoubleValidator]
+    (.validateField validator "test" 10)
+    ;; we can provide lenient way to convert int/long to double with losing precision
+    (.validateField validator "test" 2147483647)
+    (.validateField validator "test" 9223372036854775807)
+    (.validateField validator "test" 1.7976931348623157E308)))
+
+(deftest test-topology-workers-is-integer
   (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-WORKERS)]
     (.validateField validator "test" 42)
-    ;; The float can be rounded down to an int.
-    (.validateField validator "test" 3.14159)
     (is (thrown-cause? java.lang.IllegalArgumentException
-      (.validateField validator "test" "42")))))
+      (.validateField validator "test" 3.14159)))))
+
+(deftest test-topology-stats-sample-rate-is-float
+  (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-STATS-SAMPLE-RATE)]
+    (.validateField validator "test" 0.5)
+    (.validateField validator "test" 10)
+    (.validateField validator "test" 1.7976931348623157E308)))
 
 (deftest test-isolation-scheduler-machines-is-map
   (let [validator (CONFIG-SCHEMA-MAP ISOLATION-SCHEDULER-MACHINES)]


[4/4] git commit: Added STORM-328 to changelog and readme

Posted by bo...@apache.org.
Added STORM-328 to changelog and readme


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e5226dbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e5226dbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e5226dbc

Branch: refs/heads/master
Commit: e5226dbc535d65ff2c3f8d8bca43b36a0a3c5b77
Parents: ad6bbee
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 15 09:16:05 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 15 09:16:05 2014 -0500

----------------------------------------------------------------------
 CHANGELOG.md    | 1 +
 README.markdown | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e5226dbc/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index da2e6fa..6a77bdf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
  * STORM-200: Proposal for Multilang's Metrics feature
  * STORM-351: multilang python process fall into endless loop
  * STORM-375: Smarter downloading of assignments by supervisors and workers
+ * STORM-328: More restrictive Config checks, strict range check within Utils.getInt()
 
 ## 0.9.2-incubating
  * STORM-66: send taskid on initial handshake

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e5226dbc/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index f026e6d..25cc9ae 100644
--- a/README.markdown
+++ b/README.markdown
@@ -149,6 +149,7 @@ under the License.
 * Parth-Brahmbhatt ([@Parth-Brahmbhatt](https://github.com/Parth-Brahmbhatt))
 * Adrian Petrescu ([@apetresc](https://github.com/apetresc))
 * DashengJu ([@dashengju](https://github.com/dashengju))
+* Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))
 
 ## Acknowledgements
 


[2/4] git commit: Replace magic numbers with named constants so the tests are easier to understand

Posted by bo...@apache.org.
Replace magic numbers with named constants so the tests are easier to understand


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/cc5f1f63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/cc5f1f63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/cc5f1f63

Branch: refs/heads/master
Commit: cc5f1f632a0d4b81ebbf57668ce025a7780b7a0f
Parents: 214ee74
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jul 12 09:23:44 2014 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jul 12 09:23:44 2014 +0900

----------------------------------------------------------------------
 .../test/clj/backtype/storm/config_test.clj     | 27 ++++++++++++--------
 1 file changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/cc5f1f63/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/config_test.clj b/storm-core/test/clj/backtype/storm/config_test.clj
index 50970b6..3fdbf5a 100644
--- a/storm-core/test/clj/backtype/storm/config_test.clj
+++ b/storm-core/test/clj/backtype/storm/config_test.clj
@@ -66,28 +66,33 @@
                 (catch Exception e e)))))))
 
 (deftest test-integer-validator
-   (let [validator ConfigValidation/IntegerValidator]
-        (.validateField validator "test" 1000)
-        (is (thrown-cause? java.lang.IllegalArgumentException
-           (.validateField validator "test" 1.34)))
-        (is (thrown-cause? java.lang.IllegalArgumentException
-           (.validateField validator "test" 9223372036854775807)))))
+  (let [validator ConfigValidation/IntegerValidator]
+    (.validateField validator "test" nil)
+    (.validateField validator "test" 1000)
+    (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" 1.34)))
+    (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" (inc Integer/MAX_VALUE))))))
 
 (deftest test-integers-validator
   (let [validator ConfigValidation/IntegersValidator]
+    (.validateField validator "test" nil)
     (.validateField validator "test" [1000 0 -1000])
     (is (thrown-cause? java.lang.IllegalArgumentException
           (.validateField validator "test" [0 10 1.34])))
     (is (thrown-cause? java.lang.IllegalArgumentException
-          (.validateField validator "test" [-100 9223372036854775807])))))
+          (.validateField validator "test" [0 nil])))
+    (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" [-100 (inc Integer/MAX_VALUE)])))))
 
 (deftest test-double-validator
   (let [validator ConfigValidation/DoubleValidator]
+    (.validateField validator "test" nil)
     (.validateField validator "test" 10)
     ;; we can provide lenient way to convert int/long to double with losing precision
-    (.validateField validator "test" 2147483647)
-    (.validateField validator "test" 9223372036854775807)
-    (.validateField validator "test" 1.7976931348623157E308)))
+    (.validateField validator "test" Integer/MAX_VALUE)
+    (.validateField validator "test" (inc Integer/MAX_VALUE))
+    (.validateField validator "test" Double/MAX_VALUE)))
 
 (deftest test-topology-workers-is-integer
   (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-WORKERS)]
@@ -99,7 +104,7 @@
   (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-STATS-SAMPLE-RATE)]
     (.validateField validator "test" 0.5)
     (.validateField validator "test" 10)
-    (.validateField validator "test" 1.7976931348623157E308)))
+    (.validateField validator "test" Double/MAX_VALUE)))
 
 (deftest test-isolation-scheduler-machines-is-map
   (let [validator (CONFIG-SCHEMA-MAP ISOLATION-SCHEDULER-MACHINES)]


[3/4] git commit: Merge branch 'STORM-328' of https://github.com/HeartSaVioR/incubator-storm into STORM-328

Posted by bo...@apache.org.
Merge branch 'STORM-328' of https://github.com/HeartSaVioR/incubator-storm into STORM-328

STORM-328: More restrictive Config checks, strict range check within Utils.getInt()


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ad6bbee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ad6bbee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ad6bbee9

Branch: refs/heads/master
Commit: ad6bbee99c5d37d4e6e3dd3a119b14c7031179da
Parents: 9e8c769 cc5f1f6
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 15 09:12:43 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 15 09:12:43 2014 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java   | 121 +++++++++----------
 .../jvm/backtype/storm/ConfigValidation.java    |  70 +++++++++++
 .../src/jvm/backtype/storm/utils/Utils.java     |  23 ++--
 .../test/clj/backtype/storm/config_test.clj     |  41 ++++++-
 4 files changed, 180 insertions(+), 75 deletions(-)
----------------------------------------------------------------------