You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/03/04 19:13:40 UTC

[2/6] storm git commit: Merge branch 'master' of github.com:apache/storm into storm1529

Merge branch 'master' of github.com:apache/storm into storm1529


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/61c9702a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/61c9702a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/61c9702a

Branch: refs/heads/master
Commit: 61c9702a924eb6b1b8bd3f0a829678a50f02bb67
Parents: 56a7a02 12ceb09
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Mon Feb 15 23:39:27 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Mon Feb 15 23:39:27 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md                                    |  11 +
 README.markdown                                 |   1 +
 bin/storm-config.cmd                            |   6 +-
 bin/storm.cmd                                   |  47 +-
 bin/storm.py                                    |   8 +-
 dev-tools/travis/travis-script.sh               |   4 +-
 external/sql/storm-sql-core/pom.xml             |   9 +
 external/storm-elasticsearch/pom.xml            |   2 +
 .../storm/hbase/security/HBaseSecurityUtil.java |  36 +-
 external/storm-mqtt/core/pom.xml                |   4 +-
 log4j2/cluster.xml                              |   2 +-
 log4j2/worker.xml                               |   2 +-
 pom.xml                                         |   9 +-
 storm-core/pom.xml                              |  11 +-
 .../src/clj/org/apache/storm/LocalCluster.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/clojure.clj |   8 +-
 storm-core/src/clj/org/apache/storm/cluster.clj |  27 +-
 .../cluster_state/zookeeper_state_factory.clj   |  14 +-
 .../clj/org/apache/storm/command/blobstore.clj  |  11 +-
 .../org/apache/storm/command/config_value.clj   |  25 -
 .../org/apache/storm/command/dev_zookeeper.clj  |   6 +-
 .../clj/org/apache/storm/command/get_errors.clj |  12 +-
 .../apache/storm/command/shell_submission.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/config.clj  |  18 +-
 .../src/clj/org/apache/storm/converter.clj      |  14 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   |  21 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  29 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  23 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 530 +++++-----
 .../clj/org/apache/storm/daemon/logviewer.clj   |  68 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 170 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  | 204 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |   2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  78 +-
 .../src/clj/org/apache/storm/disruptor.clj      |  10 +-
 storm-core/src/clj/org/apache/storm/event.clj   |   2 +-
 .../src/clj/org/apache/storm/local_state.clj    |   9 +-
 .../clj/org/apache/storm/messaging/loader.clj   |  34 -
 .../clj/org/apache/storm/messaging/local.clj    |  23 -
 .../org/apache/storm/pacemaker/pacemaker.clj    |   7 +-
 .../storm/pacemaker/pacemaker_state_factory.clj |  24 +-
 .../clj/org/apache/storm/process_simulator.clj  |   4 +-
 .../apache/storm/scheduler/DefaultScheduler.clj |   7 +-
 .../apache/storm/scheduler/EvenScheduler.clj    |  23 +-
 .../storm/scheduler/IsolationScheduler.clj      |  29 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |  82 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  89 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |   6 +-
 storm-core/src/clj/org/apache/storm/timer.clj   |  12 +-
 .../clj/org/apache/storm/trident/testing.clj    |   9 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  97 +-
 .../src/clj/org/apache/storm/ui/helpers.clj     |  14 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 923 +----------------
 .../src/clj/org/apache/storm/zookeeper.clj      |   1 -
 .../org/apache/storm/command/ConfigValue.java   |  30 +
 .../storm/logging/ThriftAccessLogger.java       |  13 +-
 .../serialization/SerializationFactory.java     |  17 +-
 .../staticmocking/MockedConfigUtils.java        |  31 -
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../jvm/org/apache/storm/utils/Container.java   |  11 +-
 .../jvm/org/apache/storm/utils/IPredicate.java  |  27 +
 .../org/apache/storm/utils/NimbusClient.java    |   2 +-
 .../utils/StormConnectionStateConverter.java    |  44 +
 .../jvm/org/apache/storm/utils/TestUtils.java   |  34 -
 .../src/jvm/org/apache/storm/utils/Time.java    |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 989 +++++++++++++++++--
 .../storm/validation/ConfigValidation.java      |   2 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |   7 +
 .../org/apache/storm/integration_test.clj       | 100 +-
 .../org/apache/storm/testing4j_test.clj         |  37 +-
 .../apache/storm/trident/integration_test.clj   |  15 +-
 .../test/clj/org/apache/storm/cluster_test.clj  |  20 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../clj/org/apache/storm/logviewer_test.clj     | 267 ++---
 .../storm/messaging/netty_integration_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 131 ++-
 .../scheduler/resource_aware_scheduler_test.clj |  21 +-
 .../apache/storm/security/auth/auth_test.clj    |  11 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |   2 +-
 .../BlowfishTupleSerializer_test.clj            |   1 -
 .../clj/org/apache/storm/serialization_test.clj |  23 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 672 +++++++------
 .../clj/org/apache/storm/transactional_test.clj |  18 +
 .../clj/org/apache/storm/trident/state_test.clj |   5 +-
 .../clj/org/apache/storm/trident/tuple_test.clj |  15 +-
 .../test/clj/org/apache/storm/utils_test.clj    |  16 +-
 .../test/clj/org/apache/storm/worker_test.clj   |   1 -
 .../staticmocking/ConfigUtilsInstaller.java     |  38 +
 .../utils/staticmocking/UtilsInstaller.java     |  38 +
 .../storm/utils/staticmocking/package-info.java |  95 ++
 90 files changed, 3194 insertions(+), 2435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index e14c861,ae9e92f..8d1b6a6
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -260,11 -264,10 +264,11 @@@
          (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
            (rmr-as-user conf id (ConfigUtils/workerRoot conf id))
            (do
-             (rmr (ConfigUtils/workerHeartbeatsRoot conf id))
+             (Utils/forceDelete (ConfigUtils/workerHeartbeatsRoot conf id))
              ;; this avoids a race condition with worker or subprocess writing pid around same time
-             (rmr (ConfigUtils/workerPidsRoot conf id))
-             (rmr (ConfigUtils/workerTmpRoot conf id))
-             (rmr (ConfigUtils/workerRoot conf id))))
+             (Utils/forceDelete (ConfigUtils/workerPidsRoot conf id))
++            (Utils/forceDelete (ConfigUtils/workerTmpRoot conf id))
+             (Utils/forceDelete (ConfigUtils/workerRoot conf id))))
          (ConfigUtils/removeWorkerUserWSE conf id)
          (remove-dead-worker id)
        ))
