You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/03/04 19:13:40 UTC
[2/6] storm git commit: Merge branch 'master' of github.com:apache/storm into storm1529
Merge branch 'master' of github.com:apache/storm into storm1529
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/61c9702a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/61c9702a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/61c9702a
Branch: refs/heads/master
Commit: 61c9702a924eb6b1b8bd3f0a829678a50f02bb67
Parents: 56a7a02 12ceb09
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Mon Feb 15 23:39:27 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Mon Feb 15 23:39:27 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 11 +
README.markdown | 1 +
bin/storm-config.cmd | 6 +-
bin/storm.cmd | 47 +-
bin/storm.py | 8 +-
dev-tools/travis/travis-script.sh | 4 +-
external/sql/storm-sql-core/pom.xml | 9 +
external/storm-elasticsearch/pom.xml | 2 +
.../storm/hbase/security/HBaseSecurityUtil.java | 36 +-
external/storm-mqtt/core/pom.xml | 4 +-
log4j2/cluster.xml | 2 +-
log4j2/worker.xml | 2 +-
pom.xml | 9 +-
storm-core/pom.xml | 11 +-
.../src/clj/org/apache/storm/LocalCluster.clj | 4 +-
storm-core/src/clj/org/apache/storm/clojure.clj | 8 +-
storm-core/src/clj/org/apache/storm/cluster.clj | 27 +-
.../cluster_state/zookeeper_state_factory.clj | 14 +-
.../clj/org/apache/storm/command/blobstore.clj | 11 +-
.../org/apache/storm/command/config_value.clj | 25 -
.../org/apache/storm/command/dev_zookeeper.clj | 6 +-
.../clj/org/apache/storm/command/get_errors.clj | 12 +-
.../apache/storm/command/shell_submission.clj | 4 +-
storm-core/src/clj/org/apache/storm/config.clj | 18 +-
.../src/clj/org/apache/storm/converter.clj | 14 +-
.../src/clj/org/apache/storm/daemon/acker.clj | 21 +-
.../src/clj/org/apache/storm/daemon/common.clj | 29 +-
.../src/clj/org/apache/storm/daemon/drpc.clj | 23 +-
.../clj/org/apache/storm/daemon/executor.clj | 530 +++++-----
.../clj/org/apache/storm/daemon/logviewer.clj | 68 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 170 ++--
.../clj/org/apache/storm/daemon/supervisor.clj | 204 ++--
.../src/clj/org/apache/storm/daemon/task.clj | 2 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 78 +-
.../src/clj/org/apache/storm/disruptor.clj | 10 +-
storm-core/src/clj/org/apache/storm/event.clj | 2 +-
.../src/clj/org/apache/storm/local_state.clj | 9 +-
.../clj/org/apache/storm/messaging/loader.clj | 34 -
.../clj/org/apache/storm/messaging/local.clj | 23 -
.../org/apache/storm/pacemaker/pacemaker.clj | 7 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 24 +-
.../clj/org/apache/storm/process_simulator.clj | 4 +-
.../apache/storm/scheduler/DefaultScheduler.clj | 7 +-
.../apache/storm/scheduler/EvenScheduler.clj | 23 +-
.../storm/scheduler/IsolationScheduler.clj | 29 +-
storm-core/src/clj/org/apache/storm/stats.clj | 82 +-
storm-core/src/clj/org/apache/storm/testing.clj | 89 +-
storm-core/src/clj/org/apache/storm/thrift.clj | 6 +-
storm-core/src/clj/org/apache/storm/timer.clj | 12 +-
.../clj/org/apache/storm/trident/testing.clj | 9 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 97 +-
.../src/clj/org/apache/storm/ui/helpers.clj | 14 +-
storm-core/src/clj/org/apache/storm/util.clj | 923 +----------------
.../src/clj/org/apache/storm/zookeeper.clj | 1 -
.../org/apache/storm/command/ConfigValue.java | 30 +
.../storm/logging/ThriftAccessLogger.java | 13 +-
.../serialization/SerializationFactory.java | 17 +-
.../staticmocking/MockedConfigUtils.java | 31 -
.../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +-
.../jvm/org/apache/storm/utils/Container.java | 11 +-
.../jvm/org/apache/storm/utils/IPredicate.java | 27 +
.../org/apache/storm/utils/NimbusClient.java | 2 +-
.../utils/StormConnectionStateConverter.java | 44 +
.../jvm/org/apache/storm/utils/TestUtils.java | 34 -
.../src/jvm/org/apache/storm/utils/Time.java | 26 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 989 +++++++++++++++++--
.../storm/validation/ConfigValidation.java | 2 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 7 +
.../org/apache/storm/integration_test.clj | 100 +-
.../org/apache/storm/testing4j_test.clj | 37 +-
.../apache/storm/trident/integration_test.clj | 15 +-
.../test/clj/org/apache/storm/cluster_test.clj | 20 +-
.../test/clj/org/apache/storm/drpc_test.clj | 23 +-
.../clj/org/apache/storm/logviewer_test.clj | 267 ++---
.../storm/messaging/netty_integration_test.clj | 2 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 131 ++-
.../scheduler/resource_aware_scheduler_test.clj | 21 +-
.../apache/storm/security/auth/auth_test.clj | 11 +-
.../authorizer/DRPCSimpleACLAuthorizer_test.clj | 2 +-
.../BlowfishTupleSerializer_test.clj | 1 -
.../clj/org/apache/storm/serialization_test.clj | 23 +-
.../clj/org/apache/storm/supervisor_test.clj | 672 +++++++------
.../clj/org/apache/storm/transactional_test.clj | 18 +
.../clj/org/apache/storm/trident/state_test.clj | 5 +-
.../clj/org/apache/storm/trident/tuple_test.clj | 15 +-
.../test/clj/org/apache/storm/utils_test.clj | 16 +-
.../test/clj/org/apache/storm/worker_test.clj | 1 -
.../staticmocking/ConfigUtilsInstaller.java | 38 +
.../utils/staticmocking/UtilsInstaller.java | 38 +
.../storm/utils/staticmocking/package-info.java | 95 ++
90 files changed, 3194 insertions(+), 2435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index e14c861,ae9e92f..8d1b6a6
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -260,11 -264,10 +264,11 @@@
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(rmr-as-user conf id (ConfigUtils/workerRoot conf id))
(do
- (rmr (ConfigUtils/workerHeartbeatsRoot conf id))
+ (Utils/forceDelete (ConfigUtils/workerHeartbeatsRoot conf id))
;; this avoids a race condition with worker or subprocess writing pid around same time
- (rmr (ConfigUtils/workerPidsRoot conf id))
- (rmr (ConfigUtils/workerTmpRoot conf id))
- (rmr (ConfigUtils/workerRoot conf id))))
+ (Utils/forceDelete (ConfigUtils/workerPidsRoot conf id))
++ (Utils/forceDelete (ConfigUtils/workerTmpRoot conf id))
+ (Utils/forceDelete (ConfigUtils/workerRoot conf id))))
(ConfigUtils/removeWorkerUserWSE conf id)
(remove-dead-worker id)
))
@@@ -373,12 -378,12 +379,13 @@@
mem-onheap (.get_mem_on_heap resources)]
;; This condition checks for required files exist before launching the worker
(if (required-topo-files-exist? conf storm-id)
- (do
+ (let [pids-path (ConfigUtils/workerPidsRoot conf id)
+ hb-path (ConfigUtils/workerHeartbeatsRoot conf id)]
(log-message "Launching worker with assignment "
(get-worker-assignment-helper-msg assignment supervisor port id))
- (local-mkdirs (ConfigUtils/workerPidsRoot conf id))
- (local-mkdirs (ConfigUtils/workerTmpRoot conf id))
- (local-mkdirs (ConfigUtils/workerHeartbeatsRoot conf id))
+ (FileUtils/forceMkdir (File. pids-path))
++ (FileUtils/forceMkdir (File. (ConfigUtils/workerTmpRoot conf id)))
+ (FileUtils/forceMkdir (File. hb-path))
(launch-worker supervisor
(:storm-id assignment)
port
http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 91c4057,9c31ddf..da5a5bc
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -298,10 -316,9 +316,10 @@@
"-Dworkers.artifacts=/tmp/workers-artifacts"
"-Dstorm.conf.file="
"-Dstorm.options="
- (str "-Dstorm.log.dir=" file-path-separator "logs")
- (str "-Djava.io.tmpdir=/tmp/workers" file-path-separator mock-worker-id file-path-separator "tmp")
+ (str "-Dstorm.log.dir=" Utils/FILE_PATH_SEPARATOR "logs")
++ (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
(str "-Dlogging.sensitivity=" mock-sensitivity)
- (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
+ (str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
(str "-Dstorm.id=" mock-storm-id)
(str "-Dworker.id=" mock-worker-id)
@@@ -326,13 -348,12 +349,13 @@@
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
(workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
- (stubbing [add-to-classpath mock-cp
- launch-process nil
- supervisor/jlp nil
- supervisor/write-log-metadata! nil
- supervisor/create-blobstore-links nil]
+ (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+ _ (UtilsInstaller. utils-spy)]
+ (stubbing [supervisor/jlp nil
+ supervisor/write-log-metadata! nil
+ supervisor/create-blobstore-links nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
@@@ -348,45 -373,58 +375,60 @@@
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
WORKER-CHILDOPTS list-opts}}
mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-list-opts}]
- (with-open [_ (proxy [MockedConfigUtils] []
- (supervisorStormDistRootImpl ([conf] nil)
- ([conf storm-id] nil))
- (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
- (setWorkerUserWSEImpl [conf worker-id user] nil)
- (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
- (stubbing [add-to-classpath mock-cp
- launch-process nil
- supervisor/jlp nil
- supervisor/write-log-metadata! nil
- supervisor/create-blobstore-links nil]
- (supervisor/launch-worker mock-supervisor
- mock-storm-id
- mock-port
- mock-worker-id
- mock-mem-onheap)
- (verify-first-call-args-for-indices launch-process
- [0]
- exp-args)))))
+ topo-list-opts}
+ cu-proxy (proxy [ConfigUtils] []
+ (supervisorStormDistRootImpl ([conf] nil)
+ ([conf storm-id] nil))
+ (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
+ (setWorkerUserWSEImpl [conf worker-id user] nil)
++ (workerRootImpl [conf] "/tmp/workers")
+ (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+ utils-spy (->>
+ (proxy [Utils] []
+ (addToClasspathImpl [classpath paths] mock-cp)
+ (launchProcessImpl [& _] nil))
+ Mockito/spy)]
+ (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+ _ (UtilsInstaller. utils-spy)]
+ (stubbing [supervisor/jlp nil
+ supervisor/write-log-metadata! nil
+ supervisor/create-blobstore-links nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id
+ mock-mem-onheap)
+ (. (Mockito/verify utils-spy)
+ (launchProcessImpl (Matchers/eq exp-args)
+ (Matchers/any)
+ (Matchers/any)
+ (Matchers/any)
+ (Matchers/any)))))))
+
(testing "testing topology.classpath is added to classpath"
- (let [topo-cp (str file-path-separator "any" file-path-separator "path")
- exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
+ (let [topo-cp (str Utils/FILE_PATH_SEPARATOR "any" Utils/FILE_PATH_SEPARATOR "path")
+ exp-args (exp-args-fn [] [] (Utils/addToClasspath mock-cp [topo-cp]))
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
- mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}]
- (with-open [_ (proxy [MockedConfigUtils] []
+ mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
+ cu-proxy (proxy [ConfigUtils] []
(supervisorStormDistRootImpl ([conf] nil)
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
+ (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+ utils-spy (->>
+ (proxy [Utils] []
+ (currentClasspathImpl []
+ (str Utils/FILE_PATH_SEPARATOR "base"))
+ (launchProcessImpl [& _] nil))
+ Mockito/spy)]
+ (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+ _ (UtilsInstaller. utils-spy)]
(stubbing [supervisor/jlp nil
supervisor/write-log-metadata! nil
- launch-process nil
- current-classpath (str file-path-separator "base")
supervisor/create-blobstore-links nil]
- (supervisor/launch-worker mock-supervisor
+ (supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
mock-worker-id
@@@ -405,21 -446,29 +450,30 @@@
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
+ (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+ utils-spy (->>
+ (proxy [Utils] []
+ (currentClasspathImpl []
+ (str Utils/FILE_PATH_SEPARATOR "base"))
+ (launchProcessImpl [& _] nil))
+ Mockito/spy)]
+ (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+ _ (UtilsInstaller. utils-spy)]
(stubbing [supervisor/jlp nil
- launch-process nil
- supervisor/write-log-metadata! nil
- current-classpath (str file-path-separator "base")
- supervisor/create-blobstore-links nil]
- (supervisor/launch-worker mock-supervisor
- mock-storm-id
- mock-port
- mock-worker-id
- mock-mem-onheap)
- (verify-first-call-args-for-indices launch-process
- [2]
- full-env))))))))
+ supervisor/write-log-metadata! nil
+ supervisor/create-blobstore-links nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id
+ mock-mem-onheap)
+ (. (Mockito/verify utils-spy)
+ (launchProcessImpl (Matchers/any)
+ (Matchers/eq full-env)
+ (Matchers/any)
+ (Matchers/any)
+ (Matchers/any))))))))))
(deftest test-worker-launch-command-run-as-user
(testing "*.worker.childopts configuration"
@@@ -578,184 -646,189 +652,190 @@@
(supervisor/supervisor-data auth-conf nil fake-isupervisor)
(verify-call-times-for cluster/mk-storm-cluster-state 1)
(verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
- expected-acls))))))
-
- (deftest test-write-log-metadata
- (testing "supervisor writes correct data to logs metadata file"
- (let [exp-owner "alice"
- exp-worker-id "42"
- exp-storm-id "0123456789"
- exp-port 4242
- exp-logs-users ["bob" "charlie" "daryl"]
- exp-logs-groups ["read-only-group" "special-group"]
- storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
- TOPOLOGY-USERS ["charlie" "bob"]
- TOPOLOGY-GROUPS ["special-group"]
- LOGS-GROUPS ["read-only-group"]
- LOGS-USERS ["daryl"]}
- exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
- "worker-id" exp-worker-id
- LOGS-USERS exp-logs-users
- LOGS-GROUPS exp-logs-groups}
- conf {}]
- (mocking [supervisor/write-log-metadata-to-yaml-file!]
- (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
- exp-storm-id exp-port conf)
- (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
- exp-storm-id exp-port exp-data conf)))))
-
- (deftest test-worker-launcher-requires-user
- (testing "worker-launcher throws on blank user"
- (mocking [launch-process]
- (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
- #"(?i).*user cannot be blank.*"
- (supervisor/worker-launcher {} nil ""))))))
-
- (defn found? [sub-str input-str]
- (if (string? input-str)
- (contrib-str/substring? sub-str (str input-str))
- (boolean (some #(contrib-str/substring? sub-str %) input-str))))
-
- (defn not-found? [sub-str input-str]
+ expected-acls)))))
+
+ (deftest test-write-log-metadata
+ (testing "supervisor writes correct data to logs metadata file"
+ (let [exp-owner "alice"
+ exp-worker-id "42"
+ exp-storm-id "0123456789"
+ exp-port 4242
+ exp-logs-users ["bob" "charlie" "daryl"]
+ exp-logs-groups ["read-only-group" "special-group"]
+ storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+ TOPOLOGY-USERS ["charlie" "bob"]
+ TOPOLOGY-GROUPS ["special-group"]
+ LOGS-GROUPS ["read-only-group"]
+ LOGS-USERS ["daryl"]}
+ exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
+ "worker-id" exp-worker-id
+ LOGS-USERS exp-logs-users
+ LOGS-GROUPS exp-logs-groups}
+ conf {}]
+ (mocking [supervisor/write-log-metadata-to-yaml-file!]
+ (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
+ exp-storm-id exp-port conf)
+ (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+ exp-storm-id exp-port exp-data conf)))))
+
+ (deftest test-worker-launcher-requires-user
+ (testing "worker-launcher throws on blank user"
+ (let [utils-proxy (proxy [Utils] []
+ (launchProcessImpl [& _] nil))]
+ (with-open [_ (UtilsInstaller. utils-proxy)]
+ (is (try
+ (supervisor/worker-launcher {} nil "")
+ false
+ (catch Throwable t
+ (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
+ (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
+
+ (defn found? [sub-str input-str]
+ (if (string? input-str)
+ (contrib-str/substring? sub-str (str input-str))
+ (boolean (some #(contrib-str/substring? sub-str %) input-str))))
+
+ (defn not-found? [sub-str input-str]
(complement (found? sub-str input-str)))
- (deftest test-substitute-childopts-happy-path-string
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-happy-path-list
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-happy-path-list-arraylist
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-topology-id-alone
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-no-keys
- (testing "worker-launcher has no ids to replace in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-nil-childopts
- (testing "worker-launcher has nil childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts nil
- expected-childopts nil
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-nil-ids
- (testing "worker-launcher has nil ids"
- (let [worker-id nil
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-retry-read-assignments
- (with-simulated-time-local-cluster [cluster
- :supervisors 0
- :ports-per-supervisor 2
- :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
- NIMBUS-MONITOR-FREQ-SECS 10
- TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
- TOPOLOGY-ACKER-EXECUTORS 0}]
- (letlocals
- (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
- (bind topology1 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind topology2 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind state (:storm-cluster-state cluster))
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology1"
- {TOPOLOGY-WORKERS 2}
- topology1
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology2"
- {TOPOLOGY-WORKERS 2}
- topology2
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- ))
- (is (empty? (:launched changed)))
- (bind options (RebalanceOptions.))
- (.set_wait_secs options 0)
- (bind changed (capture-changed-workers
- (.rebalance (:nimbus cluster) "topology2" options)
- (advance-cluster-time cluster 10)
- (heartbeat-workers cluster "sup1" [1 2 3 4])
- (advance-cluster-time cluster 10)
- ))
- (validate-launched-once (:launched changed)
- {"sup1" [1 2]}
- (get-storm-id (:storm-cluster-state cluster) "topology1"))
- (validate-launched-once (:launched changed)
- {"sup1" [3 4]}
- (get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
+ (deftest test-substitute-childopts-happy-path-string
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-happy-path-list
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-happy-path-list-arraylist
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-topology-id-alone
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-no-keys
+ (testing "worker-launcher has no ids to replace in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-nil-childopts
+ (testing "worker-launcher has nil childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts nil
+ expected-childopts nil
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-nil-ids
+ (testing "worker-launcher has nil ids"
+ (let [worker-id nil
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-retry-read-assignments
+ (with-simulated-time-local-cluster [cluster
+ :supervisors 0
+ :ports-per-supervisor 2
+ :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0}]
+ (letlocals
+ (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+ (bind topology1 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind topology2 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (bind changed (capture-changed-workers
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology1"
+ {TOPOLOGY-WORKERS 2}
+ topology1
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology2"
+ {TOPOLOGY-WORKERS 2}
+ topology2
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+ (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+ ))
+ (is (empty? (:launched changed)))
+ (bind options (RebalanceOptions.))
+ (.set_wait_secs options 0)
+ (bind changed (capture-changed-workers
+ (.rebalance (:nimbus cluster) "topology2" options)
+ (advance-cluster-time cluster 10)
+ (heartbeat-workers cluster "sup1" [1 2 3 4])
+ (advance-cluster-time cluster 10)
+ ))
+ (validate-launched-once (:launched changed)
+ {"sup1" [1 2]}
+ (get-storm-id (:storm-cluster-state cluster) "topology1"))
+ (validate-launched-once (:launched changed)
+ {"sup1" [3 4]}
+ (get-storm-id (:storm-cluster-state cluster) "topology2"))
+ ))))
++