You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2016/03/04 19:13:39 UTC

[1/6] storm git commit: Change default temp dir for workers to worker launch directory.

Repository: storm
Updated Branches:
  refs/heads/master 595ed28e4 -> 96f81d793


Change default temp dir for workers to worker launch directory.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/56a7a022
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/56a7a022
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/56a7a022

Branch: refs/heads/master
Commit: 56a7a022de62c122320d619dc153824b57b53be6
Parents: 31b57e8
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Sat Feb 6 02:32:20 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Sat Feb 6 02:32:20 2016 -0600

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/supervisor.clj |  4 ++++
 .../src/jvm/org/apache/storm/utils/ConfigUtils.java       | 10 ++++++++++
 storm-core/test/clj/org/apache/storm/supervisor_test.clj  |  8 +++++++-
 3 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/56a7a022/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 25f8968..e14c861 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -263,6 +263,7 @@
             (rmr (ConfigUtils/workerHeartbeatsRoot conf id))
             ;; this avoids a race condition with worker or subprocess writing pid around same time
             (rmr (ConfigUtils/workerPidsRoot conf id))
+            (rmr (ConfigUtils/workerTmpRoot conf id))
             (rmr (ConfigUtils/workerRoot conf id))))
         (ConfigUtils/removeWorkerUserWSE conf id)
         (remove-dead-worker id)
@@ -376,6 +377,7 @@
               (log-message "Launching worker with assignment "
                 (get-worker-assignment-helper-msg assignment supervisor port id))
               (local-mkdirs (ConfigUtils/workerPidsRoot conf id))