@@@ -373,12 -378,12 +379,13 @@@
                mem-onheap (.get_mem_on_heap resources)]
            ;; This condition checks for required files exist before launching the worker
            (if (required-topo-files-exist? conf storm-id)
-             (do
+             (let [pids-path (ConfigUtils/workerPidsRoot conf id)
+                   hb-path (ConfigUtils/workerHeartbeatsRoot conf id)]
                (log-message "Launching worker with assignment "
                  (get-worker-assignment-helper-msg assignment supervisor port id))
-               (local-mkdirs (ConfigUtils/workerPidsRoot conf id))
-               (local-mkdirs (ConfigUtils/workerTmpRoot conf id))
-               (local-mkdirs (ConfigUtils/workerHeartbeatsRoot conf id))
+               (FileUtils/forceMkdir (File. pids-path))
++              (FileUtils/forceMkdir (File. (ConfigUtils/workerTmpRoot conf id)))
+               (FileUtils/forceMkdir (File. hb-path))
                (launch-worker supervisor
                  (:storm-id assignment)
                  port

http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 91c4057,9c31ddf..da5a5bc
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -298,10 -316,9 +316,10 @@@
                                  "-Dworkers.artifacts=/tmp/workers-artifacts"
                                  "-Dstorm.conf.file="
                                  "-Dstorm.options="
-                                 (str "-Dstorm.log.dir=" file-path-separator "logs")
-                                 (str "-Djava.io.tmpdir=/tmp/workers" file-path-separator mock-worker-id file-path-separator "tmp")
+                                 (str "-Dstorm.log.dir=" Utils/FILE_PATH_SEPARATOR "logs")
++                                (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
                                  (str "-Dlogging.sensitivity=" mock-sensitivity)
-                                 (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
+                                 (str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
                                  "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
                                  (str "-Dstorm.id=" mock-storm-id)
                                  (str "-Dworker.id=" mock-worker-id)
@@@ -326,13 -348,12 +349,13 @@@
                                                         ([conf storm-id] nil))
                            (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                            (setWorkerUserWSEImpl [conf worker-id user] nil)
 +                          (workerRootImpl [conf] "/tmp/workers")
                            (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
-               (stubbing [add-to-classpath mock-cp
-                      launch-process nil
-                      supervisor/jlp nil
-                      supervisor/write-log-metadata! nil
-                      supervisor/create-blobstore-links nil]
+           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+                       _ (UtilsInstaller. utils-spy)]
+               (stubbing [supervisor/jlp nil
+                          supervisor/write-log-metadata! nil
+                          supervisor/create-blobstore-links nil]
                  (supervisor/launch-worker mock-supervisor
                                        mock-storm-id
                                        mock-port
@@@ -348,45 -373,58 +375,60 @@@
                mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
                                        WORKER-CHILDOPTS list-opts}}
                mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                             topo-list-opts}]
-             (with-open [_ (proxy [MockedConfigUtils] []
-                             (supervisorStormDistRootImpl ([conf] nil)
-                                                          ([conf storm-id] nil))
-                             (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
-                             (setWorkerUserWSEImpl [conf worker-id user] nil)
-                             (workerRootImpl [conf] "/tmp/workers")
-                             (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
-                 (stubbing [add-to-classpath mock-cp
-                      launch-process nil
-                      supervisor/jlp nil
-                      supervisor/write-log-metadata! nil
-                      supervisor/create-blobstore-links nil]
-                 (supervisor/launch-worker mock-supervisor
-                                       mock-storm-id
-                                       mock-port
-                                       mock-worker-id
-                                       mock-mem-onheap)
-                 (verify-first-call-args-for-indices launch-process
-                                                 [0]
-                                                 exp-args)))))
+                                             topo-list-opts}
+               cu-proxy (proxy [ConfigUtils] []
+                           (supervisorStormDistRootImpl ([conf] nil)
+                                                        ([conf storm-id] nil))
+                           (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
+                           (setWorkerUserWSEImpl [conf worker-id user] nil)
++                          (workerRootImpl [conf] "/tmp/workers")
+                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+               utils-spy (->>
+                           (proxy [Utils] []
+                             (addToClasspathImpl [classpath paths] mock-cp)
+                             (launchProcessImpl [& _] nil))
+                           Mockito/spy)]
+             (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+                         _ (UtilsInstaller. utils-spy)]
+                 (stubbing [supervisor/jlp nil
+                            supervisor/write-log-metadata! nil
+                            supervisor/create-blobstore-links nil]
+                   (supervisor/launch-worker mock-supervisor
+                                             mock-storm-id
+                                             mock-port
+                                             mock-worker-id
+                                             mock-mem-onheap)
+                   (. (Mockito/verify utils-spy)
+                      (launchProcessImpl (Matchers/eq exp-args)
+                                         (Matchers/any)
+                                         (Matchers/any)
+                                         (Matchers/any)
+                                         (Matchers/any)))))))
+ 
        (testing "testing topology.classpath is added to classpath"
-         (let [topo-cp (str file-path-separator "any" file-path-separator "path")
-               exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
+         (let [topo-cp (str Utils/FILE_PATH_SEPARATOR "any" Utils/FILE_PATH_SEPARATOR "path")
+               exp-args (exp-args-fn [] [] (Utils/addToClasspath mock-cp [topo-cp]))
                mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
-               mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}]
-           (with-open [_ (proxy [MockedConfigUtils] []
+               mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
+               cu-proxy (proxy [ConfigUtils] []
                            (supervisorStormDistRootImpl ([conf] nil)
                                                         ([conf storm-id] nil))
                            (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                            (setWorkerUserWSEImpl [conf worker-id user] nil)
 +                          (workerRootImpl [conf] "/tmp/workers")
-                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
+                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+               utils-spy (->>
+                           (proxy [Utils] []
+                             (currentClasspathImpl []
+                               (str Utils/FILE_PATH_SEPARATOR "base"))
+                             (launchProcessImpl [& _] nil))
+                           Mockito/spy)]
+           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+                       _ (UtilsInstaller. utils-spy)]
                  (stubbing [supervisor/jlp nil
                       supervisor/write-log-metadata! nil
-                      launch-process nil
-                      current-classpath (str file-path-separator "base")
                       supervisor/create-blobstore-links nil]
-                     (supervisor/launch-worker mock-supervisor
+                   (supervisor/launch-worker mock-supervisor
                                                mock-storm-id
                                                mock-port
                                                mock-worker-id
@@@ -405,21 -446,29 +450,30 @@@
                                                         ([conf storm-id] nil))
                            (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                            (setWorkerUserWSEImpl [conf worker-id user] nil)
 +                          (workerRootImpl [conf] "/tmp/workers")
-                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
+                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+               utils-spy (->>
+                           (proxy [Utils] []
+                             (currentClasspathImpl []
+                               (str Utils/FILE_PATH_SEPARATOR "base"))
+                             (launchProcessImpl [& _] nil))
+                           Mockito/spy)]
+           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+                       _ (UtilsInstaller. utils-spy)]
              (stubbing [supervisor/jlp nil
-                      launch-process nil
-                      supervisor/write-log-metadata! nil
-                      current-classpath (str file-path-separator "base")
-                      supervisor/create-blobstore-links nil]
-                     (supervisor/launch-worker mock-supervisor
-                                               mock-storm-id
-                                               mock-port
-                                               mock-worker-id
-                                               mock-mem-onheap)
-                     (verify-first-call-args-for-indices launch-process
-                                                         [2]
-                                                         full-env))))))))
+                        supervisor/write-log-metadata! nil
+                        supervisor/create-blobstore-links nil]
+               (supervisor/launch-worker mock-supervisor
+                                         mock-storm-id
+                                         mock-port
+                                         mock-worker-id
+                                         mock-mem-onheap)
+               (. (Mockito/verify utils-spy)
+                  (launchProcessImpl (Matchers/any)
+                                     (Matchers/eq full-env)
+                                     (Matchers/any)
+                                     (Matchers/any)
+                                     (Matchers/any))))))))))
  
  (deftest test-worker-launch-command-run-as-user
    (testing "*.worker.childopts configuration"
@@@ -578,184 -646,189 +652,190 @@@
            (supervisor/supervisor-data auth-conf nil fake-isupervisor)
            (verify-call-times-for cluster/mk-storm-cluster-state 1)
            (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
-                                               expected-acls))))))
- 
- (deftest test-write-log-metadata
-   (testing "supervisor writes correct data to logs metadata file"
-     (let [exp-owner "alice"
-           exp-worker-id "42"
-           exp-storm-id "0123456789"
-           exp-port 4242
-           exp-logs-users ["bob" "charlie" "daryl"]
-           exp-logs-groups ["read-only-group" "special-group"]
-           storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
-                       TOPOLOGY-USERS ["charlie" "bob"]
-                       TOPOLOGY-GROUPS ["special-group"]
-                       LOGS-GROUPS ["read-only-group"]
-                       LOGS-USERS ["daryl"]}
-           exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
-                     "worker-id" exp-worker-id
-                     LOGS-USERS exp-logs-users
-                     LOGS-GROUPS exp-logs-groups}
-           conf {}]
-       (mocking [supervisor/write-log-metadata-to-yaml-file!]
-         (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
-                                         exp-storm-id exp-port conf)
-         (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
-                                       exp-storm-id exp-port exp-data conf)))))
- 
- (deftest test-worker-launcher-requires-user
-   (testing "worker-launcher throws on blank user"
-     (mocking [launch-process]
-       (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
-                                   #"(?i).*user cannot be blank.*"
-                                   (supervisor/worker-launcher {} nil ""))))))
- 
- (defn found? [sub-str input-str]
-   (if (string? input-str)
-     (contrib-str/substring? sub-str (str input-str))
-     (boolean (some #(contrib-str/substring? sub-str %) input-str))))
- 
- (defn not-found? [sub-str input-str]
+               expected-acls)))))
+ 
+   (deftest test-write-log-metadata
+     (testing "supervisor writes correct data to logs metadata file"
+       (let [exp-owner "alice"
+             exp-worker-id "42"
+             exp-storm-id "0123456789"
+             exp-port 4242
+             exp-logs-users ["bob" "charlie" "daryl"]
+             exp-logs-groups ["read-only-group" "special-group"]
+             storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+                         TOPOLOGY-USERS ["charlie" "bob"]
+                         TOPOLOGY-GROUPS ["special-group"]
+                         LOGS-GROUPS ["read-only-group"]
+                         LOGS-USERS ["daryl"]}
+             exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
+                       "worker-id" exp-worker-id
+                       LOGS-USERS exp-logs-users
+                       LOGS-GROUPS exp-logs-groups}
+             conf {}]
+         (mocking [supervisor/write-log-metadata-to-yaml-file!]
+           (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
+             exp-storm-id exp-port conf)
+           (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+             exp-storm-id exp-port exp-data conf)))))
+ 
+   (deftest test-worker-launcher-requires-user
+     (testing "worker-launcher throws on blank user"
+       (let [utils-proxy (proxy [Utils] []
+                           (launchProcessImpl [& _] nil))]
+         (with-open [_ (UtilsInstaller. utils-proxy)]
+           (is (try
+                 (supervisor/worker-launcher {} nil "")
+                 false
+                 (catch Throwable t
+                   (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
+                        (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
+ 
+   (defn found? [sub-str input-str]
+     (if (string? input-str)
+       (contrib-str/substring? sub-str (str input-str))
+       (boolean (some #(contrib-str/substring? sub-str %) input-str))))
+ 
+   (defn not-found? [sub-str input-str]
      (complement (found? sub-str input-str)))
  
- (deftest test-substitute-childopts-happy-path-string
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-happy-path-list
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-happy-path-list-arraylist
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-topology-id-alone
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-no-keys
-   (testing "worker-launcher has no ids to replace in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-nil-childopts
-   (testing "worker-launcher has nil childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts nil
-           expected-childopts nil
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-nil-ids
-   (testing "worker-launcher has nil ids"
-     (let [worker-id nil
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-retry-read-assignments
-   (with-simulated-time-local-cluster [cluster
-                                       :supervisors 0
-                                       :ports-per-supervisor 2
-                                       :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                                                     NIMBUS-MONITOR-FREQ-SECS 10
-                                                     TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
-                                                     TOPOLOGY-ACKER-EXECUTORS 0}]
-     (letlocals
-      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-      (bind topology1 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                       {}))
-      (bind topology2 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                       {}))
-      (bind state (:storm-cluster-state cluster))
-      (bind changed (capture-changed-workers
-                     (submit-mocked-assignment
-                      (:nimbus cluster)
-                      (:storm-cluster-state cluster)
-                      "topology1"
-                      {TOPOLOGY-WORKERS 2}
-                      topology1
-                      {1 "1"
-                       2 "1"}
-                      {[1 1] ["sup1" 1]
-                       [2 2] ["sup1" 2]}
-                      {["sup1" 1] [0.0 0.0 0.0]
-                       ["sup1" 2] [0.0 0.0 0.0]
-                       })
-                     (submit-mocked-assignment
-                      (:nimbus cluster)
-                      (:storm-cluster-state cluster)
-                      "topology2"
-                      {TOPOLOGY-WORKERS 2}
-                      topology2
-                      {1 "1"
-                       2 "1"}
-                      {[1 1] ["sup1" 1]
-                       [2 2] ["sup1" 2]}
-                      {["sup1" 1] [0.0 0.0 0.0]
-                       ["sup1" 2] [0.0 0.0 0.0]
-                       })
-                     ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                     (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                     ))
-      (is (empty? (:launched changed)))
-      (bind options (RebalanceOptions.))
-      (.set_wait_secs options 0)
-      (bind changed (capture-changed-workers
-                     (.rebalance (:nimbus cluster) "topology2" options)
-                     (advance-cluster-time cluster 10)
-                     (heartbeat-workers cluster "sup1" [1 2 3 4])
-                     (advance-cluster-time cluster 10)
-                     ))
-      (validate-launched-once (:launched changed)
-                              {"sup1" [1 2]}
-                              (get-storm-id (:storm-cluster-state cluster) "topology1"))
-      (validate-launched-once (:launched changed)
-                              {"sup1" [3 4]}
-                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
-      )))
+   (deftest test-substitute-childopts-happy-path-string
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-happy-path-list
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-happy-path-list-arraylist
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-topology-id-alone
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-no-keys
+     (testing "worker-launcher has no ids to replace in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-nil-childopts
+     (testing "worker-launcher has nil childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts nil
+             expected-childopts nil
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-nil-ids
+     (testing "worker-launcher has nil ids"
+       (let [worker-id nil
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-retry-read-assignments
+     (with-simulated-time-local-cluster [cluster
+                                         :supervisors 0
+                                         :ports-per-supervisor 2
+                                         :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+                                                       NIMBUS-MONITOR-FREQ-SECS 10
+                                                       TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                                       TOPOLOGY-ACKER-EXECUTORS 0}]
+       (letlocals
+         (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+         (bind topology1 (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                           {}))
+         (bind topology2 (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                           {}))
+         (bind state (:storm-cluster-state cluster))
+         (bind changed (capture-changed-workers
+                         (submit-mocked-assignment
+                           (:nimbus cluster)
+                           (:storm-cluster-state cluster)
+                           "topology1"
+                           {TOPOLOGY-WORKERS 2}
+                           topology1
+                           {1 "1"
+                            2 "1"}
+                           {[1 1] ["sup1" 1]
+                            [2 2] ["sup1" 2]}
+                           {["sup1" 1] [0.0 0.0 0.0]
+                            ["sup1" 2] [0.0 0.0 0.0]
+                            })
+                         (submit-mocked-assignment
+                           (:nimbus cluster)
+                           (:storm-cluster-state cluster)
+                           "topology2"
+                           {TOPOLOGY-WORKERS 2}
+                           topology2
+                           {1 "1"
+                            2 "1"}
+                           {[1 1] ["sup1" 1]
+                            [2 2] ["sup1" 2]}
+                           {["sup1" 1] [0.0 0.0 0.0]
+                            ["sup1" 2] [0.0 0.0 0.0]
+                            })
+                         ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+                         (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+                         ))
+         (is (empty? (:launched changed)))
+         (bind options (RebalanceOptions.))
+         (.set_wait_secs options 0)
+         (bind changed (capture-changed-workers
+                         (.rebalance (:nimbus cluster) "topology2" options)
+                         (advance-cluster-time cluster 10)
+                         (heartbeat-workers cluster "sup1" [1 2 3 4])
+                         (advance-cluster-time cluster 10)
+                         ))
+         (validate-launched-once (:launched changed)
+           {"sup1" [1 2]}
+           (get-storm-id (:storm-cluster-state cluster) "topology1"))
+         (validate-launched-once (:launched changed)
+           {"sup1" [3 4]}
+           (get-storm-id (:storm-cluster-state cluster) "topology2"))
+         ))))
++