Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 22:58:49 UTC

[50/50] git commit: Merge branch 'master' into security-upmerge

Merge branch 'master' into security-upmerge

Conflicts:
	bin/storm
	conf/defaults.yaml
	storm-core/src/clj/backtype/storm/daemon/drpc.clj
	storm-core/src/clj/backtype/storm/daemon/logviewer.clj
	storm-core/src/clj/backtype/storm/daemon/nimbus.clj
	storm-core/src/clj/backtype/storm/daemon/supervisor.clj
	storm-core/src/clj/backtype/storm/daemon/worker.clj
	storm-core/src/clj/backtype/storm/testing.clj
	storm-core/src/clj/backtype/storm/ui/helpers.clj
	storm-core/src/clj/backtype/storm/util.clj
	storm-core/src/jvm/backtype/storm/Config.java
	storm-core/src/jvm/backtype/storm/utils/Utils.java
	storm-core/src/ui/public/component.html
	storm-core/test/clj/backtype/storm/supervisor_test.clj


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/559c883d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/559c883d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/559c883d

Branch: refs/heads/security
Commit: 559c883d5331362808b7e1ada647cbac76a88ab3
Parents: ff8336b b2a8a77
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 29 15:57:03 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 29 15:57:03 2014 -0500

----------------------------------------------------------------------
 BYLAWS.md                                       |  96 ++++
 CHANGELOG.md                                    |  31 +
 LICENSE                                         |  30 +-
 README.markdown                                 |  14 +
 STORM-UI-REST-API.md                            | 567 +++++++++++++++++++
 bin/storm                                       |  77 ++-
 conf/defaults.yaml                              |   5 +-
 conf/storm_env.ini                              |   2 +-
 dev-tools/github/__init__.py                    | 109 ++++
 dev-tools/jira-github-join.py                   |  80 +++
 dev-tools/jira/__init__.py                      | 232 ++++++++
 examples/storm-starter/README.markdown          |  30 +-
 .../storm-starter/multilang/resources/storm.py  |   2 +-
 .../src/jvm/storm/starter/RollingTopWords.java  |  62 +-
 .../src/jvm/storm/starter/util/StormRunner.java |   9 +
 .../storm-kafka/src/jvm/storm/kafka/Broker.java |   9 +-
 .../src/jvm/storm/kafka/KafkaConfig.java        |   2 +-
 .../src/jvm/storm/kafka/Partition.java          |   9 +-
 pom.xml                                         |   6 +-
 .../src/clj/backtype/storm/LocalCluster.clj     |   7 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |  31 +-
 .../src/clj/backtype/storm/command/monitor.clj  |  37 ++
 .../src/clj/backtype/storm/daemon/common.clj    |   2 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |   6 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   4 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   7 +-
 .../clj/backtype/storm/daemon/supervisor.clj    | 104 +++-
 .../src/clj/backtype/storm/daemon/worker.clj    |  72 +--
 storm-core/src/clj/backtype/storm/disruptor.clj |   2 +-
 storm-core/src/clj/backtype/storm/event.clj     |   2 +-
 storm-core/src/clj/backtype/storm/testing.clj   |  49 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   2 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |   5 -
 storm-core/src/clj/backtype/storm/util.clj      |  52 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |  25 +
 .../src/dev/resources/tester_bolt_metrics.py    |  35 ++
 .../src/dev/resources/tester_spout_metrics.py   |  51 ++
 storm-core/src/jvm/backtype/storm/Config.java   | 171 +++---
 .../jvm/backtype/storm/ConfigValidation.java    |  70 +++
 .../backtype/storm/messaging/netty/Client.java  |  13 +-
 .../metric/api/rpc/AssignableShellMetric.java   |  30 +
 .../metric/api/rpc/CombinedShellMetric.java     |  31 +
 .../storm/metric/api/rpc/CountShellMetric.java  |  38 ++
 .../storm/metric/api/rpc/IShellMetric.java      |  31 +
 .../metric/api/rpc/ReducedShellMetric.java      |  32 ++
 .../storm/multilang/JsonSerializer.java         |  15 +
 .../jvm/backtype/storm/multilang/ShellMsg.java  |  46 ++
 .../src/jvm/backtype/storm/spout/ISpout.java    |   2 +-
 .../jvm/backtype/storm/spout/ShellSpout.java    |  72 ++-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  75 ++-
 .../backtype/storm/task/TopologyContext.java    |  28 +
 .../storm/testing/PythonShellMetricsBolt.java   |  32 ++
 .../storm/testing/PythonShellMetricsSpout.java  |  35 ++
 .../src/jvm/backtype/storm/utils/Monitor.java   | 249 ++++++++
 .../jvm/backtype/storm/utils/ShellProcess.java  |  46 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  23 +-
 storm-core/src/multilang/py/storm.py            |  30 +-
 storm-core/src/multilang/rb/storm.rb            |  24 +-
 storm-core/src/ui/public/component.html         |   3 +-
 .../src/ui/public/js/jquery.tablesorter.min.js  |   9 +-
 storm-core/src/ui/public/js/moment.min.js       |   6 +
 storm-core/src/ui/public/js/script.js           |   9 +
 .../test/clj/backtype/storm/cluster_test.clj    |   3 +-
 .../test/clj/backtype/storm/config_test.clj     |  41 +-
 .../test/clj/backtype/storm/metrics_test.clj    | 206 ++++---
 .../test/clj/backtype/storm/supervisor_test.clj | 135 ++++-
 66 files changed, 3003 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/bin/storm
----------------------------------------------------------------------
diff --cc bin/storm
index 550e01d,a4aadb1..1b5be36
--- a/bin/storm
+++ b/bin/storm
@@@ -449,8 -468,7 +480,8 @@@ COMMANDS = {"jar": jar, "kill": kill, "
              "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
              "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
              "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
-             "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, 
 -            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor}
++            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor,
 +            "upload-credentials": upload_credentials}
  
  def parse_config(config_list):
      global CONFIG_OPTS

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index d4283a4,2864adc..05948e1
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -188,6 -150,7 +189,8 @@@ topology.max.error.report.per.interval
  topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
  topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
  topology.trident.batch.emit.interval.millis: 500
 +topology.testing.always.try.serialize: false
+ topology.classpath: null
+ topology.environment: null
  
  dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
index f916ec6,f3017ce..eb25a86
--- a/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
+++ b/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
@@@ -19,6 -19,9 +19,10 @@@ package storm.starter.util
  
  import backtype.storm.Config;
  import backtype.storm.LocalCluster;
+ import backtype.storm.StormSubmitter;
+ import backtype.storm.generated.AlreadyAliveException;
++import backtype.storm.generated.AuthorizationException;
+ import backtype.storm.generated.InvalidTopologyException;
  import backtype.storm.generated.StormTopology;
  
  public final class StormRunner {
@@@ -36,4 -39,9 +40,9 @@@
      cluster.killTopology(topologyName);
      cluster.shutdown();
    }
+ 
+   public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf)
 -      throws AlreadyAliveException, InvalidTopologyException {
++      throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+     StormSubmitter.submitTopology(topologyName, conf, topology);
+   }
  }
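
Note: the hunk above adds a remote-submission helper alongside the existing local runner and widens its throws clause for the new AuthorizationException. A minimal sketch of how a topology main class might call it is below; the class name, topology wiring, and argument handling are hypothetical, and runTopologyLocally is the pre-existing helper in this same file.

    import backtype.storm.Config;
    import backtype.storm.topology.TopologyBuilder;
    import storm.starter.util.StormRunner;

    public class ExampleTopologyMain {
        public static void main(String[] args) throws Exception {
            // Hypothetical topology; spouts and bolts would be wired up on the builder here.
            TopologyBuilder builder = new TopologyBuilder();

            Config conf = new Config();
            conf.setNumWorkers(2);

            if (args.length > 0) {
                // Remote submission via the new helper; may now also throw AuthorizationException.
                StormRunner.runTopologyRemotely(builder.createTopology(), args[0], conf);
            } else {
                // Existing local helper: runs in a LocalCluster for the given number of seconds.
                StormRunner.runTopologyLocally(builder.createTopology(), "example-topology", conf, 10);
            }
        }
    }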

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/LocalCluster.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index 2c6f6f3,8ff5a2c..8ead710
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -25,15 -23,16 +25,17 @@@
    (:require [backtype.storm.daemon [common :as common]]))
  
  (defprotocol ClusterState
 -  (set-ephemeral-node [this path data])
 +  (set-ephemeral-node [this path data acls])
    (delete-node [this path])
 -  (create-sequential [this path data])
 +  (create-sequential [this path data acls])
    ;; if node does not exist, create persistent with this data
 -  (set-data [this path data])
 +  (set-data [this path data acls])
    (get-data [this path watch?])
+   (get-version [this path watch?])
+   (get-data-with-version [this path watch?])
    (get-children [this path watch?])
 -  (mkdirs [this path])
 +  (mkdirs [this path acls])
 +  (exists-node? [this path watch?])
    (close [this])
    (register [this callback])
    (unregister [this id]))
@@@ -240,31 -231,31 +252,33 @@@
           (into {}))))
  
  ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
 -(defn mk-storm-cluster-state
 -  [cluster-state-spec]
 +(defnk mk-storm-cluster-state
 +  [cluster-state-spec :acls nil]
    (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                  [false cluster-state-spec]
 -                                [true (mk-distributed-cluster-state cluster-state-spec)])
 +                                [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls)])
          assignment-info-callback (atom {})
+         assignment-info-with-version-callback (atom {})
+         assignment-version-callback (atom {})
          supervisors-callback (atom nil)
          assignments-callback (atom nil)
          storm-base-callback (atom {})
 +        credentials-callback (atom {})
          state-id (register
 -                   cluster-state
 -                   (fn [type path]
 -                     (let [[subtree & args] (tokenize-path path)]
 -                       (condp = subtree
 +                  cluster-state
 +                  (fn [type path]
 +                    (let [[subtree & args] (tokenize-path path)]
 +                      (condp = subtree
                           ASSIGNMENTS-ROOT (if (empty? args)
 -                                            (issue-callback! assignments-callback)
 -                                            (issue-map-callback! assignment-info-callback (first args)))
 +                                             (issue-callback! assignments-callback)
 +                                             (issue-map-callback! assignment-info-callback (first args)))
                           SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                           STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
 +                         CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
                           ;; this should never happen
-                          (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
+                          (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
      (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
 -      (mkdirs cluster-state p))
 +      (mkdirs cluster-state p acls))
      (reify
        StormClusterState
  

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/drpc.clj
index 1340e49,3527b7c..68128c3
--- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj
@@@ -199,48 -133,37 +199,48 @@@
      (let [conf (read-storm-config)
            worker-threads (int (conf DRPC-WORKER-THREADS))
            queue-size (int (conf DRPC-QUEUE-SIZE))
 -          service-handler (service-handler)
 -
 -          ;; Requests and returns need to be on separate thread pools, since
 -          ;; calls to "execute" don't unblock until other thrift methods are
 -          ;; called. So if 64 threads are calling execute, the server won't
 -          ;; accept the result invocations that will unblock those threads.
 -
 -          handler-server
 -          (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
 -                            (THsHaServer$Args.)
 -                            (.workerThreads 64)
 -                            (.executorService
 -                              (ThreadPoolExecutor.
 -                                worker-threads worker-threads 60 TimeUnit/SECONDS
 -                                (ArrayBlockingQueue. queue-size)))
 -                            (.protocolFactory (TBinaryProtocol$Factory.))
 -                            (.processor (DistributedRPC$Processor. service-handler))))
 -
 -          invoke-server
 -          (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT)))
 -                            (THsHaServer$Args.)
 -                            (.workerThreads 64)
 -                            (.protocolFactory (TBinaryProtocol$Factory.))
 -                            (.processor
 -                              (DistributedRPCInvocations$Processor. service-handler))))]
 +          drpc-http-port (int (conf DRPC-HTTP-PORT))
 +          drpc-port (int (conf DRPC-PORT))
 +          drpc-service-handler (service-handler conf)
 +          ;; requests and returns need to be on separate thread pools, since calls to
 +          ;; "execute" don't unblock until other thrift methods are called. So if 
 +          ;; 64 threads are calling execute, the server won't accept the result
 +          ;; invocations that will unblock those threads
 +          handler-server (when (> drpc-port 0)
 +                           (ThriftServer. conf
 +                             (DistributedRPC$Processor. drpc-service-handler)
 +                             ThriftConnectionType/DRPC))
 +          invoke-server (ThriftServer. conf
 +                          (DistributedRPCInvocations$Processor. drpc-service-handler)
 +                          ThriftConnectionType/DRPC_INVOCATIONS)
 +          http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)] 
-       (.addShutdownHook (Runtime/getRuntime) (Thread. (fn []
-                                                         (if handler-server (.stop handler-server))
-                                                         (.stop invoke-server))))
+       (add-shutdown-hook-with-force-kill-in-1-sec (fn []
 -                                                    (.stop handler-server)
++                                                    (if handler-server (.stop handler-server))
+                                                     (.stop invoke-server)))
        (log-message "Starting Distributed RPC servers...")
        (future (.serve invoke-server))
 -      (.serve handler-server))))
 +      (when (> drpc-http-port 0)
 +        (let [app (webapp drpc-service-handler http-creds-handler)
 +              filter-class (conf DRPC-HTTP-FILTER)
 +              filter-params (conf DRPC-HTTP-FILTER-PARAMS)
 +              filters-confs [{:filter-class filter-class
 +                              :filter-params filter-params}]
 +              https-port (int (conf DRPC-HTTPS-PORT))
 +              https-ks-path (conf DRPC-HTTPS-KEYSTORE-PATH)
 +              https-ks-password (conf DRPC-HTTPS-KEYSTORE-PASSWORD)
 +              https-ks-type (conf DRPC-HTTPS-KEYSTORE-TYPE)]
 +
 +          (run-jetty app
 +            {:port drpc-http-port :join? false
 +             :configurator (fn [server]
 +                             (config-ssl server
 +                                         https-port 
 +                                         https-ks-path 
 +                                         https-ks-password
 +                                         https-ks-type)
 +                             (config-filter server app filters-confs))})))
 +      (when handler-server
 +        (.serve handler-server)))))
  
  (defn -main []
    (launch-server!))
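
Note: the comment carried through this hunk is the key design constraint: a client call to "execute" blocks inside the DRPC handler server until a topology pushes the result back through the separate invocations server, which is why the two servers cannot share a thread pool. A rough client-side sketch is below; the host is hypothetical, drpc.port defaults to 3772, and the two-argument constructor matches the pre-security DRPCClient (the post-security client also takes a conf map), so treat this as a sketch only.

    import backtype.storm.utils.DRPCClient;

    public class DrpcCallExample {
        public static void main(String[] args) throws Exception {
            DRPCClient client = new DRPCClient("drpc-host.example.com", 3772);
            // Blocks inside the DRPC handler server until a topology returns the
            // result via the separate invocations server.
            String result = client.execute("reach", "http://example.com");
            System.out.println(result);
        }
    }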

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index e2952d0,6265479..44ccc6c
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -81,11 -73,9 +81,11 @@@
       :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
       :timer (mk-timer :kill-fn (fn [t]
                                   (log-error t "Error when processing event")
-                                  (halt-process! 20 "Error when processing an event")
+                                  (exit-process! 20 "Error when processing an event")
                                   ))
       :scheduler (mk-scheduler conf inimbus)
 +     :id->sched-status (atom {})
 +     :cred-renewers (AuthUtils/GetCredentialRenewers conf)
       }))
  
  (defn inbox [nimbus]
@@@ -1304,13 -1148,18 +1304,14 @@@
  (defn launch-server! [conf nimbus]
    (validate-distributed-mode! conf)
    (let [service-handler (service-handler conf nimbus)
-         ;;TODO need to honor NIMBUS-THRIFT-MAX-BUFFER-SIZE for different transports
 -        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
 -                    (THsHaServer$Args.)
 -                    (.workerThreads 64)
 -                    (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
 -                    (.processor (Nimbus$Processor. service-handler))
 -                    )
 -       server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
 +        server (ThriftServer. conf (Nimbus$Processor. service-handler) 
 +                              ThriftConnectionType/NIMBUS)]
-     (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
+     (add-shutdown-hook-with-force-kill-in-1-sec (fn []
+                                                   (.shutdown service-handler)
+                                                   (.stop server)))
      (log-message "Starting Nimbus server...")
 -    (.serve server)))
 +    (.serve server)
 +    service-handler))
  
  ;; distributed implementation
  

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 2fe7fb8,cfa8f85..d8ff6b5
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -14,8 -14,8 +14,9 @@@
  ;; See the License for the specific language governing permissions and
  ;; limitations under the License.
  (ns backtype.storm.daemon.supervisor
 +  (:import [java.io OutputStreamWriter BufferedWriter IOException])
-   (:import [backtype.storm.scheduler ISupervisor])
+   (:import [backtype.storm.scheduler ISupervisor]
+            [java.net JarURLConnection])
    (:use [backtype.storm bootstrap])
    (:use [backtype.storm.daemon common])
    (:require [backtype.storm.daemon [worker :as worker]])
@@@ -235,22 -190,16 +253,26 @@@
      (when thread-pid
        (psim/kill-process thread-pid))
      (doseq [pid pids]
 -      (kill-process-with-sig-term pid))
 +      (if as-user
++        (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -15 " pid))
++        (kill-process-with-sig-term pid)))
+     (if-not (empty? pids) (sleep-secs 1)) ;; allow 1 second for execution of cleanup threads on worker.
+     (doseq [pid pids]
 -      (force-kill-process pid)
 -      (try
 -        (rmpath (worker-pid-path conf id pid))
 -        (catch Exception e))) ;; on windows, the supervisor may still holds the lock on the worker directory
 -    (try-cleanup-worker conf id))
++      (if as-user
 +        (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
-         (ensure-process-killed! pid))
++        (force-kill-process pid))
 +      (if as-user
-         (rmr-as-user conf id user (worker-pid-path conf id pid))
++        (rmr-as-user conf id user (worker-pid-path conf id pid)) 
 +        (try
 +          (rmpath (worker-pid-path conf id pid))
-           (catch Exception e)) ;; on windows, the supervisor may still holds the lock on the worker directory
-       ))
++          (catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
 +    (try-cleanup-worker conf id user))
    (log-message "Shut down " (:supervisor-id supervisor) ":" id))
  
 +(def SUPERVISOR-ZK-ACLS
 +  [(first ZooDefs$Ids/CREATOR_ALL_ACL) 
 +   (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
 +
  (defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
    {:conf conf
     :shared-context shared-context
@@@ -582,14 -495,20 +612,23 @@@
            jlp (jlp stormroot conf)
            stormjar (supervisor-stormjar-path stormroot)
            storm-conf (read-supervisor-storm-conf conf storm-id)
-           classpath (add-to-classpath (current-classpath) [stormjar])
+           topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
+                            [cp]
+                            [])
+           classpath (-> (current-classpath)
+                         (add-to-classpath [stormjar])
+                         (add-to-classpath topo-classpath))
 +          top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
 +          gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port)
 +          user (storm-conf TOPOLOGY-SUBMITTER-USER)
 +          logfilename (logs-filename storm-id port)
- 
-           worker-childopts (substitute-childopts (conf WORKER-CHILDOPTS) worker-id storm-id port)
-           topo-worker-childopts (substitute-childopts (storm-conf TOPOLOGY-WORKER-CHILDOPTS) worker-id storm-id port)
+           worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
 -                             (substitute-worker-childopts s port))
++                             (substitute-childopts s worker-id storm-id port))
+           topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
 -                                  (substitute-worker-childopts s port))
++                                  (substitute-childopts s worker-id storm-id port))
+           topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)]
+                                         (merge env {"LD_LIBRARY_PATH" jlp})
+                                         {"LD_LIBRARY_PATH" jlp})
 -          logfilename (str "worker-" port ".log")
            command (concat
                      [(java-cmd) "-server"]
                      worker-childopts
@@@ -608,21 -526,13 +647,20 @@@
                       (:assignment-id supervisor)
                       port
                       worker-id])
 -          command (->> command (map str) (filter (complement empty?)))
 -          shell-cmd (->> command
 -                         (map #(str \' (clojure.string/escape % {\' "\\'"}) \'))
 -                         (clojure.string/join " "))]
 -      (log-message "Launching worker with command: " shell-cmd)
 -      (launch-process command :environment topology-worker-environment)
 -      ))
 +          command (->> command (map str) (filter (complement empty?)))]
- 
 +      (log-message "Launching worker with command: " (shell-cmd command))
 +      (write-log-metadata! storm-conf user worker-id storm-id port conf)
 +      (set-worker-user! conf worker-id user)
 +      (let [log-prefix (str "Worker Process " worker-id)
 +           callback (fn [exit-code] 
 +                          (log-message log-prefix " exited with code: " exit-code)
 +                          (add-dead-worker worker-id))]
 +        (remove-dead-worker worker-id) 
 +        (if run-worker-as-user
 +          (let [worker-dir (worker-root conf worker-id)]
-             (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment {"LD_LIBRARY_PATH" jlp})] :log-prefix log-prefix :exit-code-callback callback))
-           (launch-process command :environment {"LD_LIBRARY_PATH" jlp} :log-prefix log-prefix :exit-code-callback callback)
++            (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback))
++          (launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback)
 +      ))))
  
  ;; local implementation
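
Note: the worker-launch hunk above substitutes placeholders into the worker and topology childopts before building the command line. A rough Java rendering of that substitution for a single opts string is below (illustrative only; the real implementation is the Clojure substitute-childopts helper, and the assumption that %ID% expands to the worker port follows earlier releases). For example, an opts string of "-Xloggc:gc-%WORKER-PORT%.log" with port 6700 would become "-Xloggc:gc-6700.log".

    public final class ChildoptsSubstitutionExample {

        // Expands the placeholders documented for worker.childopts in Config.java.
        public static String substitute(String opts, String workerId, String stormId, int port) {
            return opts
                .replace("%ID%", String.valueOf(port))
                .replace("%WORKER-ID%", workerId)
                .replace("%STORM-ID%", stormId)
                .replace("%WORKER-PORT%", String.valueOf(port));
        }
    }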
  

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index d09ea35,aeabdf6..f1acdec
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -190,17 -175,22 +191,19 @@@
                         )
              :timer-name timer-name))
  
- (defn recursive-map-worker-data [conf mq-context storm-id assignment-id port
-                                   storm-conf
-                                   worker-id 
-                                   cluster-state 
-                                   storm-cluster-state
-                                   executors 
-                                   transfer-queue
-                                   executor-receive-queue-map 
-                                   receive-queue-map
-                                   topology]
-   (recursive-map
 -(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
++(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
+   (let [assignment-versions (atom {})
 -        cluster-state (cluster/mk-distributed-cluster-state conf)
 -        storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
 -        storm-conf (read-supervisor-storm-conf conf storm-id)
+         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
+         transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
+                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
+         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
+         
+         receive-queue-map (->> executor-receive-queue-map
+                                (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
+                                (into {}))
+ 
+         topology (read-supervisor-topology conf storm-id)]
+     (recursive-map
        :conf conf
        :mq-context (if mq-context
                        mq-context

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/testing.clj
index 39f3759,54f40e0..f603086
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@@ -208,8 -208,8 +213,8 @@@
                    supervisors
                    ; because a worker may already be dead
                    workers)]
-     (while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
+     (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
 -                   (Thread/sleep 10)
 +                   (Thread/sleep (rand-int 20))
                     ;;      (doseq [d daemons]
                     ;;        (if-not ((memfn waiting?) d)
                     ;;          (println d)))
@@@ -481,10 -483,7 +490,10 @@@
      (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
  
      (let [storm-id (common/get-storm-id state storm-name)]
 +      ;;Give the topology time to come up without using it to wait for the spouts to complete
 +      (simulate-wait cluster-map)
 +
-       (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
+       (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
                       (simulate-wait cluster-map))
  
        (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
@@@ -583,22 -582,25 +592,24 @@@
  (defn tracked-wait
    "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
    ([tracked-topology]
-    (tracked-wait tracked-topology 1))
+      (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
    ([tracked-topology amt]
+      (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
+   ([tracked-topology amt timeout-ms]
 -      (let [target (+ amt @(:last-spout-emit tracked-topology))
 -            track-id (-> tracked-topology :cluster ::track-id)
 -            waiting? (fn []
 -                       (or (not= target (global-amt track-id "spout-emitted"))
 -                           (not= (global-amt track-id "transferred")                                 
 -                                 (global-amt track-id "processed"))
 -                           ))]
 -        (while-timeout TEST-TIMEOUT-MS (waiting?)
 -                       ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
 -                       ;; (println "Processed: " (global-amt track-id "processed"))
 -                       ;; (println "Transferred: " (global-amt track-id "transferred"))
 -                       (Thread/sleep 500))
 -        (reset! (:last-spout-emit tracked-topology) target))))
 -
 -(defnk test-tuple 
 +    (let [target (+ amt @(:last-spout-emit tracked-topology))
 +          track-id (-> tracked-topology :cluster ::track-id)
 +          waiting? (fn []
 +                     (or (not= target (global-amt track-id "spout-emitted"))
 +                         (not= (global-amt track-id "transferred")                                 
 +                               (global-amt track-id "processed"))))]
-       (while-timeout TEST-TIMEOUT-MS (waiting?)
++      (while-timeout timeout-ms (waiting?)
 +                     ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
 +                     ;; (println "Processed: " (global-amt track-id "processed"))
 +                     ;; (println "Transferred: " (global-amt track-id "transferred"))
 +                    (Thread/sleep (rand-int 200)))
 +      (reset! (:last-spout-emit tracked-topology) target))))
 +
 +(defnk test-tuple
    [values
     :stream Utils/DEFAULT_STREAM_ID
     :component "component"

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 3c1be09,13e4d41..ec9759c
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -728,10 -688,10 +728,10 @@@
                      reverse)]
      {"componentErrors"
       (for [^ErrorInfo e errors]
-        {"time" (date-str (.get_error_time_secs e))
+        {"time" (* 1000 (long (.get_error_time_secs e)))
          "errorHost" (.get_host e)
          "errorPort"  (.get_port e)
 -        "errorWorkerLogLink"  (worker-log-link (.get_host e) (.get_port e))
 +        "errorWorkerLogLink"  (worker-log-link (.get_host e) (.get_port e) topology-id)
          "error" (.get_error e)})}))
  
  (defn spout-stats

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/util.clj
index ed2e30b,3df25b7..0173081
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@@ -398,25 -411,30 +414,35 @@@
      (catch ExecuteException e
        (log-message "Error when trying to kill " pid ". Process is probably already dead."))))
  
 +(defn read-and-log-stream
 +  [prefix stream]
 +  (try
 +    (let [reader (BufferedReader. (InputStreamReader. stream))]
 +      (loop []
 +        (if-let [line (.readLine reader)]
 +                (do
 +                  (log-warn (str prefix ":" line))
 +                  (recur)))))
 +    (catch IOException e
 +      (log-warn "Error while trying to log stream" e))))
 +
- (defn sleep-secs [secs]
-   (when (pos? secs)
-     (Time/sleep (* (long secs) 1000))))
+ (defn force-kill-process
+   [pid]
+   (send-signal-to-process pid sig-kill))
  
- (defn sleep-until-secs [target-secs]
-   (Time/sleepUntil (* (long target-secs) 1000)))
+ (defn kill-process-with-sig-term
+   [pid]
+   (send-signal-to-process pid sig-term))
+ 
+ (defn add-shutdown-hook-with-force-kill-in-1-sec
+   "adds the user supplied function as a shutdown hook for cleanup.
+    Also adds a function that sleeps for a second and then sends kill -9 to the process to avoid any zombie process in case
+    the cleanup function hangs."
+   [func]
+   (.addShutdownHook (Runtime/getRuntime) (Thread. #(func)))
+   (.addShutdownHook (Runtime/getRuntime) (Thread. #((sleep-secs 1)
+                                                     (.halt (Runtime/getRuntime) 20)))))
  
 -(defnk launch-process [command :environment {}]
 -  (let [builder (ProcessBuilder. command)
 -        process-env (.environment builder)]
 -    (doseq [[k v] environment]
 -      (.put process-env k v))
 -    (.start builder)))
 -
  (defprotocol SmartThread
    (start [this])
    (join [this])
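
Note: the new add-shutdown-hook-with-force-kill-in-1-sec helper above registers the cleanup function plus a watchdog hook. A rough Java rendering of the pattern it documents (not code from this commit) looks like this:

    public final class ShutdownHookExample {

        // Registers the cleanup hook plus a watchdog hook that waits one second
        // and then halts the JVM, so a hung cleanup cannot leave a zombie process.
        public static void addShutdownHookWithForceKillInOneSec(final Runnable cleanup) {
            Runtime.getRuntime().addShutdownHook(new Thread(cleanup));
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    Runtime.getRuntime().halt(20);
                }
            }));
        }
    }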

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index ea54313,ac8b6b6..d6b45ea
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -17,10 -17,8 +17,9 @@@
   */
  package backtype.storm;
  
- import backtype.storm.ConfigValidation;
  import backtype.storm.serialization.IKryoDecorator;
  import backtype.storm.serialization.IKryoFactory;
 +import backtype.storm.utils.Utils;
  import com.esotericsoftware.kryo.Serializer;
  import java.util.ArrayList;
  import java.util.HashMap;
@@@ -66,20 -57,20 +65,20 @@@ public class Config extends HashMap<Str
      /**
       * Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
       */
-     public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
-     public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
+     public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries"; 
+     public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 -     * Netty based messaging: The min # of milliseconds that a peer will wait. 
 +     * Netty based messaging: The min # of milliseconds that a peer will wait.
       */
-     public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
-     public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
+     public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms"; 
+     public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 -     * Netty based messaging: The max # of milliseconds that a peer will wait. 
 +     * Netty based messaging: The max # of milliseconds that a peer will wait.
       */
-     public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
-     public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
+     public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms"; 
+     public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
       * Netty based messaging: The # of worker threads for the server.
@@@ -103,9 -94,9 +102,8 @@@
       * We check with this interval that whether the Netty channel is writable and try to write pending messages
       */
      public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
-     public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
- 
- 
+     public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
+     
 -    
      /**
       * A list of hosts of ZooKeeper servers used to manage the cluster.
       */
@@@ -234,10 -200,10 +232,10 @@@
       * The ceiling of the interval between retries of a Zookeeper operation.
       */
      public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";
-     public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = Number.class;
+     public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 -     * The Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
 +     * The cluster Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
       */
      public static final String STORM_ZOOKEEPER_AUTH_SCHEME="storm.zookeeper.auth.scheme";
      public static final Object STORM_ZOOKEEPER_AUTH_SCHEME_SCHEMA = String.class;
@@@ -307,35 -231,15 +305,35 @@@
       * connect to this port to upload jars and submit topologies.
       */
      public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
-     public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class;
+     public static final Object NIMBUS_THRIFT_PORT_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 +     * The number of threads that should be used by the nimbus thrift server.
 +     */
 +    public static final String NIMBUS_THRIFT_THREADS = "nimbus.thrift.threads";
 +    public static final Object NIMBUS_THRIFT_THREADS_SCHEMA = Number.class;
 +
 +    /**
 +     * A list of users that are cluster admins and can run any command.  To use this set
 +     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
 +     */
 +    public static final String NIMBUS_ADMINS = "nimbus.admins";
 +    public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator;
 +
 +    /**
 +     * A list of users that run the supervisors and should be authorized to interact with
 +     * nimbus as a supervisor would.  To use this set
 +     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
 +     */
 +    public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
 +    public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator;
 +
 +    /**
       * The maximum buffer size thrift should use when reading messages.
       */
      public static final String NIMBUS_THRIFT_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
-     public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = Number.class;
+     public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
  
 -
      /**
       * This parameter is used by the storm-deploy project to configure the
       * jvm options for the nimbus daemon.
@@@ -395,10 -299,10 +393,10 @@@
       * to launching new JVM's and configuring them.</p>
       */
      public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
-     public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
+     public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 -     * Whether or not nimbus should reassign tasks if it detects that a task goes down. 
 +     * Whether or not nimbus should reassign tasks if it detects that a task goes down.
       * Defaults to true, and it's not recommended to change this value.
       */
      public static final String NIMBUS_REASSIGN = "nimbus.reassign";
@@@ -565,77 -369,27 +563,77 @@@
       * This port is used by Storm DRPC for receiving DPRC requests from clients.
       */
      public static final String DRPC_PORT = "drpc.port";
-     public static final Object DRPC_PORT_SCHEMA = Number.class;
+     public static final Object DRPC_PORT_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 -     * DRPC thrift server worker threads 
 +     * Class name for authorization plugin for DRPC client
 +     */
 +    public static final String DRPC_AUTHORIZER = "drpc.authorizer";
 +    public static final Object DRPC_AUTHORIZER_SCHEMA = String.class;
 +
 +    /**
 +     * The Access Control List for the DRPC Authorizer.
 +     * @see DRPCSimpleAclAuthorizer
 +     */
 +    public static final String DRPC_AUTHORIZER_ACL = "drpc.authorizer.acl";
 +    public static final Object DRPC_AUTHORIZER_ACL_SCHEMA = Map.class;
 +
 +    /**
 +     * File name of the DRPC Authorizer ACL.
 +     * @see DRPCSimpleAclAuthorizer
 +     */
 +    public static final String DRPC_AUTHORIZER_ACL_FILENAME = "drpc.authorizer.acl.filename";
 +    public static final Object DRPC_AUTHORIZER_ACL_FILENAME_SCHEMA = String.class;
 +
 +    /**
 +     * Whether the DRPCSimpleAclAuthorizer should deny requests for operations
 +     * involving functions that have no explicit ACL entry. When set to false
 +     * (the default) DRPC functions that have no entry in the ACL will be
 +     * permitted, which is appropriate for a development environment. When set
 +     * to true, explicit ACL entries are required for every DRPC function, and
 +     * any request for functions will be denied.
 +     * @see DRPCSimpleAclAuthorizer
 +     */
 +    public static final String DRPC_AUTHORIZER_ACL_STRICT = "drpc.authorizer.acl.strict";
 +    public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class;
 +
 +    /**
 +     * DRPC thrift server worker threads
       */
      public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
-     public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
+     public static final Object DRPC_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 -     * DRPC thrift server queue size 
 +     * The maximum buffer size thrift should use when reading messages for DRPC.
 +     */
 +    public static final String DRPC_MAX_BUFFER_SIZE = "drpc.max_buffer_size";
 +    public static final Object DRPC_MAX_BUFFER_SIZE_SCHEMA = Number.class;
 +
 +    /**
 +     * DRPC thrift server queue size
       */
      public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
-     public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
+     public static final Object DRPC_QUEUE_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 -     * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back. 
 +     * The DRPC invocations transport plug-in for Thrift client/server communication
 +     */
 +    public static final String DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN = "drpc.invocations.thrift.transport";
 +    public static final Object DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 +
 +    /**
 +     * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
       */
      public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
-     public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
+     public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 +     * DRPC invocations thrift server worker threads
 +     */
 +    public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
 +    public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
 +
 +    /**
       * The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also
       * timeout based on the socket timeout on the DRPC client, and separately based on the topology message
       * timeout for the topology implementing the DRPC function.
@@@ -672,32 -414,8 +670,32 @@@
       * how many workers run on each machine.
       */
      public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
-     public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
+     public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.IntegersValidator;
  
 +    /**
 +     * A number representing the maximum number of workers any single topology can acquire.
 +     */
 +    public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology";
 +    public static final Object NIMBUS_SLOTS_PER_TOPOLOGY_SCHEMA = Number.class;
 +
 +    /**
 +     * A class implementing javax.servlet.Filter for DRPC HTTP requests
 +     */
 +    public static final String DRPC_HTTP_FILTER = "drpc.http.filter";
 +    public static final Object DRPC_HTTP_FILTER_SCHEMA = String.class;
 +
 +    /**
 +     * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
 +     * service
 +     */
 +    public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params";
 +    public static final Object DRPC_HTTP_FILTER_PARAMS_SCHEMA = Map.class;
 +
 +    /**
 +     * A number representing the maximum number of executors any single topology can acquire.
 +     */
 +    public static final String NIMBUS_EXECUTORS_PER_TOPOLOGY = "nimbus.executors.perTopology";
 +    public static final Object NIMBUS_EXECUTORS_PER_TOPOLOGY_SCHEMA = Number.class;
  
      /**
       * This parameter is used by the storm-deploy project to configure the
@@@ -711,8 -430,9 +709,8 @@@
       * restart the worker process.
       */
      public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
-     public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = Number.class;
+     public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
  
 -
      /**
       * How long a worker can go without heartbeating during the initial launch before
       * the supervisor tries to restart the worker process. This value override
@@@ -720,8 -440,9 +718,8 @@@
       * overhead to starting and configuring the JVM on launch.
       */
      public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs";
-     public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = Number.class;
+     public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
  
 -
      /**
       * Whether or not the supervisor should launch workers assigned to it. Defaults
       * to true -- and you should probably never change this value. This configuration
@@@ -742,25 -464,11 +740,25 @@@
       * need to be restarted.
       */
      public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
-     public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
+     public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 +     * Should the supervisor try to run the worker as the launching user or not.  Defaults to false.
 +     */
 +    public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
 +    public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
 +
 +    /**
 +     * Full path to the worker-launcher executable that will be used to launch workers when
 +     * SUPERVISOR_RUN_WORKER_AS_USER is set to true.
 +     */
 +    public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
 +    public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
 +
 +    /**
       * The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
 -     * with an identifier for this worker.
 +     * with an identifier for this worker. Also, "%WORKER-ID%", "%STORM-ID%" and "%WORKER-PORT%" are
 +     * replaced with appropriate runtime values for this worker.
       */
      public static final String WORKER_CHILDOPTS = "worker.childopts";
      public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
@@@ -800,22 -500,9 +798,22 @@@
       * come through.
       */
      public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
-     public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = Number.class;
+     public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
  
  
 +    /**
 +     * How often a task should sync credentials, worst case.
 +     */
 +    public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
 +    public static final Object TASK_CREDENTIALS_POLL_SECS_SCHEMA = Number.class;
 +
 +
 +    /**
 +     * A list of users that are allowed to interact with the topology.  To use this set
 +     * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
 +     */
 +    public static final String TOPOLOGY_USERS = "topology.users";
 +    public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator;
  
      /**
       * True if Storm should timeout messages or not. Defaults to true. This is meant to be used
@@@ -936,15 -623,15 +934,15 @@@
  
  
      /**
 -     * The maximum number of tuples that can be pending on a spout task at any given time. 
 -     * This config applies to individual tasks, not to spouts or topologies as a whole. 
 -     * 
 +     * The maximum number of tuples that can be pending on a spout task at any given time.
 +     * This config applies to individual tasks, not to spouts or topologies as a whole.
 +     *
       * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
 -     * Note that this config parameter has no effect for unreliable spouts that don't tag 
 +     * Note that this config parameter has no effect for unreliable spouts that don't tag
       * their tuples with a message id.
       */
-     public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
-     public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
+     public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
+     public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
       * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
@@@ -973,13 -660,13 +971,13 @@@
       * The percentage of tuples to sample to produce stats for a task.
       */
      public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
-     public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
+     public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = ConfigValidation.DoubleValidator;
  
      /**
 -     * The time period that builtin metrics data in bucketed into. 
 +     * The time period that builtin metrics data in bucketed into.
       */
      public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
-     public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
+     public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
       * Whether or not to use Java serialization in a topology.
@@@ -994,12 -681,19 +992,25 @@@
      public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
  
      /**
 +     * Topology-specific GC options for the worker child process. This overrides WORKER_GC_CHILDOPTS.
 +     */
 +    public static final String TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts";
 +    public static final Object TOPOLOGY_WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 +
 +    /**
+      * Topology-specific classpath for the worker child process. This is combined with the usual classpath.
+      */
+     public static final String TOPOLOGY_CLASSPATH="topology.classpath";
+     public static final Object TOPOLOGY_CLASSPATH_SCHEMA = ConfigValidation.StringOrStringListValidator;
+ 
+     /**
+      * Topology-specific environment variables for the worker child process. 
+      * This is added to the existing environment (that of the supervisor)
+      */
+      public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
+      public static final Object TOPOLOGY_ENVIRONMENT_SCHEMA = Map.class;
+ 
+     /**
       * This config is available for TransactionalSpouts, and contains the id ( a String) for
       * the transactional topology. This id is used to store the state of the transactional
       * topology in Zookeeper.
@@@ -1061,10 -755,10 +1072,10 @@@
      * via the TopologyContext.
      */
      public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
-     public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
+     public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 -     * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example, 
 +     * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
       * an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
       * reported to Zookeeper per task for every 10 second interval of time.
       */
@@@ -1183,25 -853,27 +1194,41 @@@
       * to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
       */
      public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
 -    public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = Map.class;
 +    public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
 +
 +    /**
 +     * A map from the user name to the number of machines that user is allowed to use. Set storm.scheduler
 +     * to backtype.storm.scheduler.multitenant.MultitenantScheduler to make use of the multitenant scheduler.
 +     */
 +    public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools";
 +    public static final Object MULTITENANT_SCHEDULER_USER_POOLS_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
 +
 +    /**
 +     * The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
 +     * to backtype.storm.scheduler.multitenant.MultitenantScheduler to make use of the multitenant scheduler.
 +     */
 +    public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
 +    public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
  
+     public static void setClasspath(Map conf, String cp) {
+         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
+     }
+ 
+     public void setClasspath(String cp) {
+         setClasspath(this, cp);
+     }
+ 
+     public static void setEnvironment(Map conf, Map env) {
+         conf.put(Config.TOPOLOGY_ENVIRONMENT, env);
+     }
+ 
+     public void setEnvironment(Map env) {
+         setEnvironment(this, env);
+     }
+ 
      public static void setDebug(Map conf, boolean isOn) {
          conf.put(Config.TOPOLOGY_DEBUG, isOn);
 -    } 
 +    }
  
      public void setDebug(boolean isOn) {
          setDebug(this, isOn);
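
As a quick illustration of the Config.java additions above, here is a minimal sketch of building a per-topology configuration with the new setters. The class name, jar path, and variable values are placeholders for illustration only, not part of this change set:

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.Config;

    public class TopologyEnvExample {
        public static void main(String[] args) {
            Config conf = new Config();
            conf.setDebug(false);

            // topology.classpath: combined with the worker's usual classpath.
            conf.setClasspath("/opt/extra-jars/custom.jar");

            // topology.environment: added to the environment the supervisor
            // provides when it launches the worker child process.
            Map<String, String> env = new HashMap<String, String>();
            env.put("THISVAR", "somevalue");
            conf.setEnvironment(env);
        }
    }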

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/ConfigValidation.java
index e990921,156ccf8..14394a0
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@@ -177,9 -79,79 +177,79 @@@ public class ConfigValidation 
      /**
       * Validates is a list of Maps.
       */
 -    public static Object MapsValidator = FieldListValidatorFactory(Map.class);
 +    public static Object MapsValidator = listFv(Map.class, true);
  
      /**
+      * Validates an Integer.
+      */
+     public static Object IntegerValidator = new FieldValidator() {
+         @Override
+         public void validateField(String name, Object o) throws IllegalArgumentException {
+             if (o == null) {
+                 // A null value is acceptable.
+                 return;
+             }
+             final long i;
+             if (o instanceof Number &&
+                     (i = ((Number)o).longValue()) == ((Number)o).doubleValue()) {
+                 if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) {
+                     return;
+                 }
+             }
+ 
+             throw new IllegalArgumentException("Field " + name + " must be an Integer within type range.");
+         }
+     };
+ 
+     /**
+      * Validates a list of Integers.
+      */
+     public static Object IntegersValidator = new FieldValidator() {
+         @Override
+         public void validateField(String name, Object field)
+                 throws IllegalArgumentException {
+             if (field == null) {
+                 // A null value is acceptable.
+                 return;
+             }
+             if (field instanceof Iterable) {
+                 for (Object o : (Iterable)field) {
+                     final long i;
+                     if (o instanceof Number &&
+                             ((i = ((Number)o).longValue()) == ((Number)o).doubleValue()) &&
+                             (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE)) {
+                         // pass the test
+                     } else {
+                         throw new IllegalArgumentException(
+                                 "Each element of the list " + name + " must be an Integer within type range.");
+                     }
+                 }
+                 return;
+             }
+         }
+     };
+ 
+     /**
+      * Validates a Double.
+      */
+     public static Object DoubleValidator = new FieldValidator() {
+         @Override
+         public void validateField(String name, Object o) throws IllegalArgumentException {
+             if (o == null) {
+                 // A null value is acceptable.
+                 return;
+             }
+ 
+             // we can provide a lenient way to convert int/long to double, possibly losing some precision
+             if (o instanceof Number) {
+                 return;
+             }
+ 
+             throw new IllegalArgumentException("Field " + name + " must be a Double.");
+         }
+     };
+ 
+     /**
       * Validates a power of 2.
       */
      public static Object PowerOf2Validator = new FieldValidator() {
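
To make the intent of the new validators concrete: IntegerValidator above is stricter than the old Number.class schema, accepting only null or a Number whose value is whole and fits in the int range. The following standalone sketch (hypothetical class, illustration only) mirrors that check:

    public class IntegerCheckSketch {
        // Mirrors the IntegerValidator logic above: null passes, a whole Number
        // within the int range passes, everything else is rejected.
        static boolean isValidInteger(Object o) {
            if (o == null) {
                return true;
            }
            if (o instanceof Number) {
                long l = ((Number) o).longValue();
                return l == ((Number) o).doubleValue()
                    && l <= Integer.MAX_VALUE
                    && l >= Integer.MIN_VALUE;
            }
            return false;
        }

        public static void main(String[] args) {
            System.out.println(isValidInteger(42));        // true
            System.out.println(isValidInteger(42L));       // true
            System.out.println(isValidInteger(42.0));      // true, whole-valued double
            System.out.println(isValidInteger(4.2));       // false, fractional
            System.out.println(isValidInteger(1L << 40));  // false, outside int range
        }
    }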

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/Utils.java
index 87f1654,364b53f..fff91e6
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@@ -323,18 -314,19 +323,21 @@@ public class Utils 
        if (null == o) {
          return defaultValue;
        }
-       
-       if(o instanceof Long) {
-           return ((Long) o ).intValue();
-       } else if (o instanceof Integer) {
-           return (Integer) o;
-       } else if (o instanceof Short) {
-           return ((Short) o).intValue();
+ 
+       if (o instanceof Integer ||
+           o instanceof Short ||
+           o instanceof Byte) {
+           return ((Number) o).intValue();
+       } else if (o instanceof Long) {
+           final long l = (Long) o;
+           if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
+               return (int) l;
+           }
 +      } else if (o instanceof String) {
 +          return Integer.parseInt((String) o);
-       } else {
-           throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
        }
+ 
+       throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
      }
  
      public static boolean getBoolean(Object o, boolean defaultValue) {
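
The getInt change above widens the accepted inputs to Integer, Short, Byte, in-range Long, and numeric String, and rejects anything else, including a Long outside the int range. Here is a standalone sketch of the same conversion rules, for illustration only; the hypothetical toInt below omits the default-value handling shown in the hunk:

    public class GetIntSketch {
        // Mirrors the conversion rules in the revised Utils.getInt above.
        static int toInt(Object o) {
            if (o instanceof Integer || o instanceof Short || o instanceof Byte) {
                return ((Number) o).intValue();
            } else if (o instanceof Long) {
                long l = (Long) o;
                if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
                    return (int) l;
                }
            } else if (o instanceof String) {
                return Integer.parseInt((String) o);
            }
            throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
        }

        public static void main(String[] args) {
            System.out.println(toInt(7));           // 7
            System.out.println(toInt(7L));          // 7
            System.out.println(toInt("7"));         // 7
            System.out.println(toInt((short) 7));   // 7
            try {
                toInt(Long.MAX_VALUE);              // larger than any int
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }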

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/ui/public/component.html
----------------------------------------------------------------------
diff --cc storm-core/src/ui/public/component.html
index 092eb40,6353142..803fffb
--- a/storm-core/src/ui/public/component.html
+++ b/storm-core/src/ui/public/component.html
@@@ -26,10 -26,9 +26,11 @@@
  <script src="/js/purl.js" type="text/javascript"></script>
  <script src="/js/bootstrap-twipsy.js" type="text/javascript"></script>
  <script src="/js/script.js" type="text/javascript"></script>
+ <script src="/js/moment.min.js" type="text/javascript"></script>
  </head>
  <body>
 +<div id="ui-user">
 +</div>
  <h1><a href="/">Storm UI</a></h1>
  <div id="component-summary">
  </div>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/ui/public/js/script.js
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/cluster_test.clj
index f30c6a8,63efd30..7ed1028
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@@ -220,10 -206,11 +220,11 @@@
    (with-inprocess-zookeeper zk-port
      (with-simulated-time
        (let [state (mk-storm-state zk-port)]
 -        (.report-error state "a" "1"(local-hostname) 6700  (RuntimeException.))
 +        (.report-error state "a" "1" (local-hostname) 6700 (RuntimeException.))
          (validate-errors! state "a" "1" ["RuntimeException"])
+         (advance-time-secs! 1)
          (.report-error state "a" "1" (local-hostname) 6700 (IllegalArgumentException.))
-         (validate-errors! state "a" "1" ["RuntimeException" "IllegalArgumentException"])
+         (validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"])
          (doseq [i (range 10)]
            (.report-error state "a" "2" (local-hostname) 6700 (RuntimeException.))
            (advance-time-secs! 2))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/supervisor_test.clj
index 3dcd275,584f0d9..6b40060
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@@ -311,100 -303,39 +313,137 @@@
                                        mock-worker-id)
              (verify-first-call-args-for-indices launch-process
                                                  [0]
-                                                 exp-args)))))))
+                                                 exp-args))))
+       (testing "testing topology.classpath is added to classpath"
+         (let [topo-cp "/any/path"
+               exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
+               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
+           (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
+                      supervisor-stormdist-root nil
+                      supervisor/jlp nil
++                     set-worker-user! nil
++                     supervisor/write-log-metadata! nil
+                      launch-process nil
+                      current-classpath "/base"]
+                     (supervisor/launch-worker mock-supervisor
+                                               mock-storm-id
+                                               mock-port
+                                               mock-worker-id)
+                     (verify-first-call-args-for-indices launch-process
+                                                         [0]
+                                                         exp-args))))
+       (testing "testing topology.environment is added to environment for worker launch"
+         (let [topo-env {"THISVAR" "somevalue" "THATVAR" "someothervalue"}
++              full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
+               exp-args (exp-args-fn [] [] mock-cp)
+               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
+           (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
+                      supervisor-stormdist-root nil
+                      supervisor/jlp nil
+                      launch-process nil
++                     set-worker-user! nil
++                     supervisor/write-log-metadata! nil
+                      current-classpath "/base"]
+                     (supervisor/launch-worker mock-supervisor
+                                               mock-storm-id
+                                               mock-port
+                                               mock-worker-id)
+                     (verify-first-call-args-for-indices launch-process
+                                                         [2]
 -                                                        (merge topo-env {"LD_LIBRARY_PATH" nil}))))))))
++                                                        full-env)))))))
 +
 +(defn rm-r [f]
 +  (when (.isDirectory f)
 +    (doseq [sub (.listFiles f)] (rm-r sub)))
 +  (.delete f))
 +
 +(deftest test-worker-launch-command-run-as-user
 +  (testing "*.worker.childopts configuration"
 +    (let [mock-port "42"
 +          mock-storm-id "fake-storm-id"
 +          mock-worker-id "fake-worker-id"
 +          mock-cp "mock-classpath'quote-on-purpose"
 +          storm-local (str "/tmp/" (UUID/randomUUID))
 +          worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
 +          exp-launch ["/bin/worker-launcher"
 +                      "me"
 +                      "worker"
 +                      (str storm-local "/workers/" mock-worker-id)
 +                      worker-script]
 +          exp-script-fn (fn [opts topo-opts]
 +                       (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java' '-server'"
 +                                " " (shell-cmd opts)
 +                                " " (shell-cmd topo-opts)
 +                                " '-Djava.library.path='"
 +                                " '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
 +                                " '-Dstorm.home='"
 +                                " '-Dlogback.configurationFile=/logback/cluster.xml'"
 +                                " '-Dstorm.id=" mock-storm-id "'"
 +                                " '-Dworker.id=" mock-worker-id "'"
 +                                " '-Dworker.port=" mock-port "'"
 +                                " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
 +                                " 'backtype.storm.daemon.worker'"
 +                                " '" mock-storm-id "'"
 +                                " '" mock-port "'"
 +                                " '" mock-worker-id "';"))]
 +      (.mkdirs (io/file storm-local "workers" mock-worker-id))
 +      (try
 +      (testing "testing *.worker.childopts as strings with extra spaces"
 +        (let [string-opts "-Dfoo=bar  -Xmx1024m"
 +              topo-string-opts "-Dkau=aux   -Xmx2048m"
 +              exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
 +                                    ["-Dkau=aux" "-Xmx2048m"])
 +              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
 +                                      STORM-LOCAL-DIR storm-local
 +                                      SUPERVISOR-RUN-WORKER-AS-USER true
 +                                      WORKER-CHILDOPTS string-opts}}]
 +          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
 +                                                 topo-string-opts
 +                                                 TOPOLOGY-SUBMITTER-USER "me"}
 +                     add-to-classpath mock-cp
 +                     supervisor-stormdist-root nil
 +                     launch-process nil
 +                     set-worker-user! nil
 +                     supervisor/java-cmd "java"
 +                     supervisor/jlp nil
 +                     supervisor/write-log-metadata! nil]
 +            (supervisor/launch-worker mock-supervisor
 +                                      mock-storm-id
 +                                      mock-port
 +                                      mock-worker-id)
 +            (verify-first-call-args-for-indices launch-process
 +                                                [0]
 +                                                exp-launch))
 +          (is (= (slurp worker-script) exp-script))))
 +      (testing "testing *.worker.childopts as list of strings, with spaces in values"
 +        (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
 +              topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
 +              exp-script (exp-script-fn list-opts topo-list-opts)
 +              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
 +                                      STORM-LOCAL-DIR storm-local
 +                                      SUPERVISOR-RUN-WORKER-AS-USER true
 +                                      WORKER-CHILDOPTS list-opts}}]
 +          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
 +                                                 topo-list-opts
 +                                                 TOPOLOGY-SUBMITTER-USER "me"}
 +                     add-to-classpath mock-cp
 +                     supervisor-stormdist-root nil
 +                     launch-process nil
 +                     set-worker-user! nil
 +                     supervisor/java-cmd "java"
 +                     supervisor/jlp nil
 +                     supervisor/write-log-metadata! nil]
 +            (supervisor/launch-worker mock-supervisor
 +                                      mock-storm-id
 +                                      mock-port
 +                                      mock-worker-id)
 +            (verify-first-call-args-for-indices launch-process
 +                                                [0]
 +                                                exp-launch))
 +          (is (= (slurp worker-script) exp-script))))
 +      (finally (rm-r (io/file storm-local)))
 +      ))))
  
  (deftest test-workers-go-bananas
    ;; test that multiple workers are started for a port, and test that
@@@ -422,142 -353,59 +461,198 @@@
    ;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code
    )
  
 +(deftest test-supervisor-data-acls
 +  (testing "supervisor-data uses correct ACLs"
 +    (let [scheme "digest"
 +          digest "storm:thisisapoorpassword"
 +          auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
 +                     STORM-ZOOKEEPER-AUTH-PAYLOAD digest}
 +          expected-acls supervisor/SUPERVISOR-ZK-ACLS
 +          fake-isupervisor (reify ISupervisor
 +                             (getSupervisorId [this] nil)
 +                             (getAssignmentId [this] nil))]
 +      (stubbing [uptime-computer nil
 +                 cluster/mk-storm-cluster-state nil
 +                 supervisor-state nil
 +                 local-hostname nil
 +                 mk-timer nil]
 +        (supervisor/supervisor-data auth-conf nil fake-isupervisor)
 +        (verify-call-times-for cluster/mk-storm-cluster-state 1)
 +        (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
 +                                            expected-acls)))))
 +
 +(deftest test-write-log-metadata
 +  (testing "supervisor writes correct data to logs metadata file"
 +    (let [exp-owner "alice"
 +          exp-worker-id "42"
 +          exp-storm-id "0123456789"
 +          exp-port 4242
 +          exp-logs-users ["bob" "charlie" "daryl"]
 +          storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
 +                      TOPOLOGY-USERS ["charlie" "bob"]
 +                      LOGS-USERS ["daryl"]}
 +          exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
 +                    "worker-id" exp-worker-id
 +                    LOGS-USERS exp-logs-users}
 +          conf {}]
 +      (mocking [supervisor/write-log-metadata-to-yaml-file!]
 +        (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
 +                                        exp-storm-id exp-port conf)
 +        (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
 +                                      exp-storm-id exp-port exp-data conf)))))
 +
 +(deftest test-worker-launcher-requires-user
 +  (testing "worker-launcher throws on blank user"
 +    (mocking [launch-process]
 +      (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
 +                                  #"(?i).*user cannot be blank.*"
 +                                  (supervisor/worker-launcher {} nil ""))))))
 +
 +(defn found? [sub-str input-str]
 +  (if (string? input-str)
 +    (contrib-str/substring? sub-str (str input-str))
 +    (some #(contrib-str/substring? sub-str %) input-str)))
 +
 +(defn not-found? [sub-str input-str]
 +  (not (found? sub-str input-str)))
 +
 +(deftest test-substitute-childopts-happy-path
 +  (testing "worker-launcher replaces ids in childopts"
 +    (let [ worker-id "w-01"
 +           storm-id "s-01"
 +           port 9999
 +           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log"
 +           ]
 +      (def childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port))
 +      (is (not-found? "%WORKER-ID%" childopts-with-ids))
 +      (is (found? "w-01" childopts-with-ids))
 +      (is (not-found? "%STORM-ID%" childopts-with-ids))
 +      (is (found? "s-01" childopts-with-ids))
 +      (is (not-found? "%WORKER-PORT%" childopts-with-ids))
 +      (is (found? "-9999." childopts-with-ids))
 +      (is (not-found? "%ID%" childopts-with-ids))
 +      (is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids))
 +    )))
 +
 +(deftest test-substitute-childopts-storm-id-alone
 +  (testing "worker-launcher replaces ids in childopts"
 +    (let [ worker-id "w-01"
 +           storm-id "s-01"
 +           port 9999
 +           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%STORM-ID%.log"]
 +           (def childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port))
 +           (is (not-found? "%WORKER-ID%" childopts-with-ids))
 +           (is (not-found? "w-01" childopts-with-ids))
 +           (is (not-found? "%STORM-ID%" childopts-with-ids))
 +           (is (found? "s-01" childopts-with-ids))
 +           (is (not-found? "%WORKER-PORT%" childopts-with-ids))
 +           (is (not-found? "-9999." childopts-with-ids))
 +           (is (not-found? "%ID%" childopts-with-ids))
 +           (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids))     )))
 +
 +(deftest test-substitute-childopts-no-keys
 +  (testing "worker-launcher has no ids to replace in childopts"
 +    (let [ worker-id "w-01"
 +           storm-id "s-01"
 +           port 9999
 +           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"]
 +           (def childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port))
 +           (is (not-found? "%WORKER-ID%" childopts-with-ids))
 +           (is (not-found? "w-01" childopts-with-ids))
 +           (is (not-found? "%STORM-ID%" childopts-with-ids))
 +           (is (not-found? "s-01" childopts-with-ids))
 +           (is (not-found? "%WORKER-PORT%" childopts-with-ids))
 +           (is (not-found? "-9999." childopts-with-ids))
 +           (is (not-found? "%ID%" childopts-with-ids))
 +           (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids))    )))
 +
 +(deftest test-substitute-childopts-nil-childopts
 +  (testing "worker-launcher has nil childopts"
 +    (let [ worker-id "w-01"
 +           storm-id "s-01"
 +           port 9999
 +           childopts nil]
 +           (def childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port))
 +           (is (not-found? "%WORKER-ID%" childopts-with-ids))
 +           (is (not-found? "w-01" childopts-with-ids))
 +           (is (not-found? "%STORM-ID%" childopts-with-ids))
 +           (is (not-found? "s-01" childopts-with-ids))
 +           (is (not-found? "%WORKER-PORT%" childopts-with-ids))
 +           (is (not-found? "-9999." childopts-with-ids))
 +           (is (not-found? "%ID%" childopts-with-ids))
 +           (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids))
 +    )))
 +
 +(deftest test-substitute-childopts-nil-ids
 +  (testing "worker-launcher has nil ids"
 +    (let [ worker-id nil
 +           storm-id "s-01"
 +           port 9999
 +           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log"]
 +      (def childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port))
 +      (is (not-found? "%WORKER-ID%" childopts-with-ids))
 +      (is (not-found? "w-01" childopts-with-ids))
 +      (is (not-found? "%STORM-ID%" childopts-with-ids))
 +      (is (found? "s-01" childopts-with-ids))
 +      (is (not-found? "%WORKER-PORT%" childopts-with-ids))
 +      (is (found? "-9999." childopts-with-ids))
 +      (is (not-found? "%ID%" childopts-with-ids))
 +      (is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids))
 +      )))
 +
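The substitute-childopts tests above all exercise the same expansion: %ID% and %WORKER-PORT% become the worker port, %WORKER-ID% becomes the worker id, and %STORM-ID% becomes the topology id. The real implementation is the Clojure substitute-childopts function in supervisor.clj; the Java sketch below (hypothetical class, illustration only) just shows the substitution the assertions are checking for:

    public class ChildoptsSubstitutionSketch {
        // Illustration of the placeholder expansion checked by the tests above.
        static String substitute(String childopts, String workerId, String stormId, int port) {
            if (childopts == null) {
                return null;
            }
            return childopts
                .replace("%ID%", String.valueOf(port))
                .replace("%WORKER-ID%", String.valueOf(workerId))
                .replace("%STORM-ID%", String.valueOf(stormId))
                .replace("%WORKER-PORT%", String.valueOf(port));
        }

        public static void main(String[] args) {
            String opts = "-Xloggc:logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log";
            // Expands to -Xloggc:logs/gc.worker-9999-s-01-w-01-9999.log
            System.out.println(substitute(opts, "w-01", "s-01", 9999));
        }
    }
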
+ (deftest test-retry-read-assignments
+   (with-simulated-time-local-cluster [cluster
+                                       :supervisors 0
+                                       :ports-per-supervisor 2
+                                       :daemon-conf {NIMBUS-REASSIGN false
+                                                     NIMBUS-MONITOR-FREQ-SECS 10
+                                                     TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                                     TOPOLOGY-ACKER-EXECUTORS 0}]
+     (letlocals
+      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+      (bind topology1 (thrift/mk-topology
+                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                       {}))
+      (bind topology2 (thrift/mk-topology
+                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                       {}))
+      (bind state (:storm-cluster-state cluster))
+      (bind changed (capture-changed-workers
+                     (submit-mocked-assignment
+                      (:nimbus cluster)
+                      "topology1"
+                      {TOPOLOGY-WORKERS 2}
+                      topology1
+                      {1 "1"
+                       2 "1"}
+                      {[1] ["sup1" 1]
+                       [2] ["sup1" 2]
+                       })
+                     (submit-mocked-assignment
+                      (:nimbus cluster)
+                      "topology2"
+                      {TOPOLOGY-WORKERS 2}
+                      topology2
+                      {1 "1"
+                       2 "1"}
+                      {[1] ["sup1" 1]
+                       [2] ["sup1" 2]
+                       })
+                     (advance-cluster-time cluster 10)
+                     ))
+      (is (empty? (:launched changed)))
+      (bind options (RebalanceOptions.))
+      (.set_wait_secs options 0)
+      (bind changed (capture-changed-workers
+                     (.rebalance (:nimbus cluster) "topology2" options)
+                     (advance-cluster-time cluster 10)
+                     (heartbeat-workers cluster "sup1" [1 2 3 4])
+                     (advance-cluster-time cluster 10)
+                     ))
+      (validate-launched-once (:launched changed)
+                              {"sup1" [1 2]}
+                              (get-storm-id (:storm-cluster-state cluster) "topology1"))
+      (validate-launched-once (:launched changed)
+                              {"sup1" [3 4]}
+                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
+      )))