+              (local-mkdirs (ConfigUtils/workerTmpRoot conf id))
               (local-mkdirs (ConfigUtils/workerHeartbeatsRoot conf id))
               (launch-worker supervisor
                 (:storm-id assignment)
@@ -1044,6 +1046,7 @@
           storm-home (System/getProperty "storm.home")
           storm-options (System/getProperty "storm.options")
           storm-conf-file (System/getProperty "storm.conf.file")
+          worker-tmp-dir (ConfigUtils/workerTmpRoot conf worker-id)
           storm-log-dir (ConfigUtils/getLogDir)
           storm-log-conf-dir (conf STORM-LOG4J2-CONF-DIR)
           storm-log4j2-conf-dir (if storm-log-conf-dir
@@ -1113,6 +1116,7 @@
                      (str "-Dstorm.conf.file=" storm-conf-file)
                      (str "-Dstorm.options=" storm-options)
                      (str "-Dstorm.log.dir=" storm-log-dir)
+                     (str "-Djava.io.tmpdir=" worker-tmp-dir)
                      (str "-Dlogging.sensitivity=" logging-sensitivity)
                      (str "-Dlog4j.configurationFile=" log4j-configuration-file)
                      (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector")

http://git-wip-us.apache.org/repos/asf/storm/blob/56a7a022/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 54523f9..e7bfec2 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -453,7 +453,12 @@ public class ConfigUtils {
         return new File((logRoot + FILE_SEPARATOR + id + FILE_SEPARATOR + port));
     }
 
+    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
     public static String workerRoot(Map conf) {
+        return _instance.workerRootImpl(conf);
+    }
+
+    public String workerRootImpl(Map conf) {
         return (absoluteStormLocalDir(conf) + FILE_SEPARATOR + "workers");
     }
 
@@ -465,6 +470,11 @@ public class ConfigUtils {
         return (workerRoot(conf, id) + FILE_SEPARATOR + "pids");
     }
 
+    public static String workerTmpRoot(Map conf, String id) {
+        return (workerRoot(conf, id) + FILE_SEPARATOR + "tmp");
+    }
+
+
     public static String workerPidPath(Map conf, String id, String pid) {
         return (workerPidsRoot(conf, id) + FILE_SEPARATOR + pid);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/56a7a022/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index edb161b..91c4057 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -299,6 +299,7 @@
                                 "-Dstorm.conf.file="
                                 "-Dstorm.options="
                                 (str "-Dstorm.log.dir=" file-path-separator "logs")
+                                (str "-Djava.io.tmpdir=/tmp/workers" file-path-separator mock-worker-id file-path-separator "tmp")
                                 (str "-Dlogging.sensitivity=" mock-sensitivity)
                                 (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
                                 "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
@@ -325,6 +326,7 @@
                                                        ([conf storm-id] nil))
                           (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                           (setWorkerUserWSEImpl [conf worker-id user] nil)
+                          (workerRootImpl [conf] "/tmp/workers")
                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
               (stubbing [add-to-classpath mock-cp
                      launch-process nil
@@ -352,6 +354,7 @@
                                                          ([conf storm-id] nil))
                             (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                             (setWorkerUserWSEImpl [conf worker-id user] nil)
+                            (workerRootImpl [conf] "/tmp/workers")
                             (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
                 (stubbing [add-to-classpath mock-cp
                      launch-process nil
@@ -376,6 +379,7 @@
                                                        ([conf storm-id] nil))
                           (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                           (setWorkerUserWSEImpl [conf worker-id user] nil)
+                          (workerRootImpl [conf] "/tmp/workers")
                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
                 (stubbing [supervisor/jlp nil
                      supervisor/write-log-metadata! nil
@@ -401,6 +405,7 @@
                                                        ([conf storm-id] nil))
                           (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                           (setWorkerUserWSEImpl [conf worker-id user] nil)
+                          (workerRootImpl [conf] "/tmp/workers")
                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
             (stubbing [supervisor/jlp nil
                      launch-process nil
@@ -455,6 +460,7 @@
                                " '-Dstorm.conf.file='"
                                " '-Dstorm.options='"
                                " '-Dstorm.log.dir=/logs'"
+                               " '-Djava.io.tmpdir=" (str  storm-local "/workers/" mock-worker-id "/tmp'")
                                " '-Dlogging.sensitivity=" mock-sensitivity "'"
                                " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
                                " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
@@ -752,4 +758,4 @@
      (validate-launched-once (:launched changed)
                              {"sup1" [3 4]}
                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
-     )))
\ No newline at end of file
+     )))


[5/6] storm git commit: Merge branch 'storm1529' of https://github.com/kishorvpatil/incubator-storm

Posted by ki...@apache.org.
Merge branch 'storm1529' of https://github.com/kishorvpatil/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9c53f669
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9c53f669
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9c53f669

Branch: refs/heads/master
Commit: 9c53f669c97acc3b714d6fa1fd02d96f86e1f450
Parents: 595ed28 761296a
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Mar 4 17:51:46 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Mar 4 17:51:46 2016 +0000

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/supervisor.clj |  4 ++++
 .../src/jvm/org/apache/storm/utils/ConfigUtils.java       | 10 ++++++++++
 storm-core/test/clj/org/apache/storm/supervisor_test.clj  |  6 ++++++
 3 files changed, 20 insertions(+)
----------------------------------------------------------------------



[4/6] storm git commit: Merge branch 'master' of github.com:apache/storm into storm1529

Posted by ki...@apache.org.
Merge branch 'master' of github.com:apache/storm into storm1529

Conflicts:
	storm-core/test/clj/org/apache/storm/supervisor_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/761296a1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/761296a1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/761296a1

Branch: refs/heads/master
Commit: 761296a1aba34e173c96314a31a9099aa4af3693
Parents: eddfea3 db04ce6
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Mar 4 17:22:29 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Mar 4 17:22:29 2016 +0000

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 CHANGELOG.md                                    |  34 +-
 README.markdown                                 |   7 +-
 bin/flight.bash                                 |   4 +-
 bin/storm.cmd                                   |   4 +-
 bin/storm.py                                    |  10 +-
 conf/defaults.yaml                              |   4 +-
 .../storm/starter/ThroughputVsLatency.java      |   2 +-
 .../apache/storm/sql/compiler/CompilerUtil.java |   7 +-
 .../apache/storm/sql/compiler/ExprCompiler.java |  32 +-
 .../backends/standalone/RelNodeCompiler.java    |   6 +-
 .../apache/storm/sql/parser/StormParser.java    |   5 +
 .../test/org/apache/storm/sql/TestStormSql.java |  64 +-
 .../storm/sql/compiler/TestCompilerUtils.java   |  62 +-
 .../storm/sql/compiler/TestExprSemantic.java    |  18 +
 .../backends/standalone/TestPlanCompiler.java   |  20 +
 .../backends/trident/TestPlanCompiler.java      |   4 +-
 .../test/org/apache/storm/sql/TestUtils.java    |  32 +-
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |  10 +-
 .../storm/hdfs/bolt/AvroGenericRecordBolt.java  |   8 +-
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    |   8 +-
 .../storm/hdfs/bolt/SequenceFileBolt.java       |   8 +-
 .../org/apache/storm/kafka/IntSerializer.java   |  10 +-
 .../apache/storm/kafka/PartitionManager.java    |   5 +-
 .../kafka/trident/TridentKafkaEmitter.java      |   5 +-
 pom.xml                                         |  23 +
 .../src/clj/org/apache/storm/thrift.clj         |   2 +-
 .../src/clj/org/apache/storm/MockAutoCred.clj   |  58 --
 storm-core/src/clj/org/apache/storm/cluster.clj | 700 -------------------
 .../cluster_state/zookeeper_state_factory.clj   | 165 -----
 .../clj/org/apache/storm/command/heartbeats.clj |   6 +-
 .../clj/org/apache/storm/command/monitor.clj    |  37 -
 .../clj/org/apache/storm/command/rebalance.clj  |  47 --
 .../org/apache/storm/command/set_log_level.clj  |  76 --
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../apache/storm/command/upload_credentials.clj |  35 -
 .../src/clj/org/apache/storm/converter.clj      |  23 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   | 108 ---
 .../src/clj/org/apache/storm/daemon/common.clj  |  29 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  38 +-
 .../clj/org/apache/storm/daemon/executor.clj    |  13 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |  65 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 165 +++--
 .../clj/org/apache/storm/daemon/supervisor.clj  |  57 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  68 +-
 .../clj/org/apache/storm/internal/thrift.clj    |   2 +-
 .../storm/pacemaker/pacemaker_state_factory.clj | 141 ----
 .../clj/org/apache/storm/process_simulator.clj  |  49 --
 storm-core/src/clj/org/apache/storm/stats.clj   |   3 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  28 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj | 132 ++--
 .../src/clj/org/apache/storm/ui/helpers.clj     | 199 +-----
 storm-core/src/clj/org/apache/storm/util.clj    |  11 +
 .../src/clj/org/apache/storm/zookeeper.clj      |  74 --
 storm-core/src/jvm/org/apache/storm/Config.java |   9 +
 .../jvm/org/apache/storm/ProcessSimulator.java  |  82 +++
 .../storm/blobstore/LocalFsBlobStore.java       |   2 +-
 .../jvm/org/apache/storm/callback/Callback.java |  23 -
 .../storm/callback/ZKStateChangedCallback.java  |  25 +
 .../org/apache/storm/cluster/ClusterState.java  | 217 ------
 .../storm/cluster/ClusterStateContext.java      |   2 +-
 .../storm/cluster/ClusterStateFactory.java      |  28 -
 .../org/apache/storm/cluster/ClusterUtils.java  | 244 +++++++
 .../org/apache/storm/cluster/ExecutorBeat.java  |  44 ++
 .../org/apache/storm/cluster/IStateStorage.java | 222 ++++++
 .../storm/cluster/IStormClusterState.java       | 124 ++++
 .../storm/cluster/PaceMakerStateStorage.java    | 216 ++++++
 .../cluster/PaceMakerStateStorageFactory.java   |  64 ++
 .../storm/cluster/StateStorageFactory.java      |  28 +
 .../storm/cluster/StormClusterStateImpl.java    | 697 ++++++++++++++++++
 .../apache/storm/cluster/ZKStateStorage.java    | 244 +++++++
 .../storm/cluster/ZKStateStorageFactory.java    |  36 +
 .../src/jvm/org/apache/storm/command/CLI.java   |  34 +-
 .../src/jvm/org/apache/storm/command/List.java  |  50 --
 .../apache/storm/command/ListTopologies.java    |  52 ++
 .../jvm/org/apache/storm/command/Monitor.java   |  65 ++
 .../jvm/org/apache/storm/command/Rebalance.java |  86 +++
 .../org/apache/storm/command/SetLogLevel.java   | 116 +++
 .../apache/storm/command/UploadCredentials.java |  61 ++
 .../src/jvm/org/apache/storm/daemon/Acker.java  | 128 ++++
 .../storm/daemon/metrics/MetricsUtils.java      |   2 +-
 .../storm/metric/FileBasedEventLogger.java      |  18 +-
 .../apache/storm/pacemaker/PacemakerClient.java |   6 +-
 .../apache/storm/security/auth/AuthUtils.java   |  40 ++
 .../security/auth/ThriftConnectionType.java     |   2 +-
 .../storm/security/auth/kerberos/AutoTGT.java   |  64 +-
 .../auth/kerberos/AutoTGTKrb5LoginModule.java   |   8 +-
 .../serialization/SerializationFactory.java     |   2 +
 .../testing/staticmocking/MockedCluster.java    |  31 +
 .../MockedPaceMakerStateStorageFactory.java     |  32 +
 .../apache/storm/topology/TopologyBuilder.java  |  13 +-
 .../apache/storm/trident/tuple/ConsList.java    |  20 +-
 .../apache/storm/ui/FilterConfiguration.java    |  63 ++
 .../jvm/org/apache/storm/ui/IConfigurator.java  |  24 +
 .../src/jvm/org/apache/storm/ui/UIHelpers.java  | 267 +++++++
 .../jvm/org/apache/storm/utils/ConfigUtils.java |   8 +-
 .../src/jvm/org/apache/storm/utils/Time.java    |   1 +
 .../src/jvm/org/apache/storm/utils/Utils.java   |  91 ++-
 .../storm/utils/WorkerBackpressureCallback.java |   2 +-
 .../storm/utils/WorkerBackpressureThread.java   |  38 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |  77 +-
 storm-core/src/ui/public/component.html         |   2 +-
 .../templates/topology-page-template.html       |   6 +-
 storm-core/src/ui/public/topology.html          |   2 +-
 .../org/apache/storm/integration_test.clj       |  13 +-
 .../test/clj/org/apache/storm/cluster_test.clj  | 202 +++---
 .../storm/messaging/netty_integration_test.clj  |   1 +
 .../test/clj/org/apache/storm/nimbus_test.clj   | 166 ++---
 .../storm/pacemaker_state_factory_test.clj      | 121 ++--
 .../security/auth/auto_login_module_test.clj    |  24 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 173 ++---
 .../test/clj/org/apache/storm/utils_test.clj    | 111 ---
 .../test/jvm/org/apache/storm/MockAutoCred.java |  75 ++
 .../org/apache/storm/command/RebalanceTest.java |  41 ++
 .../apache/storm/command/SetLogLevelTest.java   |  54 ++
 .../jvm/org/apache/storm/command/TestCLI.java   |  44 +-
 .../storm/topology/TopologyBuilderTest.java     |  65 ++
 .../jvm/org/apache/storm/utils/TimeTest.java    | 112 +++
 .../jvm/org/apache/storm/utils/UtilsTest.java   | 219 ++++++
 .../utils/WorkerBackpressureThreadTest.java     |  50 ++
 .../storm/utils/staticmocking/package-info.java |   2 +-
 122 files changed, 4931 insertions(+), 2989 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/761296a1/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/761296a1/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/761296a1/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index eb02e50,415a56d..42dd766
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -321,8 -322,7 +322,8 @@@
                                  "-Dworkers.artifacts=/tmp/workers-artifacts"
                                  "-Dstorm.conf.file="
                                  "-Dstorm.options="
-                                 (str "-Dstorm.log.dir=" Utils/FILE_PATH_SEPARATOR "logs")
+                                 (str "-Dstorm.log.dir=" storm-log-dir)
 +                                (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
                                  (str "-Dlogging.sensitivity=" mock-sensitivity)
                                  (str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
                                  "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
@@@ -517,8 -514,7 +519,8 @@@
                                 " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
                                 " '-Dstorm.conf.file='"
                                 " '-Dstorm.options='"
-                                " '-Dstorm.log.dir=/logs'"
+                                " '-Dstorm.log.dir=" storm-log-dir "'"
 +                               " '-Djava.io.tmpdir=" (str  storm-local "/workers/" mock-worker-id "/tmp'")
                                 " '-Dlogging.sensitivity=" mock-sensitivity "'"
                                 " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
                                 " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"


[6/6] storm git commit: Added STORM-1529 ot Changelog

Posted by ki...@apache.org.
Added STORM-1529 ot Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96f81d79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96f81d79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96f81d79

Branch: refs/heads/master
Commit: 96f81d7930316f5faa8c9bfb5db40aa3e270bfec
Parents: 9c53f66
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Mar 4 17:53:09 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Mar 4 17:53:09 2016 +0000

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/96f81d79/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bbaec75..82bf6b1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1529: Change default worker temp directory location for workers
  * STORM-1543: DRPCSpout should always try to reconnect disconnected DRPCInvocationsClient
  * STORM-1528: Fix CsvPreparableReporter log directory
  * STORM-1561: Supervisor should relaunch worker if assignments have changed


[2/6] storm git commit: Merge branch 'master' of github.com:apache/storm into storm1529

Posted by ki...@apache.org.
Merge branch 'master' of github.com:apache/storm into storm1529


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/61c9702a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/61c9702a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/61c9702a

Branch: refs/heads/master
Commit: 61c9702a924eb6b1b8bd3f0a829678a50f02bb67
Parents: 56a7a02 12ceb09
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Mon Feb 15 23:39:27 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Mon Feb 15 23:39:27 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md                                    |  11 +
 README.markdown                                 |   1 +
 bin/storm-config.cmd                            |   6 +-
 bin/storm.cmd                                   |  47 +-
 bin/storm.py                                    |   8 +-
 dev-tools/travis/travis-script.sh               |   4 +-
 external/sql/storm-sql-core/pom.xml             |   9 +
 external/storm-elasticsearch/pom.xml            |   2 +
 .../storm/hbase/security/HBaseSecurityUtil.java |  36 +-
 external/storm-mqtt/core/pom.xml                |   4 +-
 log4j2/cluster.xml                              |   2 +-
 log4j2/worker.xml                               |   2 +-
 pom.xml                                         |   9 +-
 storm-core/pom.xml                              |  11 +-
 .../src/clj/org/apache/storm/LocalCluster.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/clojure.clj |   8 +-
 storm-core/src/clj/org/apache/storm/cluster.clj |  27 +-
 .../cluster_state/zookeeper_state_factory.clj   |  14 +-
 .../clj/org/apache/storm/command/blobstore.clj  |  11 +-
 .../org/apache/storm/command/config_value.clj   |  25 -
 .../org/apache/storm/command/dev_zookeeper.clj  |   6 +-
 .../clj/org/apache/storm/command/get_errors.clj |  12 +-
 .../apache/storm/command/shell_submission.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/config.clj  |  18 +-
 .../src/clj/org/apache/storm/converter.clj      |  14 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   |  21 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  29 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  23 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 530 +++++-----
 .../clj/org/apache/storm/daemon/logviewer.clj   |  68 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 170 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  | 204 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |   2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  78 +-
 .../src/clj/org/apache/storm/disruptor.clj      |  10 +-
 storm-core/src/clj/org/apache/storm/event.clj   |   2 +-
 .../src/clj/org/apache/storm/local_state.clj    |   9 +-
 .../clj/org/apache/storm/messaging/loader.clj   |  34 -
 .../clj/org/apache/storm/messaging/local.clj    |  23 -
 .../org/apache/storm/pacemaker/pacemaker.clj    |   7 +-
 .../storm/pacemaker/pacemaker_state_factory.clj |  24 +-
 .../clj/org/apache/storm/process_simulator.clj  |   4 +-
 .../apache/storm/scheduler/DefaultScheduler.clj |   7 +-
 .../apache/storm/scheduler/EvenScheduler.clj    |  23 +-
 .../storm/scheduler/IsolationScheduler.clj      |  29 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |  82 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  89 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |   6 +-
 storm-core/src/clj/org/apache/storm/timer.clj   |  12 +-
 .../clj/org/apache/storm/trident/testing.clj    |   9 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  97 +-
 .../src/clj/org/apache/storm/ui/helpers.clj     |  14 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 923 +----------------
 .../src/clj/org/apache/storm/zookeeper.clj      |   1 -
 .../org/apache/storm/command/ConfigValue.java   |  30 +
 .../storm/logging/ThriftAccessLogger.java       |  13 +-
 .../serialization/SerializationFactory.java     |  17 +-
 .../staticmocking/MockedConfigUtils.java        |  31 -
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../jvm/org/apache/storm/utils/Container.java   |  11 +-
 .../jvm/org/apache/storm/utils/IPredicate.java  |  27 +
 .../org/apache/storm/utils/NimbusClient.java    |   2 +-
 .../utils/StormConnectionStateConverter.java    |  44 +
 .../jvm/org/apache/storm/utils/TestUtils.java   |  34 -
 .../src/jvm/org/apache/storm/utils/Time.java    |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 989 +++++++++++++++++--
 .../storm/validation/ConfigValidation.java      |   2 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |   7 +
 .../org/apache/storm/integration_test.clj       | 100 +-
 .../org/apache/storm/testing4j_test.clj         |  37 +-
 .../apache/storm/trident/integration_test.clj   |  15 +-
 .../test/clj/org/apache/storm/cluster_test.clj  |  20 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../clj/org/apache/storm/logviewer_test.clj     | 267 ++---
 .../storm/messaging/netty_integration_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 131 ++-
 .../scheduler/resource_aware_scheduler_test.clj |  21 +-
 .../apache/storm/security/auth/auth_test.clj    |  11 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |   2 +-
 .../BlowfishTupleSerializer_test.clj            |   1 -
 .../clj/org/apache/storm/serialization_test.clj |  23 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 672 +++++++------
 .../clj/org/apache/storm/transactional_test.clj |  18 +
 .../clj/org/apache/storm/trident/state_test.clj |   5 +-
 .../clj/org/apache/storm/trident/tuple_test.clj |  15 +-
 .../test/clj/org/apache/storm/utils_test.clj    |  16 +-
 .../test/clj/org/apache/storm/worker_test.clj   |   1 -
 .../staticmocking/ConfigUtilsInstaller.java     |  38 +
 .../utils/staticmocking/UtilsInstaller.java     |  38 +
 .../storm/utils/staticmocking/package-info.java |  95 ++
 90 files changed, 3194 insertions(+), 2435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index e14c861,ae9e92f..8d1b6a6
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -260,11 -264,10 +264,11 @@@
          (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
            (rmr-as-user conf id (ConfigUtils/workerRoot conf id))
            (do
-             (rmr (ConfigUtils/workerHeartbeatsRoot conf id))
+             (Utils/forceDelete (ConfigUtils/workerHeartbeatsRoot conf id))
              ;; this avoids a race condition with worker or subprocess writing pid around same time
-             (rmr (ConfigUtils/workerPidsRoot conf id))
-             (rmr (ConfigUtils/workerTmpRoot conf id))
-             (rmr (ConfigUtils/workerRoot conf id))))
+             (Utils/forceDelete (ConfigUtils/workerPidsRoot conf id))
++            (Utils/forceDelete (ConfigUtils/workerTmpRoot conf id))
+             (Utils/forceDelete (ConfigUtils/workerRoot conf id))))
          (ConfigUtils/removeWorkerUserWSE conf id)
          (remove-dead-worker id)
        ))
@@@ -373,12 -378,12 +379,13 @@@
                mem-onheap (.get_mem_on_heap resources)]
            ;; This condition checks for required files exist before launching the worker
            (if (required-topo-files-exist? conf storm-id)
-             (do
+             (let [pids-path (ConfigUtils/workerPidsRoot conf id)
+                   hb-path (ConfigUtils/workerHeartbeatsRoot conf id)]
                (log-message "Launching worker with assignment "
                  (get-worker-assignment-helper-msg assignment supervisor port id))
-               (local-mkdirs (ConfigUtils/workerPidsRoot conf id))
-               (local-mkdirs (ConfigUtils/workerTmpRoot conf id))
-               (local-mkdirs (ConfigUtils/workerHeartbeatsRoot conf id))
+               (FileUtils/forceMkdir (File. pids-path))
++              (FileUtils/forceMkdir (File. (ConfigUtils/workerTmpRoot conf id)))
+               (FileUtils/forceMkdir (File. hb-path))
                (launch-worker supervisor
                  (:storm-id assignment)
                  port

http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/61c9702a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 91c4057,9c31ddf..da5a5bc
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -298,10 -316,9 +316,10 @@@
                                  "-Dworkers.artifacts=/tmp/workers-artifacts"
                                  "-Dstorm.conf.file="
                                  "-Dstorm.options="
-                                 (str "-Dstorm.log.dir=" file-path-separator "logs")
-                                 (str "-Djava.io.tmpdir=/tmp/workers" file-path-separator mock-worker-id file-path-separator "tmp")
+                                 (str "-Dstorm.log.dir=" Utils/FILE_PATH_SEPARATOR "logs")
++                                (str "-Djava.io.tmpdir=/tmp/workers" Utils/FILE_PATH_SEPARATOR mock-worker-id Utils/FILE_PATH_SEPARATOR "tmp")
                                  (str "-Dlogging.sensitivity=" mock-sensitivity)
-                                 (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
+                                 (str "-Dlog4j.configurationFile=" Utils/FILE_PATH_SEPARATOR "log4j2" Utils/FILE_PATH_SEPARATOR "worker.xml")
                                  "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
                                  (str "-Dstorm.id=" mock-storm-id)
                                  (str "-Dworker.id=" mock-worker-id)
@@@ -326,13 -348,12 +349,13 @@@
                                                         ([conf storm-id] nil))
                            (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                            (setWorkerUserWSEImpl [conf worker-id user] nil)
 +                          (workerRootImpl [conf] "/tmp/workers")
                            (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
-               (stubbing [add-to-classpath mock-cp
-                      launch-process nil
-                      supervisor/jlp nil
-                      supervisor/write-log-metadata! nil
-                      supervisor/create-blobstore-links nil]
+           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+                       _ (UtilsInstaller. utils-spy)]
+               (stubbing [supervisor/jlp nil
+                          supervisor/write-log-metadata! nil
+                          supervisor/create-blobstore-links nil]
                  (supervisor/launch-worker mock-supervisor
                                        mock-storm-id
                                        mock-port
@@@ -348,45 -373,58 +375,60 @@@
                mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
                                        WORKER-CHILDOPTS list-opts}}
                mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                             topo-list-opts}]
-             (with-open [_ (proxy [MockedConfigUtils] []
-                             (supervisorStormDistRootImpl ([conf] nil)
-                                                          ([conf storm-id] nil))
-                             (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
-                             (setWorkerUserWSEImpl [conf worker-id user] nil)
-                             (workerRootImpl [conf] "/tmp/workers")
-                             (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
-                 (stubbing [add-to-classpath mock-cp
-                      launch-process nil
-                      supervisor/jlp nil
-                      supervisor/write-log-metadata! nil
-                      supervisor/create-blobstore-links nil]
-                 (supervisor/launch-worker mock-supervisor
-                                       mock-storm-id
-                                       mock-port
-                                       mock-worker-id
-                                       mock-mem-onheap)
-                 (verify-first-call-args-for-indices launch-process
-                                                 [0]
-                                                 exp-args)))))
+                                             topo-list-opts}
+               cu-proxy (proxy [ConfigUtils] []
+                           (supervisorStormDistRootImpl ([conf] nil)
+                                                        ([conf storm-id] nil))
+                           (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
+                           (setWorkerUserWSEImpl [conf worker-id user] nil)
++                          (workerRootImpl [conf] "/tmp/workers")
+                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+               utils-spy (->>
+                           (proxy [Utils] []
+                             (addToClasspathImpl [classpath paths] mock-cp)
+                             (launchProcessImpl [& _] nil))
+                           Mockito/spy)]
+             (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+                         _ (UtilsInstaller. utils-spy)]
+                 (stubbing [supervisor/jlp nil
+                            supervisor/write-log-metadata! nil
+                            supervisor/create-blobstore-links nil]
+                   (supervisor/launch-worker mock-supervisor
+                                             mock-storm-id
+                                             mock-port
+                                             mock-worker-id
+                                             mock-mem-onheap)
+                   (. (Mockito/verify utils-spy)
+                      (launchProcessImpl (Matchers/eq exp-args)
+                                         (Matchers/any)
+                                         (Matchers/any)
+                                         (Matchers/any)
+                                         (Matchers/any)))))))
+ 
        (testing "testing topology.classpath is added to classpath"
-         (let [topo-cp (str file-path-separator "any" file-path-separator "path")
-               exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
+         (let [topo-cp (str Utils/FILE_PATH_SEPARATOR "any" Utils/FILE_PATH_SEPARATOR "path")
+               exp-args (exp-args-fn [] [] (Utils/addToClasspath mock-cp [topo-cp]))
                mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
-               mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}]
-           (with-open [_ (proxy [MockedConfigUtils] []
+               mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
+               cu-proxy (proxy [ConfigUtils] []
                            (supervisorStormDistRootImpl ([conf] nil)
                                                         ([conf storm-id] nil))
                            (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                            (setWorkerUserWSEImpl [conf worker-id user] nil)
 +                          (workerRootImpl [conf] "/tmp/workers")
-                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
+                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+               utils-spy (->>
+                           (proxy [Utils] []
+                             (currentClasspathImpl []
+                               (str Utils/FILE_PATH_SEPARATOR "base"))
+                             (launchProcessImpl [& _] nil))
+                           Mockito/spy)]
+           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+                       _ (UtilsInstaller. utils-spy)]
                  (stubbing [supervisor/jlp nil
                       supervisor/write-log-metadata! nil
-                      launch-process nil
-                      current-classpath (str file-path-separator "base")
                       supervisor/create-blobstore-links nil]
-                     (supervisor/launch-worker mock-supervisor
+                   (supervisor/launch-worker mock-supervisor
                                                mock-storm-id
                                                mock-port
                                                mock-worker-id
@@@ -405,21 -446,29 +450,30 @@@
                                                         ([conf storm-id] nil))
                            (readSupervisorStormConfImpl [conf storm-id] mocked-supervisor-storm-conf)
                            (setWorkerUserWSEImpl [conf worker-id user] nil)
 +                          (workerRootImpl [conf] "/tmp/workers")
-                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))]
+                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+               utils-spy (->>
+                           (proxy [Utils] []
+                             (currentClasspathImpl []
+                               (str Utils/FILE_PATH_SEPARATOR "base"))
+                             (launchProcessImpl [& _] nil))
+                           Mockito/spy)]
+           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
+                       _ (UtilsInstaller. utils-spy)]
              (stubbing [supervisor/jlp nil
-                      launch-process nil
-                      supervisor/write-log-metadata! nil
-                      current-classpath (str file-path-separator "base")
-                      supervisor/create-blobstore-links nil]
-                     (supervisor/launch-worker mock-supervisor
-                                               mock-storm-id
-                                               mock-port
-                                               mock-worker-id
-                                               mock-mem-onheap)
-                     (verify-first-call-args-for-indices launch-process
-                                                         [2]
-                                                         full-env))))))))
+                        supervisor/write-log-metadata! nil
+                        supervisor/create-blobstore-links nil]
+               (supervisor/launch-worker mock-supervisor
+                                         mock-storm-id
+                                         mock-port
+                                         mock-worker-id
+                                         mock-mem-onheap)
+               (. (Mockito/verify utils-spy)
+                  (launchProcessImpl (Matchers/any)
+                                     (Matchers/eq full-env)
+                                     (Matchers/any)
+                                     (Matchers/any)
+                                     (Matchers/any))))))))))
  
  (deftest test-worker-launch-command-run-as-user
    (testing "*.worker.childopts configuration"
@@@ -578,184 -646,189 +652,190 @@@
            (supervisor/supervisor-data auth-conf nil fake-isupervisor)
            (verify-call-times-for cluster/mk-storm-cluster-state 1)
            (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
-                                               expected-acls))))))
- 
- (deftest test-write-log-metadata
-   (testing "supervisor writes correct data to logs metadata file"
-     (let [exp-owner "alice"
-           exp-worker-id "42"
-           exp-storm-id "0123456789"
-           exp-port 4242
-           exp-logs-users ["bob" "charlie" "daryl"]
-           exp-logs-groups ["read-only-group" "special-group"]
-           storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
-                       TOPOLOGY-USERS ["charlie" "bob"]
-                       TOPOLOGY-GROUPS ["special-group"]
-                       LOGS-GROUPS ["read-only-group"]
-                       LOGS-USERS ["daryl"]}
-           exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
-                     "worker-id" exp-worker-id
-                     LOGS-USERS exp-logs-users
-                     LOGS-GROUPS exp-logs-groups}
-           conf {}]
-       (mocking [supervisor/write-log-metadata-to-yaml-file!]
-         (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
-                                         exp-storm-id exp-port conf)
-         (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
-                                       exp-storm-id exp-port exp-data conf)))))
- 
- (deftest test-worker-launcher-requires-user
-   (testing "worker-launcher throws on blank user"
-     (mocking [launch-process]
-       (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
-                                   #"(?i).*user cannot be blank.*"
-                                   (supervisor/worker-launcher {} nil ""))))))
- 
- (defn found? [sub-str input-str]
-   (if (string? input-str)
-     (contrib-str/substring? sub-str (str input-str))
-     (boolean (some #(contrib-str/substring? sub-str %) input-str))))
- 
- (defn not-found? [sub-str input-str]
+               expected-acls)))))
+ 
+   (deftest test-write-log-metadata
+     (testing "supervisor writes correct data to logs metadata file"
+       (let [exp-owner "alice"
+             exp-worker-id "42"
+             exp-storm-id "0123456789"
+             exp-port 4242
+             exp-logs-users ["bob" "charlie" "daryl"]
+             exp-logs-groups ["read-only-group" "special-group"]
+             storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+                         TOPOLOGY-USERS ["charlie" "bob"]
+                         TOPOLOGY-GROUPS ["special-group"]
+                         LOGS-GROUPS ["read-only-group"]
+                         LOGS-USERS ["daryl"]}
+             exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
+                       "worker-id" exp-worker-id
+                       LOGS-USERS exp-logs-users
+                       LOGS-GROUPS exp-logs-groups}
+             conf {}]
+         (mocking [supervisor/write-log-metadata-to-yaml-file!]
+           (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
+             exp-storm-id exp-port conf)
+           (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+             exp-storm-id exp-port exp-data conf)))))
+ 
+   (deftest test-worker-launcher-requires-user
+     (testing "worker-launcher throws on blank user"
+       (let [utils-proxy (proxy [Utils] []
+                           (launchProcessImpl [& _] nil))]
+         (with-open [_ (UtilsInstaller. utils-proxy)]
+           (is (try
+                 (supervisor/worker-launcher {} nil "")
+                 false
+                 (catch Throwable t
+                   (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
+                        (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
+ 
+   (defn found? [sub-str input-str]
+     (if (string? input-str)
+       (contrib-str/substring? sub-str (str input-str))
+       (boolean (some #(contrib-str/substring? sub-str %) input-str))))
+ 
+   (defn not-found? [sub-str input-str]
      (complement (found? sub-str input-str)))
  
- (deftest test-substitute-childopts-happy-path-string
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-happy-path-list
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-happy-path-list-arraylist
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-topology-id-alone
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-no-keys
-   (testing "worker-launcher has no ids to replace in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-nil-childopts
-   (testing "worker-launcher has nil childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts nil
-           expected-childopts nil
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-substitute-childopts-nil-ids
-   (testing "worker-launcher has nil ids"
-     (let [worker-id nil
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
- 
- (deftest test-retry-read-assignments
-   (with-simulated-time-local-cluster [cluster
-                                       :supervisors 0
-                                       :ports-per-supervisor 2
-                                       :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                                                     NIMBUS-MONITOR-FREQ-SECS 10
-                                                     TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
-                                                     TOPOLOGY-ACKER-EXECUTORS 0}]
-     (letlocals
-      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-      (bind topology1 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                       {}))
-      (bind topology2 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                       {}))
-      (bind state (:storm-cluster-state cluster))
-      (bind changed (capture-changed-workers
-                     (submit-mocked-assignment
-                      (:nimbus cluster)
-                      (:storm-cluster-state cluster)
-                      "topology1"
-                      {TOPOLOGY-WORKERS 2}
-                      topology1
-                      {1 "1"
-                       2 "1"}
-                      {[1 1] ["sup1" 1]
-                       [2 2] ["sup1" 2]}
-                      {["sup1" 1] [0.0 0.0 0.0]
-                       ["sup1" 2] [0.0 0.0 0.0]
-                       })
-                     (submit-mocked-assignment
-                      (:nimbus cluster)
-                      (:storm-cluster-state cluster)
-                      "topology2"
-                      {TOPOLOGY-WORKERS 2}
-                      topology2
-                      {1 "1"
-                       2 "1"}
-                      {[1 1] ["sup1" 1]
-                       [2 2] ["sup1" 2]}
-                      {["sup1" 1] [0.0 0.0 0.0]
-                       ["sup1" 2] [0.0 0.0 0.0]
-                       })
-                     ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                     (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                     ))
-      (is (empty? (:launched changed)))
-      (bind options (RebalanceOptions.))
-      (.set_wait_secs options 0)
-      (bind changed (capture-changed-workers
-                     (.rebalance (:nimbus cluster) "topology2" options)
-                     (advance-cluster-time cluster 10)
-                     (heartbeat-workers cluster "sup1" [1 2 3 4])
-                     (advance-cluster-time cluster 10)
-                     ))
-      (validate-launched-once (:launched changed)
-                              {"sup1" [1 2]}
-                              (get-storm-id (:storm-cluster-state cluster) "topology1"))
-      (validate-launched-once (:launched changed)
-                              {"sup1" [3 4]}
-                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
-      )))
+   (deftest test-substitute-childopts-happy-path-string
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-happy-path-list
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-happy-path-list-arraylist
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-topology-id-alone
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-no-keys
+     (testing "worker-launcher has no ids to replace in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-nil-childopts
+     (testing "worker-launcher has nil childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts nil
+             expected-childopts nil
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-substitute-childopts-nil-ids
+     (testing "worker-launcher has nil ids"
+       (let [worker-id nil
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
+ 
+   (deftest test-retry-read-assignments
+     (with-simulated-time-local-cluster [cluster
+                                         :supervisors 0
+                                         :ports-per-supervisor 2
+                                         :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+                                                       NIMBUS-MONITOR-FREQ-SECS 10
+                                                       TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                                       TOPOLOGY-ACKER-EXECUTORS 0}]
+       (letlocals
+         (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+         (bind topology1 (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                           {}))
+         (bind topology2 (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                           {}))
+         (bind state (:storm-cluster-state cluster))
+         (bind changed (capture-changed-workers
+                         (submit-mocked-assignment
+                           (:nimbus cluster)
+                           (:storm-cluster-state cluster)
+                           "topology1"
+                           {TOPOLOGY-WORKERS 2}
+                           topology1
+                           {1 "1"
+                            2 "1"}
+                           {[1 1] ["sup1" 1]
+                            [2 2] ["sup1" 2]}
+                           {["sup1" 1] [0.0 0.0 0.0]
+                            ["sup1" 2] [0.0 0.0 0.0]
+                            })
+                         (submit-mocked-assignment
+                           (:nimbus cluster)
+                           (:storm-cluster-state cluster)
+                           "topology2"
+                           {TOPOLOGY-WORKERS 2}
+                           topology2
+                           {1 "1"
+                            2 "1"}
+                           {[1 1] ["sup1" 1]
+                            [2 2] ["sup1" 2]}
+                           {["sup1" 1] [0.0 0.0 0.0]
+                            ["sup1" 2] [0.0 0.0 0.0]
+                            })
+                         ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+                         (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+                         ))
+         (is (empty? (:launched changed)))
+         (bind options (RebalanceOptions.))
+         (.set_wait_secs options 0)
+         (bind changed (capture-changed-workers
+                         (.rebalance (:nimbus cluster) "topology2" options)
+                         (advance-cluster-time cluster 10)
+                         (heartbeat-workers cluster "sup1" [1 2 3 4])
+                         (advance-cluster-time cluster 10)
+                         ))
+         (validate-launched-once (:launched changed)
+           {"sup1" [1 2]}
+           (get-storm-id (:storm-cluster-state cluster) "topology1"))
+         (validate-launched-once (:launched changed)
+           {"sup1" [3 4]}
+           (get-storm-id (:storm-cluster-state cluster) "topology2"))
+         ))))
++


[3/6] storm git commit: Merge branch 'master' of github.com:apache/storm into storm1529

Posted by ki...@apache.org.
Merge branch 'master' of github.com:apache/storm into storm1529


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eddfea36
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eddfea36
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eddfea36

Branch: refs/heads/master
Commit: eddfea3615a54834425582649a2fb9ba9706dc1c
Parents: 61c9702 e543bbf
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Fri Feb 19 14:21:11 2016 -0600
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Fri Feb 19 14:21:11 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md                                    |  17 +
 README.markdown                                 |   1 +
 bin/storm-config.cmd                            |   4 +
 bin/storm.cmd                                   |  24 +-
 bin/storm.py                                    |  12 +-
 conf/cgconfig.conf.example                      |  41 +++
 conf/defaults.yaml                              |  16 +-
 examples/storm-starter/pom.xml                  |  10 +
 .../org/apache/storm/starter/clj/word_count.clj |   3 +-
 .../starter/ResourceAwareExampleTopology.java   |   2 +-
 .../spout/RandomNumberGeneratorSpout.java       |  95 +++++
 .../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++
 .../TridentMinMaxOfVehiclesTopology.java        | 180 ++++++++++
 external/storm-hdfs/pom.xml                     |  23 +-
 pom.xml                                         |  16 +
 storm-clojure/pom.xml                           |  74 ++++
 .../src/clj/org/apache/storm/clojure.clj        | 207 +++++++++++
 .../src/clj/org/apache/storm/thrift.clj         | 286 +++++++++++++++
 storm-clojure/src/test/clj/clojure_test.clj     | 158 +++++++++
 storm-core/pom.xml                              |   9 +
 storm-core/src/clj/org/apache/storm/clojure.clj | 207 -----------
 .../clj/org/apache/storm/command/activate.clj   |  24 --
 .../clj/org/apache/storm/command/deactivate.clj |  24 --
 .../org/apache/storm/command/dev_zookeeper.clj  |  28 --
 .../clj/org/apache/storm/command/get_errors.clj |   3 +-
 .../org/apache/storm/command/healthcheck.clj    |  90 -----
 .../org/apache/storm/command/kill_topology.clj  |  29 --
 .../src/clj/org/apache/storm/command/list.clj   |  38 --
 .../clj/org/apache/storm/command/monitor.clj    |   2 +-
 .../clj/org/apache/storm/command/rebalance.clj  |   3 +-
 .../org/apache/storm/command/set_log_level.clj  |   3 +-
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon/common.clj  | 121 ++++---
 .../clj/org/apache/storm/daemon/executor.clj    | 114 +++---
 .../clj/org/apache/storm/daemon/logviewer.clj   |  19 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 104 +++---
 .../clj/org/apache/storm/daemon/supervisor.clj  | 251 +++++++++----
 .../src/clj/org/apache/storm/daemon/task.clj    |   4 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  | 170 +++++----
 .../src/clj/org/apache/storm/disruptor.clj      |  89 -----
 storm-core/src/clj/org/apache/storm/event.clj   |  71 ----
 .../clj/org/apache/storm/internal/clojure.clj   | 201 +++++++++++
 .../clj/org/apache/storm/internal/thrift.clj    |  96 +++++
 .../src/clj/org/apache/storm/local_state.clj    | 134 -------
 .../org/apache/storm/local_state_converter.clj  |  24 ++
 storm-core/src/clj/org/apache/storm/testing.clj |  37 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  | 286 ---------------
 storm-core/src/clj/org/apache/storm/timer.clj   | 128 -------
 storm-core/src/clj/org/apache/storm/ui/core.clj |   2 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  88 +++++
 .../src/jvm/org/apache/storm/StormTimer.java    | 241 +++++++++++++
 storm-core/src/jvm/org/apache/storm/Thrift.java | 351 ++++++++++++++++++
 .../jvm/org/apache/storm/command/Activate.java  |  40 +++
 .../src/jvm/org/apache/storm/command/CLI.java   | 353 +++++++++++++++++++
 .../org/apache/storm/command/Deactivate.java    |  40 +++
 .../org/apache/storm/command/DevZookeeper.java  |  35 ++
 .../org/apache/storm/command/HealthCheck.java   | 125 +++++++
 .../org/apache/storm/command/KillTopology.java  |  51 +++
 .../src/jvm/org/apache/storm/command/List.java  |  50 +++
 .../container/ResourceIsolationInterface.java   |  51 +++
 .../storm/container/cgroup/CgroupCenter.java    | 216 ++++++++++++
 .../storm/container/cgroup/CgroupCommon.java    | 270 ++++++++++++++
 .../container/cgroup/CgroupCommonOperation.java |  81 +++++
 .../container/cgroup/CgroupCoreFactory.java     |  74 ++++
 .../storm/container/cgroup/CgroupManager.java   | 210 +++++++++++
 .../storm/container/cgroup/CgroupOperation.java |  79 +++++
 .../storm/container/cgroup/CgroupUtils.java     | 118 +++++++
 .../apache/storm/container/cgroup/Device.java   |  75 ++++
 .../storm/container/cgroup/Hierarchy.java       | 130 +++++++
 .../storm/container/cgroup/SubSystem.java       |  81 +++++
 .../storm/container/cgroup/SubSystemType.java   |  36 ++
 .../storm/container/cgroup/SystemOperation.java |  75 ++++
 .../storm/container/cgroup/core/BlkioCore.java  | 213 +++++++++++
 .../storm/container/cgroup/core/CgroupCore.java |  26 ++
 .../storm/container/cgroup/core/CpuCore.java    | 135 +++++++
 .../container/cgroup/core/CpuacctCore.java      |  71 ++++
 .../storm/container/cgroup/core/CpusetCore.java | 209 +++++++++++
 .../container/cgroup/core/DevicesCore.java      | 189 ++++++++++
 .../container/cgroup/core/FreezerCore.java      |  66 ++++
 .../storm/container/cgroup/core/MemoryCore.java | 188 ++++++++++
 .../storm/container/cgroup/core/NetClsCore.java |  69 ++++
 .../container/cgroup/core/NetPrioCore.java      |  65 ++++
 .../org/apache/storm/event/EventManager.java    |  24 ++
 .../org/apache/storm/event/EventManagerImp.java |  97 +++++
 .../jvm/org/apache/storm/testing/NGrouping.java |   4 +-
 .../storm/testing/PythonShellMetricsBolt.java   |  14 +-
 .../storm/testing/PythonShellMetricsSpout.java  |   8 +-
 .../jvm/org/apache/storm/trident/Stream.java    | 121 ++++++-
 .../operation/builtin/ComparisonAggregator.java |  91 +++++
 .../storm/trident/operation/builtin/Max.java    |  37 ++
 .../operation/builtin/MaxWithComparator.java    |  51 +++
 .../storm/trident/operation/builtin/Min.java    |  36 ++
 .../operation/builtin/MinWithComparator.java    |  51 +++
 .../org/apache/storm/utils/DisruptorQueue.java  |  15 +-
 .../jvm/org/apache/storm/utils/LocalState.java  | 112 +++++-
 .../org/apache/storm/utils/NimbusClient.java    |  19 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  37 +-
 .../org/apache/storm/integration_test.clj       | 259 ++++++++------
 .../org/apache/storm/testing4j_test.clj         |  72 ++--
 .../test/clj/org/apache/storm/clojure_test.clj  |  64 ++--
 .../test/clj/org/apache/storm/cluster_test.clj  |   3 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../test/clj/org/apache/storm/grouping_test.clj |  56 +--
 .../storm/messaging/netty_integration_test.clj  |  18 +-
 .../clj/org/apache/storm/messaging_test.clj     |  14 +-
 .../test/clj/org/apache/storm/metrics_test.clj  |  85 +++--
 .../test/clj/org/apache/storm/nimbus_test.clj   | 260 +++++++++-----
 .../scheduler/resource_aware_scheduler_test.clj |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 175 ++++-----
 .../clj/org/apache/storm/tick_tuple_test.clj    |  15 +-
 .../clj/org/apache/storm/transactional_test.clj |   3 +-
 .../test/jvm/org/apache/storm/TestCgroups.java  | 130 +++++++
 .../jvm/org/apache/storm/command/TestCLI.java   |  59 ++++
 .../resource/TestResourceAwareScheduler.java    |   3 +
 114 files changed, 7742 insertions(+), 2001 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eddfea36/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/eddfea36/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index da5a5bc,ef40c4a..eb02e50
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -775,67 -772,68 +778,69 @@@
              childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
          (is (= expected-childopts childopts-with-ids)))))
  
-   (deftest test-retry-read-assignments
-     (with-simulated-time-local-cluster [cluster
-                                         :supervisors 0
-                                         :ports-per-supervisor 2
-                                         :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                                                       NIMBUS-MONITOR-FREQ-SECS 10
-                                                       TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
-                                                       TOPOLOGY-ACKER-EXECUTORS 0}]
-       (letlocals
-         (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-         (bind topology1 (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                           {}))
-         (bind topology2 (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                           {}))
-         (bind state (:storm-cluster-state cluster))
-         (bind changed (capture-changed-workers
-                         (submit-mocked-assignment
-                           (:nimbus cluster)
-                           (:storm-cluster-state cluster)
-                           "topology1"
-                           {TOPOLOGY-WORKERS 2}
-                           topology1
-                           {1 "1"
-                            2 "1"}
-                           {[1 1] ["sup1" 1]
-                            [2 2] ["sup1" 2]}
-                           {["sup1" 1] [0.0 0.0 0.0]
-                            ["sup1" 2] [0.0 0.0 0.0]
-                            })
-                         (submit-mocked-assignment
-                           (:nimbus cluster)
-                           (:storm-cluster-state cluster)
-                           "topology2"
-                           {TOPOLOGY-WORKERS 2}
-                           topology2
-                           {1 "1"
-                            2 "1"}
-                           {[1 1] ["sup1" 1]
-                            [2 2] ["sup1" 2]}
-                           {["sup1" 1] [0.0 0.0 0.0]
-                            ["sup1" 2] [0.0 0.0 0.0]
-                            })
-                         ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                         (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                         ))
-         (is (empty? (:launched changed)))
-         (bind options (RebalanceOptions.))
-         (.set_wait_secs options 0)
-         (bind changed (capture-changed-workers
-                         (.rebalance (:nimbus cluster) "topology2" options)
-                         (advance-cluster-time cluster 10)
-                         (heartbeat-workers cluster "sup1" [1 2 3 4])
-                         (advance-cluster-time cluster 10)
-                         ))
-         (validate-launched-once (:launched changed)
-           {"sup1" [1 2]}
-           (get-storm-id (:storm-cluster-state cluster) "topology1"))
-         (validate-launched-once (:launched changed)
-           {"sup1" [3 4]}
-           (get-storm-id (:storm-cluster-state cluster) "topology2"))
-         ))))
+ (deftest test-retry-read-assignments
+   (with-simulated-time-local-cluster [cluster
+                                       :supervisors 0
+                                       :ports-per-supervisor 2
+                                       :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+                                                     NIMBUS-MONITOR-FREQ-SECS 10
+                                                     TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                                     TOPOLOGY-ACKER-EXECUTORS 0}]
+     (letlocals
+      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+      (bind topology1 (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 2))}
+                       {}))
+      (bind topology2 (Thrift/buildTopology
+                       {"1" (Thrift/prepareSpoutDetails
+                              (TestPlannerSpout. true) (Integer. 2))}
+                       {}))
+      (bind state (:storm-cluster-state cluster))
+      (bind changed (capture-changed-workers
+                     (submit-mocked-assignment
+                      (:nimbus cluster)
+                      (:storm-cluster-state cluster)
+                      "topology1"
+                      {TOPOLOGY-WORKERS 2}
+                      topology1
+                      {1 "1"
+                       2 "1"}
+                      {[1 1] ["sup1" 1]
+                       [2 2] ["sup1" 2]}
+                      {["sup1" 1] [0.0 0.0 0.0]
+                       ["sup1" 2] [0.0 0.0 0.0]
+                       })
+                     (submit-mocked-assignment
+                      (:nimbus cluster)
+                      (:storm-cluster-state cluster)
+                      "topology2"
+                      {TOPOLOGY-WORKERS 2}
+                      topology2
+                      {1 "1"
+                       2 "1"}
+                      {[1 1] ["sup1" 1]
+                       [2 2] ["sup1" 2]}
+                      {["sup1" 1] [0.0 0.0 0.0]
+                       ["sup1" 2] [0.0 0.0 0.0]
+                       })
+                     ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+                     (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+                     ))
+      (is (empty? (:launched changed)))
+      (bind options (RebalanceOptions.))
+      (.set_wait_secs options 0)
+      (bind changed (capture-changed-workers
+                     (.rebalance (:nimbus cluster) "topology2" options)
+                     (advance-cluster-time cluster 10)
+                     (heartbeat-workers cluster "sup1" [1 2 3 4])
+                     (advance-cluster-time cluster 10)
+                     ))
+      (validate-launched-once (:launched changed)
+                              {"sup1" [1 2]}
+                              (get-storm-id (:storm-cluster-state cluster) "topology1"))
+      (validate-launched-once (:launched changed)
+                              {"sup1" [3 4]}
+                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
+      )))
 +