You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/03/04 19:13:39 UTC
[1/6] storm git commit: Change default temp dir for workers to worker
launch directory.
Repository: storm
Updated Branches:
refs/heads/master 595ed28e4 -> 96f81d793
Change default temp dir for workers to worker launch directory.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/56a7a022
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/56a7a022
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/56a7a022
Branch: refs/heads/master
Commit: 56a7a022de62c122320d619dc153824b57b53be6
Parents: 31b57e8
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Sat Feb 6 02:32:20 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Sat Feb 6 02:32:20 2016 -0600
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/daemon/supervisor.clj | 4 ++++
.../src/jvm/org/apache/storm/utils/ConfigUtils.java | 10 ++++++++++
storm-core/test/clj/org/apache/storm/supervisor_test.clj | 8 +++++++-
3 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/56a7a022/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 25f8968..e14c861 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -263,6 +263,7 @@
(rmr (ConfigUtils/workerHeartbeatsRoot conf id))
;; this avoids a race condition with worker or subprocess writing pid around same time
(rmr (ConfigUtils/workerPidsRoot conf id))
+ (rmr (ConfigUtils/workerTmpRoot conf id))
(rmr (ConfigUtils/workerRoot conf id))))
(ConfigUtils/removeWorkerUserWSE conf id)
(remove-dead-worker id)
@@ -376,6 +377,7 @@
(log-message "Launching worker with assignment "
(get-worker-assignment-helper-msg assignment supervisor port id))
(local-mkdirs (ConfigUtils/workerPidsRoot conf id))
+ (local-mkdirs (ConfigUtils/workerTmpRoot conf id))
(local-mkdirs (ConfigUtils/workerHeartbeatsRoot conf id))
(launch-worker supervisor
(:storm-id assignment)
@@ -1044,6 +1046,7 @@
storm-home (System/getProperty "storm.home")
storm-options (System/getProperty "storm.options")
storm-conf-file (System/getProperty "storm.conf.file")
+ worker-tmp-dir (ConfigUtils/workerTmpRoot conf worker-id)
storm-log-dir (ConfigUtils/getLogDir)
storm-log-conf-dir (conf STORM-LOG4J2-CONF-DIR)
storm-log4j2-conf-dir (if storm-log-conf-dir
@@ -1113,6 +1116,7 @@
(str "-Dstorm.conf.file=" storm-conf-file)
(str "-Dstorm.options=" storm-options)
(str "-Dstorm.log.dir=" storm-log-dir)
+ (str "-Djava.io.tmpdir=" worker-tmp-dir)
(str "-Dlogging.sensitivity=" logging-sensitivity)
(str "-Dlog4j.configurationFile=" log4j-configuration-file)
(str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector")
http://git-wip-us.apache.org/repos/asf/storm/blob/56a7a022/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 54523f9..e7bfec2 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -453,7 +453,12 @@ public class ConfigUtils {
return new File((logRoot + FILE_SEPARATOR + id + FILE_SEPARATOR + port));
}
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static String workerRoot(Map conf) {
+ return _instance.workerRootImpl(conf);
+ }
+
+ public String workerRootImpl(Map conf) {
return (absoluteStormLocalDir(conf) + FILE_SEPARATOR + "workers");
}
@@ -465,6 +470,11 @@ public class ConfigUtils {
return (workerRoot(conf, id) + FILE_SEPARATOR + "pids");
}
+ public static String workerTmpRoot(Map conf, String id) {
+ return (workerRoot(conf, id) + FILE_SEPARATOR + "tmp");
+ }
+
+
public static String workerPidPath(Map conf, String id, String pid) {
return (workerPidsRoot(conf, id) + FILE_SEPARATOR + pid);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/56a7a022/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index edb161b..91c4057 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -299,6 +299,7 @@
"-Dstorm.conf.file="
"-Dstorm.options="
(str "-Dstorm.log.dir=" file-path-separator "logs")
+ (str "-Djava.io.tmpdir=/tmp/workers" file-path-separator mock-worker-id file-path-separator "tmp")
(str "-Dlogging.sensitivity=" mock-sensitivity)
(str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
@@ -325,6 +326,7 @@
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
(workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
(stubbing [add-to-classpath mock-cp
launch-process nil
@@ -352,6 +354,7 @@
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
(workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
(stubbing [add-to-classpath mock-cp
launch-process nil
@@ -376,6 +379,7 @@
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
(workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
(stubbing [supervisor/jlp nil
supervisor/write-log-metadata! nil
@@ -401,6 +405,7 @@
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
(workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
(stubbing [supervisor/jlp nil
launch-process nil
@@ -455,6 +460,7 @@
" '-Dstorm.conf.file='"
" '-Dstorm.options='"
" '-Dstorm.log.dir=/logs'"
+ " '-Djava.io.tmpdir=" (str storm-local "/workers/" mock-worker-id "/tmp'")
" '-Dlogging.sensitivity=" mock-sensitivity "'"
" '-Dlog4j.configurationFile=/log4j2/worker.xml'"
" '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
@@ -752,4 +758,4 @@
(validate-launched-once (:launched changed)
{"sup1" [3 4]}
(get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
\ No newline at end of file
+ )))
[5/6] storm git commit: Merge branch 'storm1529' of
https://github.com/kishorvpatil/incubator-storm
Posted by ki...@apache.org.
Merge branch 'storm1529' of https://github.com/kishorvpatil/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9c53f669
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9c53f669
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9c53f669
Branch: refs/heads/master
Commit: 9c53f669c97acc3b714d6fa1fd02d96f86e1f450
Parents: 595ed28 761296a
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Mar 4 17:51:46 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Mar 4 17:51:46 2016 +0000
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/daemon/supervisor.clj | 4 ++++
.../src/jvm/org/apache/storm/utils/ConfigUtils.java | 10 ++++++++++
storm-core/test/clj/org/apache/storm/supervisor_test.clj | 6 ++++++
3 files changed, 20 insertions(+)
----------------------------------------------------------------------
[4/6] storm git commit: Merge branch 'master' of
github.com:apache/storm into storm1529
Posted by ki...@apache.org.
Merge branch 'master' of github.com:apache/storm into storm1529
Conflicts:
storm-core/test/clj/org/apache/storm/supervisor_test.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/761296a1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/761296a1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/761296a1
Branch: refs/heads/master
Commit: 761296a1aba34e173c96314a31a9099aa4af3693
Parents: eddfea3 db04ce6
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Mar 4 17:22:29 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Mar 4 17:22:29 2016 +0000
----------------------------------------------------------------------
.gitignore | 1 +
CHANGELOG.md | 34 +-
README.markdown | 7 +-
bin/flight.bash | 4 +-
bin/storm.cmd | 4 +-
bin/storm.py | 10 +-
conf/defaults.yaml | 4 +-
.../storm/starter/ThroughputVsLatency.java | 2 +-
.../apache/storm/sql/compiler/CompilerUtil.java | 7 +-
.../apache/storm/sql/compiler/ExprCompiler.java | 32 +-
.../backends/standalone/RelNodeCompiler.java | 6 +-
.../apache/storm/sql/parser/StormParser.java | 5 +
.../test/org/apache/storm/sql/TestStormSql.java | 64 +-
.../storm/sql/compiler/TestCompilerUtils.java | 62 +-
.../storm/sql/compiler/TestExprSemantic.java | 18 +
.../backends/standalone/TestPlanCompiler.java | 20 +
.../backends/trident/TestPlanCompiler.java | 4 +-
.../test/org/apache/storm/sql/TestUtils.java | 32 +-
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 10 +-
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 8 +-
.../org/apache/storm/hdfs/bolt/HdfsBolt.java | 8 +-
.../storm/hdfs/bolt/SequenceFileBolt.java | 8 +-
.../org/apache/storm/kafka/IntSerializer.java | 10 +-
.../apache/storm/kafka/PartitionManager.java | 5 +-
.../kafka/trident/TridentKafkaEmitter.java | 5 +-
pom.xml | 23 +
.../src/clj/org/apache/storm/thrift.clj | 2 +-
.../src/clj/org/apache/storm/MockAutoCred.clj | 58 --
storm-core/src/clj/org/apache/storm/cluster.clj | 700 -------------------
.../cluster_state/zookeeper_state_factory.clj | 165 -----
.../clj/org/apache/storm/command/heartbeats.clj | 6 +-
.../clj/org/apache/storm/command/monitor.clj | 37 -
.../clj/org/apache/storm/command/rebalance.clj | 47 --
.../org/apache/storm/command/set_log_level.clj | 76 --
.../apache/storm/command/shell_submission.clj | 2 +-
.../apache/storm/command/upload_credentials.clj | 35 -
.../src/clj/org/apache/storm/converter.clj | 23 +-
.../src/clj/org/apache/storm/daemon/acker.clj | 108 ---
.../src/clj/org/apache/storm/daemon/common.clj | 29 +-
.../src/clj/org/apache/storm/daemon/drpc.clj | 38 +-
.../clj/org/apache/storm/daemon/executor.clj | 13 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 65 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 165 +++--
.../clj/org/apache/storm/daemon/supervisor.clj | 57 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 68 +-
.../clj/org/apache/storm/internal/thrift.clj | 2 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 141 ----
.../clj/org/apache/storm/process_simulator.clj | 49 --
storm-core/src/clj/org/apache/storm/stats.clj | 3 +-
storm-core/src/clj/org/apache/storm/testing.clj | 28 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 132 ++--
.../src/clj/org/apache/storm/ui/helpers.clj | 199 +-----
storm-core/src/clj/org/apache/storm/util.clj | 11 +
.../src/clj/org/apache/storm/zookeeper.clj | 74 --
storm-core/src/jvm/org/apache/storm/Config.java | 9 +
.../jvm/org/apache/storm/ProcessSimulator.java | 82 +++
.../storm/blobstore/LocalFsBlobStore.java | 2 +-
.../jvm/org/apache/storm/callback/Callback.java | 23 -
.../storm/callback/ZKStateChangedCallback.java | 25 +
.../org/apache/storm/cluster/ClusterState.java | 217 ------
.../storm/cluster/ClusterStateContext.java | 2 +-
.../storm/cluster/ClusterStateFactory.java | 28 -
.../org/apache/storm/cluster/ClusterUtils.java | 244 +++++++
.../org/apache/storm/cluster/ExecutorBeat.java | 44 ++
.../org/apache/storm/cluster/IStateStorage.java | 222 ++++++
.../storm/cluster/IStormClusterState.java | 124 ++++
.../storm/cluster/PaceMakerStateStorage.java | 216 ++++++
.../cluster/PaceMakerStateStorageFactory.java | 64 ++
.../storm/cluster/StateStorageFactory.java | 28 +
.../storm/cluster/StormClusterStateImpl.java | 697 ++++++++++++++++++
.../apache/storm/cluster/ZKStateStorage.java | 244 +++++++
.../storm/cluster/ZKStateStorageFactory.java | 36 +
.../src/jvm/org/apache/storm/command/CLI.java | 34 +-
.../src/jvm/org/apache/storm/command/List.java | 50 --
.../apache/storm/command/ListTopologies.java | 52 ++
.../jvm/org/apache/storm/command/Monitor.java | 65 ++
.../jvm/org/apache/storm/command/Rebalance.java | 86 +++
.../org/apache/storm/command/SetLogLevel.java | 116 +++
.../apache/storm/command/UploadCredentials.java | 61 ++
.../src/jvm/org/apache/storm/daemon/Acker.java | 128 ++++
.../storm/daemon/metrics/MetricsUtils.java | 2 +-
.../storm/metric/FileBasedEventLogger.java | 18 +-
.../apache/storm/pacemaker/PacemakerClient.java | 6 +-
.../apache/storm/security/auth/AuthUtils.java | 40 ++
.../security/auth/ThriftConnectionType.java | 2 +-
.../storm/security/auth/kerberos/AutoTGT.java | 64 +-
.../auth/kerberos/AutoTGTKrb5LoginModule.java | 8 +-
.../serialization/SerializationFactory.java | 2 +
.../testing/staticmocking/MockedCluster.java | 31 +
.../MockedPaceMakerStateStorageFactory.java | 32 +
.../apache/storm/topology/TopologyBuilder.java | 13 +-
.../apache/storm/trident/tuple/ConsList.java | 20 +-
.../apache/storm/ui/FilterConfiguration.java | 63 ++
.../jvm/org/apache/storm/ui/IConfigurator.java | 24 +
.../src/jvm/org/apache/storm/ui/UIHelpers.java | 267 +++++++
.../jvm/org/apache/storm/utils/ConfigUtils.java | 8 +-
.../src/jvm/org/apache/storm/utils/Time.java | 1 +
.../src/jvm/org/apache/storm/utils/Utils.java | 91 ++-
.../storm/utils/WorkerBackpressureCallback.java | 2 +-
.../storm/utils/WorkerBackpressureThread.java | 38 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 77 +-
storm-core/src/ui/public/component.html | 2 +-
.../templates/topology-page-template.html | 6 +-
storm-core/src/ui/public/topology.html | 2 +-
.../org/apache/storm/integration_test.clj | 13 +-
.../test/clj/org/apache/storm/cluster_test.clj | 202 +++---
.../storm/messaging/netty_integration_test.clj | 1 +
.../test/clj/org/apache/storm/nimbus_test.clj | 166 ++---
.../storm/pacemaker_state_factory_test.clj | 121 ++--
.../security/auth/auto_login_module_test.clj | 24 +-
.../storm/security/auth/nimbus_auth_test.clj | 3 +-
.../clj/org/apache/storm/supervisor_test.clj | 173 ++---
.../test/clj/org/apache/storm/utils_test.clj | 111 ---
.../test/jvm/org/apache/storm/MockAutoCred.java | 75 ++
.../org/apache/storm/command/RebalanceTest.java | 41 ++
.../apache/storm/command/SetLogLevelTest.java | 54 ++
.../jvm/org/apache/storm/command/TestCLI.java | 44 +-
.../storm/topology/TopologyBuilderTest.java | 65 ++
.../jvm/org/apache/storm/utils/TimeTest.java | 112 +++
.../jvm/org/apache/storm/utils/UtilsTest.java | 219 ++++++
.../utils/WorkerBackpressureThreadTest.java | 50 ++
.../storm/utils/staticmocking/package-info.java | 2 +-
122 files changed, 4931 insertions(+), 2989 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/761296a1/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/761296a1/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/761296a1/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index eb02e50,415a56d..42dd766
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -321,8 -322,7 +322,8 @@@
"-Dworkers.artifacts=/tmp/workers-artifacts"
"-Dstorm.conf.file="
"-Dstorm.options="
- (str "-Dstorm.log.dir=" Utils/FILE_PATH_SEPARATOR "logs")
+ (str "-Dstorm.log.dir=" storm-log-dir)
+ (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
(str "-Dlogging.sensitivity=" mock-sensitivity)
(str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
@@@ -517,8 -514,7 +519,8 @@@
" '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
" '-Dstorm.conf.file='"
" '-Dstorm.options='"
- " '-Dstorm.log.dir=/logs'"
+ " '-Dstorm.log.dir=" storm-log-dir "'"
+ " '-Djava.io.tmpdir=" (str storm-local "/workers/" mock-worker-id "/tmp'")
" '-Dlogging.sensitivity=" mock-sensitivity "'"
" '-Dlog4j.configurationFile=/log4j2/worker.xml'"
" '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
[6/6] storm git commit: Added STORM-1529 to Changelog
Posted by ki...@apache.org.
Added STORM-1529 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96f81d79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96f81d79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96f81d79
Branch: refs/heads/master
Commit: 96f81d7930316f5faa8c9bfb5db40aa3e270bfec
Parents: 9c53f66
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Mar 4 17:53:09 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Mar 4 17:53:09 2016 +0000
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/96f81d79/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bbaec75..82bf6b1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1529: Change default worker temp directory location for workers
* STORM-1543: DRPCSpout should always try to reconnect disconnected DRPCInvocationsClient
* STORM-1528: Fix CsvPreparableReporter log directory
* STORM-1561: Supervisor should relaunch worker if assignments have changed
[2/6] storm git commit: Merge branch 'master' of
github.com:apache/storm into storm1529
Posted by ki...@apache.org.
Merge branch 'master' of github.com:apache/storm into storm1529
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/61c9702a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/61c9702a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/61c9702a
Branch: refs/heads/master
Commit: 61c9702a924eb6b1b8bd3f0a829678a50f02bb67
Parents: 56a7a02 12ceb09
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Mon Feb 15 23:39:27 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Mon Feb 15 23:39:27 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 11 +
README.markdown | 1 +
bin/storm-config.cmd | 6 +-
bin/storm.cmd | 47 +-
bin/storm.py | 8 +-
dev-tools/travis/travis-script.sh | 4 +-
external/sql/storm-sql-core/pom.xml | 9 +
external/storm-elasticsearch/pom.xml | 2 +
.../storm/hbase/security/HBaseSecurityUtil.java | 36 +-
external/storm-mqtt/core/pom.xml | 4 +-
log4j2/cluster.xml | 2 +-
log4j2/worker.xml | 2 +-
pom.xml | 9 +-
storm-core/pom.xml | 11 +-
.../src/clj/org/apache/storm/LocalCluster.clj | 4 +-
storm-core/src/clj/org/apache/storm/clojure.clj | 8 +-
storm-core/src/clj/org/apache/storm/cluster.clj | 27 +-
.../cluster_state/zookeeper_state_factory.clj | 14 +-
.../clj/org/apache/storm/command/blobstore.clj | 11 +-
.../org/apache/storm/command/config_value.clj | 25 -
.../org/apache/storm/command/dev_zookeeper.clj | 6 +-
.../clj/org/apache/storm/command/get_errors.clj | 12 +-
.../apache/storm/command/shell_submission.clj | 4 +-
storm-core/src/clj/org/apache/storm/config.clj | 18 +-
.../src/clj/org/apache/storm/converter.clj | 14 +-
.../src/clj/org/apache/storm/daemon/acker.clj | 21 +-
.../src/clj/org/apache/storm/daemon/common.clj | 29 +-
.../src/clj/org/apache/storm/daemon/drpc.clj | 23 +-
.../clj/org/apache/storm/daemon/executor.clj | 530 +++++-----
.../clj/org/apache/storm/daemon/logviewer.clj | 68 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 170 ++--
.../clj/org/apache/storm/daemon/supervisor.clj | 204 ++--
.../src/clj/org/apache/storm/daemon/task.clj | 2 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 78 +-
.../src/clj/org/apache/storm/disruptor.clj | 10 +-
storm-core/src/clj/org/apache/storm/event.clj | 2 +-
.../src/clj/org/apache/storm/local_state.clj | 9 +-
.../clj/org/apache/storm/messaging/loader.clj | 34 -
.../clj/org/apache/storm/messaging/local.clj | 23 -
.../org/apache/storm/pacemaker/pacemaker.clj | 7 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 24 +-
.../clj/org/apache/storm/process_simulator.clj | 4 +-
.../apache/storm/scheduler/DefaultScheduler.clj | 7 +-
.../apache/storm/scheduler/EvenScheduler.clj | 23 +-
.../storm/scheduler/IsolationScheduler.clj | 29 +-
storm-core/src/clj/org/apache/storm/stats.clj | 82 +-
storm-core/src/clj/org/apache/storm/testing.clj | 89 +-
storm-core/src/clj/org/apache/storm/thrift.clj | 6 +-
storm-core/src/clj/org/apache/storm/timer.clj | 12 +-
.../clj/org/apache/storm/trident/testing.clj | 9 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 97 +-
.../src/clj/org/apache/storm/ui/helpers.clj | 14 +-
storm-core/src/clj/org/apache/storm/util.clj | 923 +----------------
.../src/clj/org/apache/storm/zookeeper.clj | 1 -
.../org/apache/storm/command/ConfigValue.java | 30 +
.../storm/logging/ThriftAccessLogger.java | 13 +-
.../serialization/SerializationFactory.java | 17 +-
.../staticmocking/MockedConfigUtils.java | 31 -
.../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +-
.../jvm/org/apache/storm/utils/Container.java | 11 +-
.../jvm/org/apache/storm/utils/IPredicate.java | 27 +
.../org/apache/storm/utils/NimbusClient.java | 2 +-
.../utils/StormConnectionStateConverter.java | 44 +
.../jvm/org/apache/storm/utils/TestUtils.java | 34 -
.../src/jvm/org/apache/storm/utils/Time.java | 26 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 989 +++++++++++++++++--
.../storm/validation/ConfigValidation.java | 2 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 7 +
.../org/apache/storm/integration_test.clj | 100 +-
.../org/apache/storm/testing4j_test.clj | 37 +-
.../apache/storm/trident/integration_test.clj | 15 +-
.../test/clj/org/apache/storm/cluster_test.clj | 20 +-
.../test/clj/org/apache/storm/drpc_test.clj | 23 +-
.../clj/org/apache/storm/logviewer_test.clj | 267 ++---
.../storm/messaging/netty_integration_test.clj | 2 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 131 ++-
.../scheduler/resource_aware_scheduler_test.clj | 21 +-
.../apache/storm/security/auth/auth_test.clj | 11 +-
.../authorizer/DRPCSimpleACLAuthorizer_test.clj | 2 +-
.../BlowfishTupleSerializer_test.clj | 1 -
.../clj/org/apache/storm/serialization_test.clj | 23 +-
.../clj/org/apache/storm/supervisor_test.clj | 672 +++++++------
.../clj/org/apache/storm/transactional_test.clj | 18 +
.../clj/org/apache/storm/trident/state_test.clj | 5 +-
.../clj/org/apache/storm/trident/tuple_test.clj | 15 +-
.../test/clj/org/apache/storm/utils_test.clj | 16 +-
.../test/clj/org/apache/storm/worker_test.clj | 1 -
.../staticmocking/ConfigUtilsInstaller.java | 38 +
.../utils/staticmocking/UtilsInstaller.java | 38 +
.../storm/utils/staticmocking/package-info.java | 95 ++
90 files changed, 3194 insertions(+), 2435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index e14c861,ae9e92f..8d1b6a6
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -260,11 -264,10 +264,11 @@@
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(rmr-as-user conf id (ConfigUtils/workerRoot conf id))
(do
- (rmr (ConfigUtils/workerHeartbeatsRoot conf id))
+ (Utils/forceDelete (ConfigUtils/workerHeartbeatsRoot conf id))
;; this avoids a race condition with worker or subprocess writing pid around same time
- (rmr (ConfigUtils/workerPidsRoot conf id))
- (rmr (ConfigUtils/workerTmpRoot conf id))
- (rmr (ConfigUtils/workerRoot conf id))))
+ (Utils/forceDelete (ConfigUtils/workerPidsRoot conf id))
++ (Utils/forceDelete (ConfigUtils/workerTmpRoot conf id))
+ (Utils/forceDelete (ConfigUtils/workerRoot conf id))))
(ConfigUtils/removeWorkerUserWSE conf id)
(remove-dead-worker id)
))
@@@ -373,12 -378,12 +379,13 @@@
mem-onheap (.get_mem_on_heap resources)]
;; This condition checks for required files exist before launching the worker
(if (required-topo-files-exist? conf storm-id)
- (do
+ (let [pids-path (ConfigUtils/workerPidsRoot conf id)
+ hb-path (ConfigUtils/workerHeartbeatsRoot conf id)]
(log-message "Launching worker with assignment "
(get-worker-assignment-helper-msg assignment supervisor port id))
- (local-mkdirs (ConfigUtils/workerPidsRoot conf id))
- (local-mkdirs (ConfigUtils/workerTmpRoot conf id))
- (local-mkdirs (ConfigUtils/workerHeartbeatsRoot conf id))
+ (FileUtils/forceMkdir (File. pids-path))
++ (FileUtils/forceMkdir (File. (ConfigUtils/workerTmpRoot conf id)))
+ (FileUtils/forceMkdir (File. hb-path))
(launch-worker supervisor
(:storm-id assignment)
port
http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 91c4057,9c31ddf..da5a5bc
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -298,10 -316,9 +316,10 @@@
"-Dworkers.artifacts=/tmp/workers-artifacts"
"-Dstorm.conf.file="
"-Dstorm.options="
- (str "-Dstorm.log.dir=" file-path-separator "logs")
- (str "-Djava.io.tmpdir=/tmp/workers" file-path-separator mock-worker-id file-path-separator "tmp")
+ (str "-Dstorm.log.dir=" Utils/FILE_PATH_SEPARATOR "logs")
++ (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
(str "-Dlogging.sensitivity=" mock-sensitivity)
- (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
+ (str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
(str "-Dstorm.id=" mock-storm-id)
(str "-Dworker.id=" mock-worker-id)
@@@ -326,13 -348,12 +349,13 @@@
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
(workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
- (stubbing [add-to-classpath mock-cp
- launch-process nil
- supervisor/jlp nil
- supervisor/write-log-metadata! nil
- supervisor/create-blobstore-links nil]
+ (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+ _ (UtilsInstaller. utils-spy)]
+ (stubbing [supervisor/jlp nil
+ supervisor/write-log-metadata! nil
+ supervisor/create-blobstore-links nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
@@@ -348,45 -373,58 +375,60 @@@
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
WORKER-CHILDOPTS list-opts}}
mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-list-opts}]
- (with-open [_ (proxy [MockedConfigUtils] []
- (supervisorStormDistRootImpl ([conf] nil)
- ([conf storm-id] nil))
- (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
- (setWorkerUserWSEImpl [conf worker-id user] nil)
- (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
- (stubbing [add-to-classpath mock-cp
- launch-process nil
- supervisor/jlp nil
- supervisor/write-log-metadata! nil
- supervisor/create-blobstore-links nil]
- (supervisor/launch-worker mock-supervisor
- mock-storm-id
- mock-port
- mock-worker-id
- mock-mem-onheap)
- (verify-first-call-args-for-indices launch-process
- [0]
- exp-args)))))
+ topo-list-opts}
+ cu-proxy (proxy [ConfigUtils] []
+ (supervisorStormDistRootImpl ([conf] nil)
+ ([conf storm-id] nil))
+ (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
+ (setWorkerUserWSEImpl [conf worker-id user] nil)
++ (workerRootImpl [conf] "/tmp/workers")
+ (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+ utils-spy (->>
+ (proxy [Utils] []
+ (addToClasspathImpl [classpath paths] mock-cp)
+ (launchProcessImpl [& _] nil))
+ Mockito/spy)]
+ (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+ _ (UtilsInstaller. utils-spy)]
+ (stubbing [supervisor/jlp nil
+ supervisor/write-log-metadata! nil
+ supervisor/create-blobstore-links nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id
+ mock-mem-onheap)
+ (. (Mockito/verify utils-spy)
+ (launchProcessImpl (Matchers/eq exp-args)
+ (Matchers/any)
+ (Matchers/any)
+ (Matchers/any)
+ (Matchers/any)))))))
+
(testing "testing topology.classpath is added to classpath"
- (let [topo-cp (str file-path-separator "any" file-path-separator "path")
- exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
+ (let [topo-cp (str Utils/FILE_PATH_SEPARATOR "any" Utils/FILE_PATH_SEPARATOR "path")
+ exp-args (exp-args-fn [] [] (Utils/addToClasspath mock-cp [topo-cp]))
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
- mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}]
- (with-open [_ (proxy [MockedConfigUtils] []
+ mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
+ cu-proxy (proxy [ConfigUtils] []
(supervisorStormDistRootImpl ([conf] nil)
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
+ (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+ utils-spy (->>
+ (proxy [Utils] []
+ (currentClasspathImpl []
+ (str Utils/FILE_PATH_SEPARATOR "base"))
+ (launchProcessImpl [& _] nil))
+ Mockito/spy)]
+ (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+ _ (UtilsInstaller. utils-spy)]
(stubbing [supervisor/jlp nil
supervisor/write-log-metadata! nil
- launch-process nil
- current-classpath (str file-path-separator "base")
supervisor/create-blobstore-links nil]
- (supervisor/launch-worker mock-supervisor
+ (supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
mock-worker-id
@@@ -405,21 -446,29 +450,30 @@@
([conf storm-id] nil))
(readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
(setWorkerUserWSEImpl [conf worker-id user] nil)
+ (workerRootImpl [conf] "/tmp/workers")
- (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
+ (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+ utils-spy (->>
+ (proxy [Utils] []
+ (currentClasspathImpl []
+ (str Utils/FILE_PATH_SEPARATOR "base"))
+ (launchProcessImpl [& _] nil))
+ Mockito/spy)]
+ (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+ _ (UtilsInstaller. utils-spy)]
(stubbing [supervisor/jlp nil
- launch-process nil
- supervisor/write-log-metadata! nil
- current-classpath (str file-path-separator "base")
- supervisor/create-blobstore-links nil]
- (supervisor/launch-worker mock-supervisor
- mock-storm-id
- mock-port
- mock-worker-id
- mock-mem-onheap)
- (verify-first-call-args-for-indices launch-process
- [2]
- full-env))))))))
+ supervisor/write-log-metadata! nil
+ supervisor/create-blobstore-links nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id
+ mock-mem-onheap)
+ (. (Mockito/verify utils-spy)
+ (launchProcessImpl (Matchers/any)
+ (Matchers/eq full-env)
+ (Matchers/any)
+ (Matchers/any)
+ (Matchers/any))))))))))
(deftest test-worker-launch-command-run-as-user
(testing "*.worker.childopts configuration"
@@@ -578,184 -646,189 +652,190 @@@
(supervisor/supervisor-data auth-conf nil fake-isupervisor)
(verify-call-times-for cluster/mk-storm-cluster-state 1)
(verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
- expected-acls))))))
-
- (deftest test-write-log-metadata
- (testing "supervisor writes correct data to logs metadata file"
- (let [exp-owner "alice"
- exp-worker-id "42"
- exp-storm-id "0123456789"
- exp-port 4242
- exp-logs-users ["bob" "charlie" "daryl"]
- exp-logs-groups ["read-only-group" "special-group"]
- storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
- TOPOLOGY-USERS ["charlie" "bob"]
- TOPOLOGY-GROUPS ["special-group"]
- LOGS-GROUPS ["read-only-group"]
- LOGS-USERS ["daryl"]}
- exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
- "worker-id" exp-worker-id
- LOGS-USERS exp-logs-users
- LOGS-GROUPS exp-logs-groups}
- conf {}]
- (mocking [supervisor/write-log-metadata-to-yaml-file!]
- (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
- exp-storm-id exp-port conf)
- (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
- exp-storm-id exp-port exp-data conf)))))
-
- (deftest test-worker-launcher-requires-user
- (testing "worker-launcher throws on blank user"
- (mocking [launch-process]
- (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
- #"(?i).*user cannot be blank.*"
- (supervisor/worker-launcher {} nil ""))))))
-
- (defn found? [sub-str input-str]
- (if (string? input-str)
- (contrib-str/substring? sub-str (str input-str))
- (boolean (some #(contrib-str/substring? sub-str %) input-str))))
-
- (defn not-found? [sub-str input-str]
+ expected-acls)))))
+
+ (deftest test-write-log-metadata
+ (testing "supervisor writes correct data to logs metadata file"
+ (let [exp-owner "alice"
+ exp-worker-id "42"
+ exp-storm-id "0123456789"
+ exp-port 4242
+ exp-logs-users ["bob" "charlie" "daryl"]
+ exp-logs-groups ["read-only-group" "special-group"]
+ storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+ TOPOLOGY-USERS ["charlie" "bob"]
+ TOPOLOGY-GROUPS ["special-group"]
+ LOGS-GROUPS ["read-only-group"]
+ LOGS-USERS ["daryl"]}
+ exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
+ "worker-id" exp-worker-id
+ LOGS-USERS exp-logs-users
+ LOGS-GROUPS exp-logs-groups}
+ conf {}]
+ (mocking [supervisor/write-log-metadata-to-yaml-file!]
+ (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
+ exp-storm-id exp-port conf)
+ (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+ exp-storm-id exp-port exp-data conf)))))
+
+ (deftest test-worker-launcher-requires-user
+ (testing "worker-launcher throws on blank user"
+ (let [utils-proxy (proxy [Utils] []
+ (launchProcessImpl [& _] nil))]
+ (with-open [_ (UtilsInstaller. utils-proxy)]
+ (is (try
+ (supervisor/worker-launcher {} nil "")
+ false
+ (catch Throwable t
+ (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
+ (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
+
+ (defn found? [sub-str input-str]
+ (if (string? input-str)
+ (contrib-str/substring? sub-str (str input-str))
+ (boolean (some #(contrib-str/substring? sub-str %) input-str))))
+
+ (defn not-found? [sub-str input-str]
(complement (found? sub-str input-str)))
- (deftest test-substitute-childopts-happy-path-string
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-happy-path-list
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-happy-path-list-arraylist
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-topology-id-alone
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-no-keys
- (testing "worker-launcher has no ids to replace in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-nil-childopts
- (testing "worker-launcher has nil childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts nil
- expected-childopts nil
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-substitute-childopts-nil-ids
- (testing "worker-launcher has nil ids"
- (let [worker-id nil
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
-
- (deftest test-retry-read-assignments
- (with-simulated-time-local-cluster [cluster
- :supervisors 0
- :ports-per-supervisor 2
- :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
- NIMBUS-MONITOR-FREQ-SECS 10
- TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
- TOPOLOGY-ACKER-EXECUTORS 0}]
- (letlocals
- (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
- (bind topology1 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind topology2 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind state (:storm-cluster-state cluster))
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology1"
- {TOPOLOGY-WORKERS 2}
- topology1
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology2"
- {TOPOLOGY-WORKERS 2}
- topology2
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- ))
- (is (empty? (:launched changed)))
- (bind options (RebalanceOptions.))
- (.set_wait_secs options 0)
- (bind changed (capture-changed-workers
- (.rebalance (:nimbus cluster) "topology2" options)
- (advance-cluster-time cluster 10)
- (heartbeat-workers cluster "sup1" [1 2 3 4])
- (advance-cluster-time cluster 10)
- ))
- (validate-launched-once (:launched changed)
- {"sup1" [1 2]}
- (get-storm-id (:storm-cluster-state cluster) "topology1"))
- (validate-launched-once (:launched changed)
- {"sup1" [3 4]}
- (get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
+ (deftest test-substitute-childopts-happy-path-string
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-happy-path-list
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-happy-path-list-arraylist
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-topology-id-alone
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-no-keys
+ (testing "worker-launcher has no ids to replace in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-nil-childopts
+ (testing "worker-launcher has nil childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts nil
+ expected-childopts nil
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-substitute-childopts-nil-ids
+ (testing "worker-launcher has nil ids"
+ (let [worker-id nil
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
+
+ (deftest test-retry-read-assignments
+ (with-simulated-time-local-cluster [cluster
+ :supervisors 0
+ :ports-per-supervisor 2
+ :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0}]
+ (letlocals
+ (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+ (bind topology1 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind topology2 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (bind changed (capture-changed-workers
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology1"
+ {TOPOLOGY-WORKERS 2}
+ topology1
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology2"
+ {TOPOLOGY-WORKERS 2}
+ topology2
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+ (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+ ))
+ (is (empty? (:launched changed)))
+ (bind options (RebalanceOptions.))
+ (.set_wait_secs options 0)
+ (bind changed (capture-changed-workers
+ (.rebalance (:nimbus cluster) "topology2" options)
+ (advance-cluster-time cluster 10)
+ (heartbeat-workers cluster "sup1" [1 2 3 4])
+ (advance-cluster-time cluster 10)
+ ))
+ (validate-launched-once (:launched changed)
+ {"sup1" [1 2]}
+ (get-storm-id (:storm-cluster-state cluster) "topology1"))
+ (validate-launched-once (:launched changed)
+ {"sup1" [3 4]}
+ (get-storm-id (:storm-cluster-state cluster) "topology2"))
+ ))))
++
[3/6] storm git commit: Merge branch 'master' of
github.com:apache/storm into storm1529
Posted by ki...@apache.org.
Merge branch 'master' of github.com:apache/storm into storm1529
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eddfea36
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eddfea36
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eddfea36
Branch: refs/heads/master
Commit: eddfea3615a54834425582649a2fb9ba9706dc1c
Parents: 61c9702 e543bbf
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Feb 19 14:21:11 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 19 14:21:11 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 17 +
README.markdown | 1 +
bin/storm-config.cmd | 4 +
bin/storm.cmd | 24 +-
bin/storm.py | 12 +-
conf/cgconfig.conf.example | 41 +++
conf/defaults.yaml | 16 +-
examples/storm-starter/pom.xml | 10 +
.../org/apache/storm/starter/clj/word_count.clj | 3 +-
.../starter/ResourceAwareExampleTopology.java | 2 +-
.../spout/RandomNumberGeneratorSpout.java | 95 +++++
.../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++
.../TridentMinMaxOfVehiclesTopology.java | 180 ++++++++++
external/storm-hdfs/pom.xml | 23 +-
pom.xml | 16 +
storm-clojure/pom.xml | 74 ++++
.../src/clj/org/apache/storm/clojure.clj | 207 +++++++++++
.../src/clj/org/apache/storm/thrift.clj | 286 +++++++++++++++
storm-clojure/src/test/clj/clojure_test.clj | 158 +++++++++
storm-core/pom.xml | 9 +
storm-core/src/clj/org/apache/storm/clojure.clj | 207 -----------
.../clj/org/apache/storm/command/activate.clj | 24 --
.../clj/org/apache/storm/command/deactivate.clj | 24 --
.../org/apache/storm/command/dev_zookeeper.clj | 28 --
.../clj/org/apache/storm/command/get_errors.clj | 3 +-
.../org/apache/storm/command/healthcheck.clj | 90 -----
.../org/apache/storm/command/kill_topology.clj | 29 --
.../src/clj/org/apache/storm/command/list.clj | 38 --
.../clj/org/apache/storm/command/monitor.clj | 2 +-
.../clj/org/apache/storm/command/rebalance.clj | 3 +-
.../org/apache/storm/command/set_log_level.clj | 3 +-
.../apache/storm/command/shell_submission.clj | 2 +-
.../src/clj/org/apache/storm/daemon/common.clj | 121 ++++---
.../clj/org/apache/storm/daemon/executor.clj | 114 +++---
.../clj/org/apache/storm/daemon/logviewer.clj | 19 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 104 +++---
.../clj/org/apache/storm/daemon/supervisor.clj | 251 +++++++++----
.../src/clj/org/apache/storm/daemon/task.clj | 4 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 170 +++++----
.../src/clj/org/apache/storm/disruptor.clj | 89 -----
storm-core/src/clj/org/apache/storm/event.clj | 71 ----
.../clj/org/apache/storm/internal/clojure.clj | 201 +++++++++++
.../clj/org/apache/storm/internal/thrift.clj | 96 +++++
.../src/clj/org/apache/storm/local_state.clj | 134 -------
.../org/apache/storm/local_state_converter.clj | 24 ++
storm-core/src/clj/org/apache/storm/testing.clj | 37 +-
storm-core/src/clj/org/apache/storm/thrift.clj | 286 ---------------
storm-core/src/clj/org/apache/storm/timer.clj | 128 -------
storm-core/src/clj/org/apache/storm/ui/core.clj | 2 +-
storm-core/src/jvm/org/apache/storm/Config.java | 88 +++++
.../src/jvm/org/apache/storm/StormTimer.java | 241 +++++++++++++
storm-core/src/jvm/org/apache/storm/Thrift.java | 351 ++++++++++++++++++
.../jvm/org/apache/storm/command/Activate.java | 40 +++
.../src/jvm/org/apache/storm/command/CLI.java | 353 +++++++++++++++++++
.../org/apache/storm/command/Deactivate.java | 40 +++
.../org/apache/storm/command/DevZookeeper.java | 35 ++
.../org/apache/storm/command/HealthCheck.java | 125 +++++++
.../org/apache/storm/command/KillTopology.java | 51 +++
.../src/jvm/org/apache/storm/command/List.java | 50 +++
.../container/ResourceIsolationInterface.java | 51 +++
.../storm/container/cgroup/CgroupCenter.java | 216 ++++++++++++
.../storm/container/cgroup/CgroupCommon.java | 270 ++++++++++++++
.../container/cgroup/CgroupCommonOperation.java | 81 +++++
.../container/cgroup/CgroupCoreFactory.java | 74 ++++
.../storm/container/cgroup/CgroupManager.java | 210 +++++++++++
.../storm/container/cgroup/CgroupOperation.java | 79 +++++
.../storm/container/cgroup/CgroupUtils.java | 118 +++++++
.../apache/storm/container/cgroup/Device.java | 75 ++++
.../storm/container/cgroup/Hierarchy.java | 130 +++++++
.../storm/container/cgroup/SubSystem.java | 81 +++++
.../storm/container/cgroup/SubSystemType.java | 36 ++
.../storm/container/cgroup/SystemOperation.java | 75 ++++
.../storm/container/cgroup/core/BlkioCore.java | 213 +++++++++++
.../storm/container/cgroup/core/CgroupCore.java | 26 ++
.../storm/container/cgroup/core/CpuCore.java | 135 +++++++
.../container/cgroup/core/CpuacctCore.java | 71 ++++
.../storm/container/cgroup/core/CpusetCore.java | 209 +++++++++++
.../container/cgroup/core/DevicesCore.java | 189 ++++++++++
.../container/cgroup/core/FreezerCore.java | 66 ++++
.../storm/container/cgroup/core/MemoryCore.java | 188 ++++++++++
.../storm/container/cgroup/core/NetClsCore.java | 69 ++++
.../container/cgroup/core/NetPrioCore.java | 65 ++++
.../org/apache/storm/event/EventManager.java | 24 ++
.../org/apache/storm/event/EventManagerImp.java | 97 +++++
.../jvm/org/apache/storm/testing/NGrouping.java | 4 +-
.../storm/testing/PythonShellMetricsBolt.java | 14 +-
.../storm/testing/PythonShellMetricsSpout.java | 8 +-
.../jvm/org/apache/storm/trident/Stream.java | 121 ++++++-
.../operation/builtin/ComparisonAggregator.java | 91 +++++
.../storm/trident/operation/builtin/Max.java | 37 ++
.../operation/builtin/MaxWithComparator.java | 51 +++
.../storm/trident/operation/builtin/Min.java | 36 ++
.../operation/builtin/MinWithComparator.java | 51 +++
.../org/apache/storm/utils/DisruptorQueue.java | 15 +-
.../jvm/org/apache/storm/utils/LocalState.java | 112 +++++-
.../org/apache/storm/utils/NimbusClient.java | 19 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 37 +-
.../org/apache/storm/integration_test.clj | 259 ++++++++------
.../org/apache/storm/testing4j_test.clj | 72 ++--
.../test/clj/org/apache/storm/clojure_test.clj | 64 ++--
.../test/clj/org/apache/storm/cluster_test.clj | 3 +-
.../test/clj/org/apache/storm/drpc_test.clj | 23 +-
.../test/clj/org/apache/storm/grouping_test.clj | 56 +--
.../storm/messaging/netty_integration_test.clj | 18 +-
.../clj/org/apache/storm/messaging_test.clj | 14 +-
.../test/clj/org/apache/storm/metrics_test.clj | 85 +++--
.../test/clj/org/apache/storm/nimbus_test.clj | 260 +++++++++-----
.../scheduler/resource_aware_scheduler_test.clj | 3 +-
.../clj/org/apache/storm/supervisor_test.clj | 175 ++++-----
.../clj/org/apache/storm/tick_tuple_test.clj | 15 +-
.../clj/org/apache/storm/transactional_test.clj | 3 +-
.../test/jvm/org/apache/storm/TestCgroups.java | 130 +++++++
.../jvm/org/apache/storm/command/TestCLI.java | 59 ++++
.../resource/TestResourceAwareScheduler.java | 3 +
114 files changed, 7742 insertions(+), 2001 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/eddfea36/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/eddfea36/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index da5a5bc,ef40c4a..eb02e50
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -775,67 -772,68 +778,69 @@@
childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
(is (= expected-childopts childopts-with-ids)))))
- (deftest test-retry-read-assignments
- (with-simulated-time-local-cluster [cluster
- :supervisors 0
- :ports-per-supervisor 2
- :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
- NIMBUS-MONITOR-FREQ-SECS 10
- TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
- TOPOLOGY-ACKER-EXECUTORS 0}]
- (letlocals
- (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
- (bind topology1 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind topology2 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind state (:storm-cluster-state cluster))
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology1"
- {TOPOLOGY-WORKERS 2}
- topology1
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology2"
- {TOPOLOGY-WORKERS 2}
- topology2
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- ))
- (is (empty? (:launched changed)))
- (bind options (RebalanceOptions.))
- (.set_wait_secs options 0)
- (bind changed (capture-changed-workers
- (.rebalance (:nimbus cluster) "topology2" options)
- (advance-cluster-time cluster 10)
- (heartbeat-workers cluster "sup1" [1 2 3 4])
- (advance-cluster-time cluster 10)
- ))
- (validate-launched-once (:launched changed)
- {"sup1" [1 2]}
- (get-storm-id (:storm-cluster-state cluster) "topology1"))
- (validate-launched-once (:launched changed)
- {"sup1" [3 4]}
- (get-storm-id (:storm-cluster-state cluster) "topology2"))
- ))))
+ (deftest test-retry-read-assignments
+ (with-simulated-time-local-cluster [cluster
+ :supervisors 0
+ :ports-per-supervisor 2
+ :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0}]
+ (letlocals
+ (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+ (bind topology1 (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestPlannerSpout. true) (Integer. 2))}
+ {}))
+ (bind topology2 (Thrift/buildTopology
+ {"1" (Thrift/prepareSpoutDetails
+ (TestPlannerSpout. true) (Integer. 2))}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (bind changed (capture-changed-workers
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology1"
+ {TOPOLOGY-WORKERS 2}
+ topology1
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology2"
+ {TOPOLOGY-WORKERS 2}
+ topology2
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+ (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+ ))
+ (is (empty? (:launched changed)))
+ (bind options (RebalanceOptions.))
+ (.set_wait_secs options 0)
+ (bind changed (capture-changed-workers
+ (.rebalance (:nimbus cluster) "topology2" options)
+ (advance-cluster-time cluster 10)
+ (heartbeat-workers cluster "sup1" [1 2 3 4])
+ (advance-cluster-time cluster 10)
+ ))
+ (validate-launched-once (:launched changed)
+ {"sup1" [1 2]}
+ (get-storm-id (:storm-cluster-state cluster) "topology1"))
+ (validate-launched-once (:launched changed)
+ {"sup1" [3 4]}
+ (get-storm-id (:storm-cluster-state cluster) "topology2"))
+ )))
+