You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:02 UTC

[02/10] storm git commit: STORM-1281: LocalCluster, testing4j and testing.clj to java

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index cc760a4..bb949d5 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -16,16 +16,18 @@
 (ns org.apache.storm.nimbus-test
   (:use [clojure test])
   (:require [org.apache.storm [util :as util]])
-  (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+  (:import [java.util.function UnaryOperator])
+  (:import [org.apache.storm.testing InProcessZookeeper MockLeaderElector TestWordCounter TestWordSpout TestGlobalCount
             TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
            [org.apache.storm.blobstore BlobStore]
            [org.apache.storm.nimbus InMemoryTopologyActionNotifier]
            [org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus]
            [org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase]
-           [org.apache.storm Thrift MockAutoCred]
+           [org.apache.storm LocalCluster LocalCluster$Builder Thrift MockAutoCred Testing Testing$Condition]
            [org.apache.storm.stats BoltExecutorStats StatsUtil]
            [org.apache.storm.security.auth IGroupMappingServiceProvider IAuthorizer])
   (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
+  (:import [org.apache.storm.testing TmpPath])
   (:import [org.apache.storm.scheduler INimbus])
   (:import [org.mockito Mockito Matchers])
   (:import [org.mockito.exceptions.base MockitoAssertionError])
@@ -38,18 +40,22 @@
   (:import [java.util HashMap HashSet Optional])
   (:import [java.io File])
   (:import [javax.security.auth Subject])
-  (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate StormCommonInstaller]
+  (:import [org.apache.storm.utils Time Time$SimulatedTime Utils Utils$UptimeComputer ConfigUtils IPredicate StormCommonInstaller]
            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
   (:import [org.apache.storm.zookeeper Zookeeper])
   (:import [org.apache.commons.io FileUtils])
   (:import [org.json.simple JSONValue])
   (:import [org.apache.storm.daemon StormCommon])
   (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
-  (:use [org.apache.storm testing util config log converter])
+  (:use [org.apache.storm util config log])
   (:require [conjure.core])
 
   (:use [conjure core]))
 
+(defn- mk-nimbus
+  [conf inimbus blob-store leader-elector group-mapper cluster-state]
+  (Nimbus. conf inimbus cluster-state nil blob-store leader-elector group-mapper))
+
 (defn- from-json
        [^String str]
        (if str
@@ -58,28 +64,30 @@
          nil))
 
 (defn storm-component->task-info [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)
-        nimbus (:nimbus cluster)]
+  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+        nimbus (.getNimbus cluster)]
     (-> (.getUserTopology nimbus storm-id)
         (#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
         (Utils/reverseMap)
         clojurify-structure)))
 
 (defn getCredentials [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)]
-    (clojurify-crdentials (.credentials (:storm-cluster-state cluster) storm-id nil))))
+  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+        creds (.credentials (.getClusterState cluster) storm-id nil)]
+    (if creds (into {} (.get_creds creds)))))
 
 (defn storm-component->executor-info [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)
-        nimbus (:nimbus cluster)
+  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+        nimbus (.getNimbus cluster)
         storm-conf (from-json (.getTopologyConf nimbus storm-id))
         topology (.getUserTopology nimbus storm-id)
         task->component (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf))
-        state (:storm-cluster-state cluster)
+        state (.getClusterState cluster)
         get-component (comp task->component first)]
-    (->> (clojurify-assignment (.assignmentInfo state storm-id nil))
-         :executor->node+port
-         keys
+    (->> (.assignmentInfo state storm-id nil)
+         .get_executor_node_port
+         .keySet
+         clojurify-structure
          (map (fn [e] {e (get-component e)}))
          (apply merge)
          (Utils/reverseMap)
@@ -87,26 +95,25 @@
 
 (defn storm-num-workers [state storm-name]
   (let [storm-id (StormCommon/getStormId state storm-name)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
-    (count (clojurify-structure (Utils/reverseMap (:executor->node+port assignment))))
-    ))
+        assignment (.assignmentInfo state storm-id nil)]
+    (.size (Utils/reverseMap (.get_executor_node_port assignment)))))
 
 (defn topology-nodes [state storm-name]
   (let [storm-id (StormCommon/getStormId state storm-name)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
+        assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
-         :executor->node+port
-         vals
-         (map first)
+         .get_executor_node_port
+         .values
+         (map (fn [np] (.get_node np)))
          set         
          )))
 
 (defn topology-slots [state storm-name]
   (let [storm-id (StormCommon/getStormId state storm-name)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
+        assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
-         :executor->node+port
-         vals
+         .get_executor_node_port
+         .values
          set         
          )))
 
@@ -114,12 +121,12 @@
 ; map-val is a temporary kluge for clojure.
 (defn topology-node-distribution [state storm-name]
   (let [storm-id (StormCommon/getStormId state storm-name)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
+        assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
-         :executor->node+port
-         vals
+         .get_executor_node_port
+         .values
          set
-         (group-by first)
+         (group-by (fn [np] (.get_node np)))
          (map-val count)
          (map (fn [[_ amt]] {amt 1}))
          (apply merge-with +)       
@@ -129,20 +136,22 @@
   (count (topology-nodes state storm-name)))
 
 (defn executor-assignment [cluster storm-id executor-id]
-  (let [state (:storm-cluster-state cluster)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
-    ((:executor->node+port assignment) executor-id)
+  (let [state (.getClusterState cluster)
+        assignment (.assignmentInfo state storm-id nil)]
+    (.get (.get_executor_node_port assignment) executor-id)
     ))
 
 (defn executor-start-times [cluster storm-id]
-  (let [state (:storm-cluster-state cluster)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
-    (:executor->start-time-secs assignment)))
+  (let [state (.getClusterState cluster)
+        assignment (.assignmentInfo state storm-id nil)]
+    (clojurify-structure (.get_executor_start_time_secs assignment))))
 
 (defn do-executor-heartbeat [cluster storm-id executor]
-  (let [state (:storm-cluster-state cluster)
-        executor->node+port (:executor->node+port (clojurify-assignment (.assignmentInfo state storm-id nil)))
-        [node port] (get executor->node+port executor)
+  (let [state (.getClusterState cluster)
+        executor->node+port (.get_executor_node_port (.assignmentInfo state storm-id nil))
+        np (.get executor->node+port executor)
+        node (.get_node np)
+        port (first (.get_port np))
         curr-beat (StatsUtil/convertZkWorkerHb (.getWorkerHeartbeat state storm-id node port))
         stats (if (get curr-beat "executor-stats")
                 (get curr-beat "executor-stats")
@@ -156,28 +165,28 @@
       (StatsUtil/thriftifyZkWorkerHb (StatsUtil/mkZkWorkerHb storm-id stats (int 10))))))
 
 (defn slot-assignments [cluster storm-id]
-  (let [state (:storm-cluster-state cluster)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
-        (clojurify-structure (Utils/reverseMap (:executor->node+port assignment)))))
+  (let [state (.getClusterState cluster)
+        assignment (.assignmentInfo state storm-id nil)]
+        (clojurify-structure (Utils/reverseMap (.get_executor_node_port assignment)))))
 
 (defn task-ids [cluster storm-id]
-  (let [nimbus (:nimbus cluster)]
+  (let [nimbus (.getNimbus cluster)]
     (-> (.getUserTopology nimbus storm-id)
         (#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
         clojurify-structure
         keys)))
 
 (defn topology-executors [cluster storm-id]
-  (let [state (:storm-cluster-state cluster)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
-    ret-keys (keys (:executor->node+port assignment))
+  (let [state (.getClusterState cluster)
+        assignment (.assignmentInfo state storm-id nil)
+    ret-keys (keys (.get_executor_node_port assignment))
         _ (log-message "ret-keys: " (pr-str ret-keys)) ]
     ret-keys
     ))
 
 (defn check-distribution [items distribution]
-  (let [counts (map count items)]
-    (is (ms= counts distribution))))
+  (let [counts (map long (map count items))]
+    (is (Testing/multiseteq counts (map long distribution)))))
 
 (defn disjoint? [& sets]
   (let [combined (apply concat sets)]
@@ -188,14 +197,14 @@
   clojurify-structure (StormCommon/executorIdToTasks executor-id))
 
 (defnk check-consistency [cluster storm-name :assigned? true]
-  (let [state (:storm-cluster-state cluster)
+  (let [state (.getClusterState cluster)
         storm-id (StormCommon/getStormId state storm-name)
         task-ids (task-ids cluster storm-id)
-        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
-        executor->node+port (:executor->node+port assignment)
+        assignment (.assignmentInfo state storm-id nil)
+        executor->node+port (.get_executor_node_port assignment)
         task->node+port (StormCommon/taskToNodeport executor->node+port)
         assigned-task-ids (mapcat executor->tasks (keys executor->node+port))
-        all-nodes (set (map first (vals executor->node+port)))]
+        all-nodes (set (map (fn [np] (.get_node np)) (.values executor->node+port)))]
     (when assigned?
       (is (= (sort task-ids) (sort assigned-task-ids)))
       (doseq [t task-ids]
@@ -203,17 +212,17 @@
     (doseq [[e s] executor->node+port]
       (is (not-nil? s)))
     
-    ;;(map str (-> (Thread/currentThread) .getStackTrace))
-    (is (= all-nodes (set (keys (:node->host assignment)))))
+    (is (= all-nodes (set (keys (.get_node_host assignment)))))
     (doseq [[e s] executor->node+port]
-      (is (not-nil? ((:executor->start-time-secs assignment) e))))
+      (is (not-nil? (.get (.get_executor_start_time_secs assignment) e))))
     ))
 
 (deftest test-bogusId
-  (with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
-                       :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
-    (let [state (:storm-cluster-state cluster)
-          nimbus (:nimbus cluster)]
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSupervisors 4) 
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+    (let [state (.getClusterState cluster)
+          nimbus (.getNimbus cluster)]
        (is (thrown? NotAliveException (.getTopologyConf nimbus "bogus-id")))
        (is (thrown? NotAliveException (.getTopology nimbus "bogus-id")))
        (is (thrown? NotAliveException (.getUserTopology nimbus "bogus-id")))
@@ -222,10 +231,11 @@
       )))
 
 (deftest test-assignment
-  (with-simulated-time-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
-                       :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
-    (let [state (:storm-cluster-state cluster)
-          nimbus (:nimbus cluster)
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 4) 
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+    (let [state (.getClusterState cluster)
           topology (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails
                             (TestPlannerSpout. false) (Integer. 3))}
@@ -254,8 +264,8 @@
                               (Utils/getGlobalStreamId "2" nil)
                               (Thrift/prepareNoneGrouping)}
                              (TestPlannerBolt.) (Integer. 4))})
-          _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
-          _ (advance-cluster-time cluster 11)
+          _ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
+          _ (.advanceClusterTime cluster 11)
           task-info (storm-component->task-info cluster "mystorm")]
       (check-consistency cluster "mystorm")
       ;; 3 should be assigned once (if it were optimized, we'd have
@@ -265,8 +275,8 @@
       (is (= 4 (count (task-info "2"))))
       (is (= 1 (count (task-info "3"))))
       (is (= 4 (storm-num-workers state "mystorm")))
-      (submit-local-topology nimbus "storm2" {TOPOLOGY-WORKERS 20} topology2)
-      (advance-cluster-time cluster 11)
+      (.submitTopology cluster "storm2" {TOPOLOGY-WORKERS 20} topology2)
+      (.advanceClusterTime cluster 11)
       (check-consistency cluster "storm2")
       (is (= 2 (count (.assignments state nil))))
       (let [task-info (storm-component->task-info cluster "storm2")]
@@ -297,17 +307,17 @@
 
 
 (deftest test-auto-credentials
-  (with-simulated-time-local-cluster [cluster :supervisors 6
-                                      :ports-per-supervisor 3
-                                      :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 6)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                                                     TOPOLOGY-ACKER-EXECUTORS 0
                                                     TOPOLOGY-EVENTLOGGER-EXECUTORS 0
                                                     NIMBUS-CREDENTIAL-RENEW-FREQ-SECS 10
                                                     NIMBUS-CREDENTIAL-RENEWERS (list "org.apache.storm.MockAutoCred")
                                                     NIMBUS-AUTO-CRED-PLUGINS (list "org.apache.storm.MockAutoCred")
-                                                    }]
-    (let [state (:storm-cluster-state cluster)
-          nimbus (:nimbus cluster)
+                                                    })))]
+    (let [state (.getClusterState cluster)
           topology-name "test-auto-cred-storm"
           submitOptions (SubmitOptions. TopologyInitialStatus/INACTIVE)
           - (.set_creds submitOptions (Credentials. (HashMap.)))
@@ -322,14 +332,14 @@
                             {(Utils/getGlobalStreamId "2" nil)
                              (Thrift/prepareNoneGrouping)}
                             (TestPlannerBolt.))})
-          _ (submit-local-topology-with-opts nimbus topology-name {TOPOLOGY-WORKERS 4
+          _ (.submitTopologyWithOpts cluster topology-name {TOPOLOGY-WORKERS 4
                                                                TOPOLOGY-AUTO-CREDENTIALS (list "org.apache.storm.MockAutoCred")
                                                                } topology submitOptions)
           credentials (getCredentials cluster topology-name)]
       ; check that the credentials have nimbus auto generated cred
       (is (= (.get credentials MockAutoCred/NIMBUS_CRED_KEY) MockAutoCred/NIMBUS_CRED_VAL))
       ;advance cluster time so the renewers can execute
-      (advance-cluster-time cluster 20)
+      (.advanceClusterTime cluster 20)
       ;check that renewed credentials replace the original credential.
       (is (= (.get (getCredentials cluster topology-name) MockAutoCred/NIMBUS_CRED_KEY) MockAutoCred/NIMBUS_CRED_RENEW_VAL))
       (is (= (.get (getCredentials cluster topology-name) MockAutoCred/GATEWAY_CRED_KEY) MockAutoCred/GATEWAY_CRED_RENEW_VAL)))))
@@ -347,19 +357,19 @@
        ~(first lexpr))))
 
 (deftest test-isolated-assignment
-  (with-simulated-time-local-cluster [cluster :supervisors 6
-                               :ports-per-supervisor 3
-                               :inimbus (isolation-nimbus)
-                               :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 6)
+                                      (.withINimbus (isolation-nimbus))
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                                              TOPOLOGY-ACKER-EXECUTORS 0
                                              TOPOLOGY-EVENTLOGGER-EXECUTORS 0
                                              STORM-SCHEDULER "org.apache.storm.scheduler.IsolationScheduler"
                                              ISOLATION-SCHEDULER-MACHINES {"tester1" 3 "tester2" 2}
                                              NIMBUS-MONITOR-FREQ-SECS 10
-                                             }]
+                                             })))]
     (letlocals
-      (bind state (:storm-cluster-state cluster))
-      (bind nimbus (:nimbus cluster))
+      (bind state (.getClusterState cluster))
       (bind topology (Thrift/buildTopology
                       {"1" (Thrift/prepareSpoutDetails
                              (TestPlannerSpout. false) (Integer. 3))}
@@ -372,14 +382,14 @@
                               (Thrift/prepareNoneGrouping)}
                              (TestPlannerBolt.))}))
 
-      (submit-local-topology nimbus "noniso" {TOPOLOGY-WORKERS 4} topology)
-      (advance-cluster-time cluster 11)
+      (.submitTopology cluster "noniso" {TOPOLOGY-WORKERS 4} topology)
+      (.advanceClusterTime cluster 11)
       (is (= 4 (topology-num-nodes state "noniso")))
       (is (= 4 (storm-num-workers state "noniso")))
 
-      (submit-local-topology nimbus "tester1" {TOPOLOGY-WORKERS 6} topology)
-      (submit-local-topology nimbus "tester2" {TOPOLOGY-WORKERS 6} topology)
-      (advance-cluster-time cluster 11)
+      (.submitTopology cluster "tester1" {TOPOLOGY-WORKERS 6} topology)
+      (.submitTopology cluster "tester2" {TOPOLOGY-WORKERS 6} topology)
+      (.advanceClusterTime cluster 11)
 
       (bind task-info-tester1 (storm-component->task-info cluster "tester1"))
       (bind task-info-tester2 (storm-component->task-info cluster "tester2"))
@@ -401,7 +411,7 @@
       (bind tester1-slots (topology-slots state "tester1"))
       (bind tester2-slots (topology-slots state "tester2"))
       (bind noniso-slots (topology-slots state "noniso"))
-      (advance-cluster-time cluster 20)
+      (.advanceClusterTime cluster 20)
       (is (= tester1-slots (topology-slots state "tester1")))
       (is (= tester2-slots (topology-slots state "tester2")))
       (is (= noniso-slots (topology-slots state "noniso")))
@@ -409,9 +419,11 @@
       )))
 
 (deftest test-zero-executor-or-tasks
-  (with-simulated-time-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
-    (let [state (:storm-cluster-state cluster)
-          nimbus (:nimbus cluster)
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 6)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+    (let [state (.getClusterState cluster)
           topology (Thrift/buildTopology
                     {"1" (Thrift/prepareSpoutDetails
                            (TestPlannerSpout. false) (Integer. 3)
@@ -426,8 +438,8 @@
                             (Thrift/prepareNoneGrouping)}
                            (TestPlannerBolt.) nil
                            {TOPOLOGY-TASKS 5})})
-          _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
-          _ (advance-cluster-time cluster 11)
+          _ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
+          _ (.advanceClusterTime cluster 11)
           task-info (storm-component->task-info cluster "mystorm")]
       (check-consistency cluster "mystorm")
       (is (= 0 (count (task-info "1"))))
@@ -438,9 +450,10 @@
 
 ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (deftest test-executor-assignments
-  (with-simulated-time-local-cluster[cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
-    (let [nimbus (:nimbus cluster)
-          topology (Thrift/buildTopology
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+    (let [topology (Thrift/buildTopology
                     {"1" (Thrift/prepareSpoutDetails
                            (TestPlannerSpout. true) (Integer. 3)
                            {TOPOLOGY-TASKS 5})}
@@ -453,8 +466,8 @@
                            {(Utils/getGlobalStreamId "2" nil)
                             (Thrift/prepareNoneGrouping)}
                            (TestPlannerBolt.) (Integer. 3))})
-          _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
-          _ (advance-cluster-time cluster 11)
+          _ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
+          _ (.advanceClusterTime cluster 11)
           task-info (storm-component->task-info cluster "mystorm")
           executor-info (->> (storm-component->executor-info cluster "mystorm")
                              (map-val #(map executor->tasks %)))]
@@ -470,10 +483,12 @@
       )))
 
 (deftest test-over-parallelism-assignment
-  (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
-                       :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
-    (let [state (:storm-cluster-state cluster)
-          nimbus (:nimbus cluster)
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 2)
+                                      (.withPortsPerSupervisor 5)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+    (let [state (.getClusterState cluster)
           topology (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails
                             (TestPlannerSpout. true) (Integer. 21))}
@@ -489,8 +504,8 @@
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareNoneGrouping)}
                             (TestPlannerBolt.) (Integer. 10))})
-          _ (submit-local-topology nimbus "test" {TOPOLOGY-WORKERS 7} topology)
-          _ (advance-cluster-time cluster 11)
+          _ (.submitTopology cluster "test" {TOPOLOGY-WORKERS 7} topology)
+          _ (.advanceClusterTime cluster 11)
           task-info (storm-component->task-info cluster "test")]
       (check-consistency cluster "test")
       (is (= 21 (count (task-info "1"))))
@@ -502,81 +517,84 @@
 
 (deftest test-topo-history
   (let [group-mapper (Mockito/mock IGroupMappingServiceProvider)]
-    (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
-                                        :group-mapper group-mapper
-                                        :daemon-conf {SUPERVISOR-ENABLE false
+    (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                        (.withSimulatedTime)
+                                        (.withSupervisors 2)
+                                        (.withPortsPerSupervisor 5)
+                                        (.withGroupMapper group-mapper)
+                                        (.withDaemonConf {SUPERVISOR-ENABLE false
                                                       NIMBUS-ADMINS ["admin-user"]
                                                       NIMBUS-TASK-TIMEOUT-SECS 30
                                                       NIMBUS-MONITOR-FREQ-SECS 10
-                                                      TOPOLOGY-ACKER-EXECUTORS 0}]
+                                                      TOPOLOGY-ACKER-EXECUTORS 0})))]
       (.thenReturn (Mockito/when (.getGroups group-mapper (Mockito/anyObject))) #{"alice-group"})
       (letlocals
-        (bind conf (:daemon-conf cluster))
+        (bind conf (.getDaemonConf cluster))
         (bind topology (Thrift/buildTopology
                          {"1" (Thrift/prepareSpoutDetails
                                 (TestPlannerSpout. true) (Integer. 4))}
                          {}))
-        (bind state (:storm-cluster-state cluster))
-        (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
+        (bind state (.getClusterState cluster))
+        (.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
         (bind storm-id (StormCommon/getStormId state "test"))
-        (advance-cluster-time cluster 5)
-        (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
-        (.killTopology (:nimbus cluster) "test")
+        (.advanceClusterTime cluster 5)
+        (is (not-nil? (.stormBase state storm-id nil)))
+        (is (not-nil? (.assignmentInfo state storm-id nil)))
+        (.killTopology (.getNimbus cluster) "test")
         ;; check that storm is deactivated but alive
-        (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type)))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
-        (advance-cluster-time cluster 35)
+        (is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id nil))))
+        (is (not-nil? (.assignmentInfo state storm-id nil)))
+        (.advanceClusterTime cluster 35)
         ;; kill topology read on group
-        (submit-local-topology (:nimbus cluster) "killgrouptest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-GROUPS ["alice-group"]} topology)
+        (.submitTopology cluster "killgrouptest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-GROUPS ["alice-group"]} topology)
         (bind storm-id-killgroup (StormCommon/getStormId state "killgrouptest"))
-        (advance-cluster-time cluster 5)
-        (is (not-nil? (clojurify-storm-base (.stormBase state storm-id-killgroup nil))))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil))))
-        (.killTopology (:nimbus cluster) "killgrouptest")
+        (.advanceClusterTime cluster 5)
+        (is (not-nil? (.stormBase state storm-id-killgroup nil)))
+        (is (not-nil? (.assignmentInfo state storm-id-killgroup nil)))
+        (.killTopology (.getNimbus cluster) "killgrouptest")
         ;; check that storm is deactivated but alive
-        (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id-killgroup nil)) :status :type)))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil))))
-        (advance-cluster-time cluster 35)
+        (is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id-killgroup nil))))
+        (is (not-nil? (.assignmentInfo state storm-id-killgroup nil)))
+        (.advanceClusterTime cluster 35)
         ;; kill topology can't read
-        (submit-local-topology (:nimbus cluster) "killnoreadtest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
+        (.submitTopology cluster "killnoreadtest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
         (bind storm-id-killnoread (StormCommon/getStormId state "killnoreadtest"))
-        (advance-cluster-time cluster 5)
-        (is (not-nil? (clojurify-storm-base (.stormBase state storm-id-killnoread nil))))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil))))
-        (.killTopology (:nimbus cluster) "killnoreadtest")
+        (.advanceClusterTime cluster 5)
+        (is (not-nil? (.stormBase state storm-id-killnoread nil)))
+        (is (not-nil? (.assignmentInfo state storm-id-killnoread nil)))
+        (.killTopology (.getNimbus cluster) "killnoreadtest")
         ;; check that storm is deactivated but alive
-        (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id-killnoread nil)) :status :type)))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil))))
-        (advance-cluster-time cluster 35)
+        (is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id-killnoread nil))))
+        (is (not-nil? (.assignmentInfo state storm-id-killnoread nil)))
+        (.advanceClusterTime cluster 35)
 
         ;; active topology can read
-        (submit-local-topology (:nimbus cluster) "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
-        (advance-cluster-time cluster 11)
+        (.submitTopology cluster "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
+        (.advanceClusterTime cluster 11)
         (bind storm-id2 (StormCommon/getStormId state "2test"))
-        (is (not-nil? (clojurify-storm-base (.stormBase state storm-id2 nil))))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id2 nil))))
+        (is (not-nil? (.stormBase state storm-id2 nil)))
+        (is (not-nil? (.assignmentInfo state storm-id2 nil)))
         ;; active topology can not read
-        (submit-local-topology (:nimbus cluster) "testnoread" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice"]} topology)
-        (advance-cluster-time cluster 11)
+        (.submitTopology cluster "testnoread" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice"]} topology)
+        (.advanceClusterTime cluster 11)
         (bind storm-id3 (StormCommon/getStormId state "testnoread"))
-        (is (not-nil? (clojurify-storm-base (.stormBase state storm-id3 nil))))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id3 nil))))
+        (is (not-nil? (.stormBase state storm-id3 nil)))
+        (is (not-nil? (.assignmentInfo state storm-id3 nil)))
         ;; active topology can read based on group
-        (submit-local-topology (:nimbus cluster) "testreadgroup" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-GROUPS ["alice-group"]} topology)
-        (advance-cluster-time cluster 11)
+        (.submitTopology cluster "testreadgroup" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-GROUPS ["alice-group"]} topology)
+        (.advanceClusterTime cluster 11)
         (bind storm-id4 (StormCommon/getStormId state "testreadgroup"))
-        (is (not-nil? (clojurify-storm-base (.stormBase state storm-id4 nil))))
-        (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id4 nil))))
+        (is (not-nil? (.stormBase state storm-id4 nil)))
+        (is (not-nil? (.assignmentInfo state storm-id4 nil)))
         ;; at this point have 1 running, 1 killed topo
-        (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (:nimbus cluster) (System/getProperty "user.name")))))]
+        (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) (System/getProperty "user.name")))))]
           (log-message "Checking user " (System/getProperty "user.name") " " hist-topo-ids)
           (is (= 4 (count hist-topo-ids)))
           (is (= storm-id2 (get hist-topo-ids 0)))
           (is (= storm-id-killgroup (get hist-topo-ids 1)))
           (is (= storm-id (get hist-topo-ids 2)))
           (is (= storm-id4 (get hist-topo-ids 3))))
-        (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (:nimbus cluster) "alice"))))]
+        (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "alice"))))]
           (log-message "Checking user alice " hist-topo-ids)
           (is (= 5 (count hist-topo-ids)))
           (is (= storm-id2 (get hist-topo-ids 0)))
@@ -584,7 +602,7 @@
           (is (= storm-id (get hist-topo-ids 2)))
           (is (= storm-id3 (get hist-topo-ids 3)))
           (is (= storm-id4 (get hist-topo-ids 4))))
-        (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (:nimbus cluster) "admin-user"))))]
+        (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "admin-user"))))]
           (log-message "Checking user admin-user " hist-topo-ids)
           (is (= 6 (count hist-topo-ids)))
           (is (= storm-id2 (get hist-topo-ids 0)))
@@ -593,122 +611,128 @@
           (is (= storm-id (get hist-topo-ids 3)))
           (is (= storm-id3 (get hist-topo-ids 4)))
           (is (= storm-id4 (get hist-topo-ids 5))))
-        (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (:nimbus cluster) "group-only-user"))))]
+        (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "group-only-user"))))]
           (log-message "Checking user group-only-user " hist-topo-ids)
           (is (= 2 (count hist-topo-ids)))
           (is (= storm-id-killgroup (get hist-topo-ids 0)))
           (is (= storm-id4 (get hist-topo-ids 1))))))))
 
 (deftest test-kill-storm
-  (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
-    :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. ) ; cluster is built with simulated time; with-open closes it when the test body exits
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 2)
+                                      (.withPortsPerSupervisor 5)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                   NIMBUS-TASK-TIMEOUT-SECS 30
                   NIMBUS-MONITOR-FREQ-SECS 10
                   TOPOLOGY-ACKER-EXECUTORS 0
-                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
     (letlocals
-      (bind conf (:daemon-conf cluster))
+      (bind conf (.getDaemonConf cluster))
       (bind topology (Thrift/buildTopology
                        {"1" (Thrift/prepareSpoutDetails
                               (TestPlannerSpout. true) (Integer. 14))}
                        {}))
-      (bind state (:storm-cluster-state cluster))
-      (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
+      (bind state (.getClusterState cluster))
+      (.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
       (bind storm-id (StormCommon/getStormId state "test"))
-      (advance-cluster-time cluster 15)
-      (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
-      (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
-      (.killTopology (:nimbus cluster) "test")
+      (.advanceClusterTime cluster 15)
+      (is (not-nil? (.stormBase state storm-id nil)))
+      (is (not-nil? (.assignmentInfo state storm-id nil)))
+      (.killTopology (.getNimbus cluster) "test")
       ;; check that storm is deactivated but alive
-      (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type)))
-      (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
-      (advance-cluster-time cluster 18)
+      (is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id nil)))) ; asserted right after killTopology, with no time advance in between
+      (is (not-nil? (.assignmentInfo state storm-id nil)))
+      (.advanceClusterTime cluster 18) ; 18s is still under the 20s TOPOLOGY-MESSAGE-TIMEOUT-SECS submitted above (presumably the kill wait - confirm)
       ;; check that storm is deactivated but alive
       (is (= 1 (count (.heartbeatStorms state))))
-      (advance-cluster-time cluster 3)
-      (is (nil? (clojurify-storm-base (.stormBase state storm-id nil))))
-      (is (nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
+      (.advanceClusterTime cluster 3) ; 21s total since the kill: storm base and assignment must now be gone
+      (is (nil? (.stormBase state storm-id nil)))
+      (is (nil? (.assignmentInfo state storm-id nil)))
 
       ;; cleanup happens on monitoring thread
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (is (empty? (.heartbeatStorms state)))
       ;; TODO: check that code on nimbus was cleaned up locally...
 
-      (is (thrown? NotAliveException (.killTopology (:nimbus cluster) "lalala")))
-      (submit-local-topology (:nimbus cluster) "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology)
-      (advance-cluster-time cluster 11)
-      (is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
-      (advance-cluster-time cluster 11)
+      (is (thrown? NotAliveException (.killTopology (.getNimbus cluster) "lalala")))
+      (.submitTopology cluster "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology)
+      (.advanceClusterTime cluster 11)
+      (is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} topology))) ; resubmitting a name that is already running must be rejected
+      (.advanceClusterTime cluster 11)
       (bind storm-id (StormCommon/getStormId state "2test"))
-      (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
-      (.killTopology (:nimbus cluster) "2test")
-      (is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
-      (advance-cluster-time cluster 11)
+      (is (not-nil? (.stormBase state storm-id nil)))
+      (.killTopology (.getNimbus cluster) "2test")
+      (is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} topology))) ; the name is still rejected immediately after killTopology
+      (.advanceClusterTime cluster 11)
       (is (= 1 (count (.heartbeatStorms state))))
 
-      (advance-cluster-time cluster 6)
-      (is (nil? (clojurify-storm-base (.stormBase state storm-id nil))))
-      (is (nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 6)
+      (is (nil? (.stormBase state storm-id nil)))
+      (is (nil? (.assignmentInfo state storm-id nil)))
+      (.advanceClusterTime cluster 11)
       (is (= 0 (count (.heartbeatStorms state))))
 
-      (submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
+      (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
       (bind storm-id3 (StormCommon/getStormId state "test3"))
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (.removeStorm state storm-id3)
-      (is (nil? (clojurify-storm-base (.stormBase state storm-id3 nil))))
-      (is (nil? (clojurify-assignment (.assignmentInfo state storm-id3 nil))))
+      (is (nil? (.stormBase state storm-id3 nil)))
+      (is (nil? (.assignmentInfo state storm-id3 nil)))
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (is (= 0 (count (.heartbeatStorms state))))
 
       ;; this guarantees that monitor thread won't trigger for 10 more seconds
-      (advance-time-secs! 11)
-      (wait-until-cluster-waiting cluster)
+      (Time/advanceTimeSecs 11)
+      (.waitForIdle cluster)
 
-      (submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
+      (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
       (bind storm-id3 (StormCommon/getStormId state "test3"))
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (bind executor-id (first (topology-executors cluster storm-id3)))
 
       (do-executor-heartbeat cluster storm-id3 executor-id)
 
-      (.killTopology (:nimbus cluster) "test3")
-      (advance-cluster-time cluster 6)
+      (.killTopology (.getNimbus cluster) "test3")
+      (.advanceClusterTime cluster 6)
       (is (= 1 (count (.heartbeatStorms state))))
-      (advance-cluster-time cluster 5)
+      (.advanceClusterTime cluster 5)
       (is (= 0 (count (.heartbeatStorms state))))
 
       ;; test kill with opts
-      (submit-local-topology (:nimbus cluster) "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} topology)
-      (advance-cluster-time cluster 11)
-      (.killTopologyWithOpts (:nimbus cluster) "test4" (doto (KillOptions.) (.set_wait_secs 10)))
+      (.submitTopology cluster "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} topology)
+      (.advanceClusterTime cluster 11)
+      (.killTopologyWithOpts (.getNimbus cluster) "test4" (doto (KillOptions.) (.set_wait_secs 10))) ; explicit 10s kill wait: assignment must survive +9s and be gone after +2s more
       (bind storm-id4 (StormCommon/getStormId state "test4"))
-      (advance-cluster-time cluster 9)
-      (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id4 nil))))
-      (advance-cluster-time cluster 2)
-      (is (nil? (clojurify-assignment (.assignmentInfo state storm-id4 nil))))
+      (.advanceClusterTime cluster 9)
+      (is (not-nil? (.assignmentInfo state storm-id4 nil)))
+      (.advanceClusterTime cluster 2)
+      (is (nil? (.assignmentInfo state storm-id4 nil)))
       )))
 
 (deftest test-reassignment
-  (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
-    :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 2)
+                                      (.withPortsPerSupervisor 5)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                   NIMBUS-TASK-LAUNCH-SECS 60
                   NIMBUS-TASK-TIMEOUT-SECS 20
                   NIMBUS-MONITOR-FREQ-SECS 10
                   NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
                   TOPOLOGY-ACKER-EXECUTORS 0
-                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
     (letlocals
-      (bind conf (:daemon-conf cluster))
+      (bind conf (.getDaemonConf cluster))
       (bind topology (Thrift/buildTopology
                        {"1" (Thrift/prepareSpoutDetails
                               (TestPlannerSpout. true) (Integer. 2))}
                        {}))
-      (bind state (:storm-cluster-state cluster))
-      (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
-      (advance-cluster-time cluster 11)
+      (bind state (.getClusterState cluster))
+      (.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
+      (.advanceClusterTime cluster 11)
       (check-consistency cluster "test")
       (bind storm-id (StormCommon/getStormId state "test"))
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
@@ -717,7 +741,7 @@
       (bind _ (log-message "ass1, t0: " (pr-str ass1)))
       (bind _ (log-message "ass2, t0: " (pr-str ass2)))
 
-      (advance-cluster-time cluster 30)
+      (.advanceClusterTime cluster 30)
       (bind _ (log-message "ass1, t30, pre beat: " (pr-str ass1)))
       (bind _ (log-message "ass2, t30, pre beat: " (pr-str ass2)))
       (do-executor-heartbeat cluster storm-id executor-id1)
@@ -725,7 +749,7 @@
       (bind _ (log-message "ass1, t30, post beat: " (pr-str ass1)))
       (bind _ (log-message "ass2, t30, post beat: " (pr-str ass2)))
 
-      (advance-cluster-time cluster 13)
+      (.advanceClusterTime cluster 13)
       (bind _ (log-message "ass1, t43, pre beat: " (pr-str ass1)))
       (bind _ (log-message "ass2, t43, pre beat: " (pr-str ass2)))
       (is (= ass1 (executor-assignment cluster storm-id executor-id1)))
@@ -734,7 +758,7 @@
       (bind _ (log-message "ass1, t43, post beat: " (pr-str ass1)))
       (bind _ (log-message "ass2, t43, post beat: " (pr-str ass2)))
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (bind _ (log-message "ass1, t54, pre beat: " (pr-str ass1)))
       (bind _ (log-message "ass2, t54, pre beat: " (pr-str ass2)))
       (do-executor-heartbeat cluster storm-id executor-id1)
@@ -745,7 +769,7 @@
 
       ; have to wait an extra 10 seconds because nimbus may not
       ; resynchronize its heartbeat time till monitor-time secs after
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (bind _ (log-message "ass1, t65, pre beat: " (pr-str ass1)))
       (bind _ (log-message "ass2, t65, pre beat: " (pr-str ass2)))
       (do-executor-heartbeat cluster storm-id executor-id1)
@@ -754,7 +778,7 @@
       (is (= ass1 (executor-assignment cluster storm-id executor-id1)))
       (check-consistency cluster "test")
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (bind _ (log-message "ass1, t76, pre beat: " (pr-str ass1)))
       (bind _ (log-message "ass2, t76, pre beat: " (pr-str ass2)))
       (is (= ass1 (executor-assignment cluster storm-id executor-id1)))
@@ -764,98 +788,100 @@
       (bind _ (log-message "ass2, t76, post beat: " (pr-str ass2)))
       (check-consistency cluster "test")
 
-      (advance-cluster-time cluster 31)
+      (.advanceClusterTime cluster 31)
       (is (not= ass1 (executor-assignment cluster storm-id executor-id1)))
       (is (= ass2 (executor-assignment cluster storm-id executor-id2)))  ; tests launch timeout
       (check-consistency cluster "test")
 
 
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
-      (bind active-supervisor (first ass2))
-      (kill-supervisor cluster active-supervisor)
+      (bind active-supervisor (.get_node ass2))
+      (.killSupervisor cluster active-supervisor)
 
       (doseq [i (range 12)]
         (do-executor-heartbeat cluster storm-id executor-id1)
         (do-executor-heartbeat cluster storm-id executor-id2)
-        (advance-cluster-time cluster 10)
+        (.advanceClusterTime cluster 10)
         )
       ;; tests that it doesn't reassign executors if they're heartbeating even if supervisor times out
       (is (= ass1 (executor-assignment cluster storm-id executor-id1)))
       (is (= ass2 (executor-assignment cluster storm-id executor-id2)))
       (check-consistency cluster "test")
 
-      (advance-cluster-time cluster 30)
+      (.advanceClusterTime cluster 30)
 
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
       (is (not-nil? ass1))
       (is (not-nil? ass2))
-      (is (not= active-supervisor (first (executor-assignment cluster storm-id executor-id2))))
-      (is (not= active-supervisor (first (executor-assignment cluster storm-id executor-id1))))
+      (is (not= active-supervisor (.get_node (executor-assignment cluster storm-id executor-id2))))
+      (is (not= active-supervisor (.get_node (executor-assignment cluster storm-id executor-id1))))
       (check-consistency cluster "test")
 
       (doseq [supervisor-id (.supervisors state nil)]
-        (kill-supervisor cluster supervisor-id))
+        (.killSupervisor cluster supervisor-id))
 
-      (advance-cluster-time cluster 90)
+      (.advanceClusterTime cluster 90)
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
       (is (nil? ass1))
       (is (nil? ass2))
       (check-consistency cluster "test" :assigned? false)
 
-      (add-supervisor cluster)
-      (advance-cluster-time cluster 11)
+      (.addSupervisor cluster)
+      (.advanceClusterTime cluster 11)
       (check-consistency cluster "test")
       )))
 
 
 (deftest test-reassignment-to-constrained-cluster
-  (with-simulated-time-local-cluster [cluster :supervisors 0
-    :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 0)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                   NIMBUS-TASK-LAUNCH-SECS 60
                   NIMBUS-TASK-TIMEOUT-SECS 20
                   NIMBUS-MONITOR-FREQ-SECS 10
                   NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
                   TOPOLOGY-ACKER-EXECUTORS 0
-                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
     (letlocals
-      (add-supervisor cluster :ports 1 :id "a")
-      (add-supervisor cluster :ports 1 :id "b")
-      (bind conf (:daemon-conf cluster))
+      (.addSupervisor cluster 1 "a")
+      (.addSupervisor cluster 1 "b")
+      (bind conf (.getDaemonConf cluster))
       (bind topology (Thrift/buildTopology
                        {"1" (Thrift/prepareSpoutDetails
                               (TestPlannerSpout. true) (Integer. 2))}
                        {}))
-      (bind state (:storm-cluster-state cluster))
-      (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
-      (advance-cluster-time cluster 11)
+      (bind state (.getClusterState cluster))
+      (.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
+      (.advanceClusterTime cluster 11)
       (check-consistency cluster "test")
       (bind storm-id (StormCommon/getStormId state "test"))
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
 
-      (advance-cluster-time cluster 30)
+      (.advanceClusterTime cluster 30)
       (do-executor-heartbeat cluster storm-id executor-id1)
       (do-executor-heartbeat cluster storm-id executor-id2)
 
-      (advance-cluster-time cluster 13)
+      (.advanceClusterTime cluster 13)
       (is (= ass1 (executor-assignment cluster storm-id executor-id1)))
       (is (= ass2 (executor-assignment cluster storm-id executor-id2)))
-      (kill-supervisor cluster "b")
+      (.killSupervisor cluster "b")
       (do-executor-heartbeat cluster storm-id executor-id1)
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (do-executor-heartbeat cluster storm-id executor-id1)
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (do-executor-heartbeat cluster storm-id executor-id1)
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (do-executor-heartbeat cluster storm-id executor-id1)
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (do-executor-heartbeat cluster storm-id executor-id1)
 
       (check-consistency cluster "test")
@@ -866,42 +892,45 @@
   (check-distribution (vals slot-executors) distribution))
 
 (defn check-num-nodes [slot-executors num-nodes]
-  (let [nodes (->> slot-executors keys (map first) set)]
+  (let [nodes (->> slot-executors keys (map (fn [np] (.get_node np))) set)] ; distinct node ids, read from each slot key via .get_node
     (is (= num-nodes (count nodes)))
     ))
 
 (deftest test-reassign-squeezed-topology
-  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 1
-    :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 1)
+                                      (.withPortsPerSupervisor 1)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                   NIMBUS-TASK-LAUNCH-SECS 60
                   NIMBUS-TASK-TIMEOUT-SECS 20
                   NIMBUS-MONITOR-FREQ-SECS 10
                   TOPOLOGY-ACKER-EXECUTORS 0
-                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
     (letlocals
       (bind topology (Thrift/buildTopology
                         {"1" (Thrift/prepareSpoutDetails
                                (TestPlannerSpout. true) (Integer. 9))}
                         {}))
-      (bind state (:storm-cluster-state cluster))
-      (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4} topology)  ; distribution should be 2, 2, 2, 3 ideally
-      (advance-cluster-time cluster 11)
+      (bind state (.getClusterState cluster))
+      (.submitTopology cluster "test" {TOPOLOGY-WORKERS 4} topology)  ; distribution should be 2, 2, 2, 3 ideally
+      (.advanceClusterTime cluster 11)
       (bind storm-id (StormCommon/getStormId state "test"))
       (bind slot-executors (slot-assignments cluster storm-id))
       (check-executor-distribution slot-executors [9])
       (check-consistency cluster "test")
 
-      (add-supervisor cluster :ports 2)
-      (advance-cluster-time cluster 11)
+      (.addSupervisor cluster 2)
+      (.advanceClusterTime cluster 11)
       (bind slot-executors (slot-assignments cluster storm-id))
       (bind executor->start (executor-start-times cluster storm-id))
       (check-executor-distribution slot-executors [3 3 3])
       (check-consistency cluster "test")
 
-      (add-supervisor cluster :ports 8)
+      (.addSupervisor cluster 8)
       ;; this actually works for any time > 0, since zookeeper fires an event causing immediate reassignment
       ;; doesn't work for time = 0 because it's not waiting for cluster yet, so test might happen before reassignment finishes
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (bind slot-executors2 (slot-assignments cluster storm-id))
       (bind executor->start2 (executor-start-times cluster storm-id))
       (check-executor-distribution slot-executors2 [2 2 2 3])
@@ -922,47 +951,49 @@
       )))
 
 (deftest test-rebalance
-  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 3
-    :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 1)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                   NIMBUS-MONITOR-FREQ-SECS 10
                   TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
                   TOPOLOGY-ACKER-EXECUTORS 0
-                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
     (letlocals
       (bind topology (Thrift/buildTopology
                         {"1" (Thrift/prepareSpoutDetails
                                (TestPlannerSpout. true) (Integer. 3))}
                         {}))
-      (bind state (:storm-cluster-state cluster))
-      (submit-local-topology (:nimbus cluster)
+      (bind state (.getClusterState cluster))
+      (.submitTopology cluster
                              "test"
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 60} topology)
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (bind storm-id (StormCommon/getStormId state "test"))
-      (add-supervisor cluster :ports 3)
-      (add-supervisor cluster :ports 3)
+      (.addSupervisor cluster 3)
+      (.addSupervisor cluster 3)
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
 
       (bind slot-executors (slot-assignments cluster storm-id))
       ;; check that all workers are on one machine
       (check-executor-distribution slot-executors [1 1 1])
       (check-num-nodes slot-executors 1)
-      (.rebalance (:nimbus cluster) "test" (RebalanceOptions.))
+      (.rebalance (.getNimbus cluster) "test" (RebalanceOptions.))
 
-      (advance-cluster-time cluster 30)
+      (.advanceClusterTime cluster 30)
       (check-executor-distribution slot-executors [1 1 1])
       (check-num-nodes slot-executors 1)
 
 
-      (advance-cluster-time cluster 30)
+      (.advanceClusterTime cluster 30)
       (bind slot-executors (slot-assignments cluster storm-id))
       (check-executor-distribution slot-executors [1 1 1])
       (check-num-nodes slot-executors 3)
 
       (is (thrown? InvalidTopologyException
-                   (.rebalance (:nimbus cluster) "test"
+                   (.rebalance (.getNimbus cluster) "test"
                      (doto (RebalanceOptions.)
                        (.set_num_executors {"1" (int 0)})
                        ))))
@@ -970,23 +1001,25 @@
 
 ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (deftest test-rebalance-change-parallelism
-  (with-simulated-time-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
-    :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 4)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                   NIMBUS-MONITOR-FREQ-SECS 10
                   TOPOLOGY-ACKER-EXECUTORS 0
-                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
     (letlocals
       (bind topology (Thrift/buildTopology
                         {"1" (Thrift/prepareSpoutDetails
                                (TestPlannerSpout. true) (Integer. 6)
                                 {TOPOLOGY-TASKS 12})}
                         {}))
-      (bind state (:storm-cluster-state cluster))
-      (submit-local-topology (:nimbus cluster)
+      (bind state (.getClusterState cluster))
+      (.submitTopology cluster
                              "test"
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (bind storm-id (StormCommon/getStormId state "test"))
       (bind checker (fn [distribution]
                       (check-executor-distribution
@@ -994,30 +1027,30 @@
                         distribution)))
       (checker [2 2 2])
 
-      (.rebalance (:nimbus cluster) "test"
+      (.rebalance (.getNimbus cluster) "test"
                   (doto (RebalanceOptions.)
                     (.set_num_workers (int 6))
                     ))
-      (advance-cluster-time cluster 29)
+      (.advanceClusterTime cluster 29)
       (checker [2 2 2])
-      (advance-cluster-time cluster 3)
+      (.advanceClusterTime cluster 3)
       (checker [1 1 1 1 1 1])
 
-      (.rebalance (:nimbus cluster) "test"
+      (.rebalance (.getNimbus cluster) "test"
                   (doto (RebalanceOptions.)
                     (.set_num_executors {"1" (int 1)})
                     ))
-      (advance-cluster-time cluster 29)
+      (.advanceClusterTime cluster 29)
       (checker [1 1 1 1 1 1])
-      (advance-cluster-time cluster 3)
+      (.advanceClusterTime cluster 3)
       (checker [1])
 
-      (.rebalance (:nimbus cluster) "test"
+      (.rebalance (.getNimbus cluster) "test"
                   (doto (RebalanceOptions.)
                     (.set_num_executors {"1" (int 8)})
                     (.set_num_workers 4)
                     ))
-      (advance-cluster-time cluster 32)
+      (.advanceClusterTime cluster 32)
       (checker [2 2 2 2])
       (check-consistency cluster "test")
 
@@ -1033,9 +1066,9 @@
  (let [assignments (.assignments state nil)]
    (log-message "Assignemts: " assignments)
    (let [id->node->ports (into {} (for [id assignments
-                                                :let [executor->node+port (:executor->node+port (clojurify-assignment (.assignmentInfo state id nil)))
+                                                :let [executor->node+port (.get_executor_node_port (.assignmentInfo state id nil))
                                                       node+ports (set (.values executor->node+port))
-                                                      node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [[node port] node+ports] {node [port]}))]]
+                                                      node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [np node+ports] {(.get_node np) [(first (.get_port np))]}))]]
                                                 {id node->ports}))
          _ (log-message "id->node->ports: " id->node->ports)
          all-nodes (apply merge-with (fn [a b] 
@@ -1047,12 +1080,15 @@
 )))
 
 (deftest test-rebalance-constrained-cluster
-  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 4
-    :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withSupervisors 1)
+                                      (.withPortsPerSupervisor 4)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                   NIMBUS-MONITOR-FREQ-SECS 10
                   TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
                   TOPOLOGY-ACKER-EXECUTORS 0
-                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+                  TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
     (letlocals
       (bind topology (Thrift/buildTopology
                         {"1" (Thrift/prepareSpoutDetails
@@ -1066,43 +1102,44 @@
                         {"1" (Thrift/prepareSpoutDetails
                                (TestPlannerSpout. true) (Integer. 3))}
                         {}))
-      (bind state (:storm-cluster-state cluster))
-      (submit-local-topology (:nimbus cluster)
+      (bind state (.getClusterState cluster))
+      (.submitTopology cluster
                              "test"
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
-      (submit-local-topology (:nimbus cluster)
+      (.submitTopology cluster
                              "test2"
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
-      (submit-local-topology (:nimbus cluster)
+      (.submitTopology cluster
                              "test3"
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
 
       (check-for-collisions state)
-      (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.)
+      (.rebalance (.getNimbus cluster) "test" (doto (RebalanceOptions.)
                     (.set_num_workers 4)
                     (.set_wait_secs 0)
                     ))
 
-      (advance-cluster-time cluster 11)
+      (.advanceClusterTime cluster 11)
       (check-for-collisions state)
 
-      (advance-cluster-time cluster 30)
+      (.advanceClusterTime cluster 30)
       (check-for-collisions state)
       )))
 
 
 (deftest test-submit-invalid
-  (with-simulated-time-local-cluster [cluster
-    :daemon-conf {SUPERVISOR-ENABLE false
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withSimulatedTime)
+                                      (.withDaemonConf {SUPERVISOR-ENABLE false
                   TOPOLOGY-ACKER-EXECUTORS 0
                   TOPOLOGY-EVENTLOGGER-EXECUTORS 0
                   NIMBUS-EXECUTORS-PER-TOPOLOGY 8
-                  NIMBUS-SLOTS-PER-TOPOLOGY 8}]
+                  NIMBUS-SLOTS-PER-TOPOLOGY 8})))]
     (letlocals
       (bind topology (Thrift/buildTopology
                         {"1" (Thrift/prepareSpoutDetails
@@ -1110,7 +1147,7 @@
                                {TOPOLOGY-TASKS 1})}
                         {}))
       (is (thrown? InvalidTopologyException
-        (submit-local-topology (:nimbus cluster)
+        (.submitTopology cluster
                                "test/aaa"
                                {}
                                topology)))
@@ -1119,9 +1156,9 @@
                              (TestPlannerSpout. true) (Integer. 16)
                              {TOPOLOGY-TASKS 16})}
                       {}))
-      (bind state (:storm-cluster-state cluster))
+      (bind state (.getClusterState cluster))
       (is (thrown? InvalidTopologyException
-                   (submit-local-topology (:nimbus cluster)
+                   (.submitTopology cluster
                                           "test"
                                           {TOPOLOGY-WORKERS 3}
                                           topology)))
@@ -1131,67 +1168,67 @@
                              {TOPOLOGY-TASKS 5})}
                       {}))
       (is (thrown? InvalidTopologyException
-                   (submit-local-topology (:nimbus cluster)
+                   (.submitTopology cluster
                                           "test"
                                           {TOPOLOGY-WORKERS 16}
-                                          topology)))
-      (is (nil? (submit-local-topology (:nimbus cluster)
-                                       "test"
-                                       {TOPOLOGY-WORKERS 8}
-                                       topology))))))
+                                          topology))))))
 
 (deftest test-clean-inbox
   "Tests that the inbox correctly cleans jar files."
-  (with-simulated-time
-    (with-local-tmp [dir-location]
-      (let [dir (File. dir-location)
-            mk-file (fn [name seconds-ago]
-                      (let [f (File. (str dir-location "/" name))
-                            t (- (Time/currentTimeMillis) (* seconds-ago 1000))]
-                        (FileUtils/touch f)
-                        (.setLastModified f t)))
-            assert-files-in-dir (fn [compare-file-names]
-                                  (let [file-names (map #(.getName %) (file-seq dir))]
-                                    (is (= (sort compare-file-names)
-                                          (sort (filter #(.endsWith % ".jar") file-names))
-                                          ))))]
-        ;; Make three files a.jar, b.jar, c.jar.
-        ;; a and b are older than c and should be deleted first.
-        (advance-time-secs! 100)
-        (doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
-          (apply mk-file fs))
-        (assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
-        (Nimbus/cleanInbox dir-location 10)
-        (assert-files-in-dir ["c.jar"])
-        ;; Cleanit again, c.jar should stay
-        (advance-time-secs! 5)
-        (Nimbus/cleanInbox dir-location 10)
-        (assert-files-in-dir ["c.jar"])
-        ;; Advance time, clean again, c.jar should be deleted.
-        (advance-time-secs! 5)
-        (Nimbus/cleanInbox dir-location 10)
-        (assert-files-in-dir [])
-        ))))
+  (with-open [_ (Time$SimulatedTime.)
+              tmp-path (TmpPath. )]
+    (let [dir-location (.getPath tmp-path)
+          dir (File. dir-location)
+          mk-file (fn [name seconds-ago]
+                    (let [f (File. (str dir-location "/" name))
+                          t (- (Time/currentTimeMillis) (* seconds-ago 1000))]
+                      (FileUtils/touch f)
+                      (.setLastModified f t)))
+          assert-files-in-dir (fn [compare-file-names]
+                                (let [file-names (map #(.getName %) (file-seq dir))]
+                                  (is (= (sort compare-file-names)
+                                        (sort (filter #(.endsWith % ".jar") file-names))
+                                        ))))]
+      ;; Make three files a.jar, b.jar, c.jar.
+      ;; a and b are older than c and should be deleted first.
+      (Time/advanceTimeSecs 100)
+      (doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
+        (apply mk-file fs))
+      (assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
+      (Nimbus/cleanInbox dir-location 10)
+      (assert-files-in-dir ["c.jar"])
+      ;; Clean it again; c.jar should stay.
+      (Time/advanceTimeSecs 5)
+      (Nimbus/cleanInbox dir-location 10)
+      (assert-files-in-dir ["c.jar"])
+      ;; Advance time, clean again, c.jar should be deleted.
+      (Time/advanceTimeSecs 5)
+      (Nimbus/cleanInbox dir-location 10)
+      (assert-files-in-dir [])
+      )))
 
 (defn wait-for-status [nimbus name status]
-  (while-timeout 5000
-    (let [topo-summary (first (filter (fn [topo] (= name (.get_name topo))) (.get_topologies (.getClusterInfo nimbus))))
-          topo-status (if topo-summary (.get_status topo-summary) "NOT-RUNNING")]
-      (log-message "WAITING FOR "name" TO BE " status " CURRENT " topo-status)
-      (not= topo-status status))
-    (Thread/sleep 100)))
+  (Testing/whileTimeout 5000
+    (reify Testing$Condition
+      (exec [this]
+        (let [topo-summary (first (filter (fn [topo] (= name (.get_name topo))) (.get_topologies (.getClusterInfo nimbus))))
+              topo-status (if topo-summary (.get_status topo-summary) "NOT-RUNNING")]
+          (log-message "WAITING FOR "name" TO BE " status " CURRENT " topo-status)
+          (not= topo-status status))))
+    (fn [] (Thread/sleep 100))))
 
 (deftest test-leadership
   "Tests that leader actions can only be performed by master and non leader fails to perform the same actions."
-  (with-inprocess-zookeeper zk-port
-    (with-local-tmp [nimbus-dir]
-      (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
+  (with-open [zk (InProcessZookeeper. )]
+    (with-open [tmp-nimbus-dir (TmpPath.)
+                _ (MockedZookeeper. (proxy [Zookeeper] []
+                      (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+      (let [nimbus-dir (.getPath tmp-nimbus-dir)]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                        {STORM-ZOOKEEPER-SERVERS ["localhost"]
                         STORM-CLUSTER-MODE "local"
-                        STORM-ZOOKEEPER-PORT zk-port
+                        STORM-ZOOKEEPER-PORT (.getPort zk)
                         STORM-LOCAL-DIR nimbus-dir}))
           (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
           (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
@@ -1202,7 +1239,7 @@
                            {}))
 
           (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector :is-leader false))))]
+                          (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. false))))]
 
             (letlocals
               (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1210,7 +1247,7 @@
               (.launchServer non-leader-nimbus)
 
               ;first we verify that the master nimbus can perform all actions, even with another nimbus present.
-              (submit-local-topology nimbus "t1" {} topology)
+              (.submitTopology nimbus "t1" nil "{}" topology)
               ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
               (.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
               (wait-for-status nimbus "t1" "ACTIVE") 
@@ -1221,9 +1258,10 @@
 
               ;now we verify that non master nimbus can not perform any of the actions.
               (is (thrown? RuntimeException
-                    (submit-local-topology non-leader-nimbus
+                    (.submitTopology non-leader-nimbus
                       "failing"
-                      {}
+                      nil
+                      "{}"
                       topology)))
 
               (is (thrown? RuntimeException
@@ -1245,15 +1283,14 @@
           (.disconnect cluster-state))))))
 
 (deftest test-nimbus-iface-submitTopologyWithOpts-checks-authorization
-  (with-local-cluster [cluster
-                       :daemon-conf {NIMBUS-AUTHORIZER
-                          "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}]
+  (with-open [cluster (.build (doto (LocalCluster$Builder. )
+                                      (.withDaemonConf {NIMBUS-AUTHORIZER
+                          "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
     (let [
-          nimbus (:nimbus cluster)
           topology (Thrift/buildTopology {} {})
          ]
       (is (thrown? AuthorizationException
-          (submit-local-topology-with-opts nimbus "mystorm" {} topology
+          (.submitTopologyWithOpts cluster "mystorm" {} topology
             (SubmitOptions. TopologyInitialStatus/INACTIVE))
         ))
     )
@@ -1263,9 +1300,12 @@
 (deftest test-nimbus-iface-methods-check-authorization
   (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)]
-    (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
-                         :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}]
-      (let [nimbus (:nimbus cluster)
+    (with-open [cluster (.build 
+                          (doto (LocalCluster$Builder. )
+                            (.withClusterState cluster-state)
+                            (.withBlobStore blob-store)
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
+      (let [nimbus (.getNimbus cluster)
             topology-name "test"
             topology-id "test-id"]
         (.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
@@ -1278,13 +1318,14 @@
 
 (deftest test-nimbus-check-authorization-params
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)
-        mk-nimbus (fn
-                    [conf inimbus blob-store leader-elector group-mapper cluster-state]
-                    (Mockito/spy (mk-nimbus conf inimbus blob-store leader-elector group-mapper cluster-state)))] 
-  (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store :mk-nimbus mk-nimbus
-                       :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
-    (let [nimbus (:nimbus cluster)
+        blob-store (Mockito/mock BlobStore)]
+    (with-open [cluster (.build 
+                          (doto (LocalCluster$Builder. )
+                            (.withClusterState cluster-state)
+                            (.withBlobStore blob-store)
+                            (.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+    (let [nimbus (.getNimbus cluster)
           topology-name "test-nimbus-check-autho-params"
           topology-id "fake-id"
           topology (Thrift/buildTopology {} {})
@@ -1334,13 +1375,14 @@
 
 (deftest test-check-authorization-getSupervisorPageInfo
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)
-        mk-nimbus (fn
-                    [conf inimbus blob-store leader-elector group-mapper cluster-state]
-                    (Mockito/spy (mk-nimbus conf inimbus blob-store leader-elector group-mapper cluster-state)))]
-  (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store :mk-nimbus mk-nimbus
-                       :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
-    (let [nimbus (:nimbus cluster)
+        blob-store (Mockito/mock BlobStore)]
+    (with-open [cluster (.build 
+                          (doto (LocalCluster$Builder. )
+                            (.withClusterState cluster-state)
+                            (.withBlobStore blob-store)
+                            (.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+    (let [nimbus (.getNimbus cluster)
           expected-name "test-nimbus-check-autho-params"
           expected-conf {TOPOLOGY-NAME expected-name
                          TOPOLOGY-WORKERS 1
@@ -1354,7 +1396,6 @@
                      (.set_spouts {})
                      (.set_bolts {})
                      (.set_state_spouts {}))
-          clojurified-assignment (clojurify-assignment assignment)
           topo-assignment {expected-name assignment}
           check-auth-state (atom [])
           mock-check-authorization (fn [nimbus storm-name storm-conf operation] 
@@ -1371,17 +1412,16 @@
       (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) expected-conf)
       (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
       (.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment)
-      (stubbing [clojurify-assignment clojurified-assignment]
-        (.getSupervisorPageInfo nimbus "super1" nil true)
+      (.getSupervisorPageInfo nimbus "super1" nil true)
  
-        ;; afterwards, it should get called twice
-        (.checkAuthorization (Mockito/verify (:nimbus cluster)) expected-name expected-conf "getSupervisorPageInfo")
-        (.checkAuthorization (Mockito/verify (:nimbus cluster)) expected-name expected-conf "getTopology"))))))
+      ;; afterwards, it should get called twice
+      (.checkAuthorization (Mockito/verify nimbus) expected-name expected-conf "getSupervisorPageInfo")
+      (.checkAuthorization (Mockito/verify nimbus) expected-name expected-conf "getTopology")))))
 
 (deftest test-nimbus-iface-getTopology-methods-throw-correctly
-  (with-local-cluster [cluster]
+  (with-open [cluster (LocalCluster. )]
     (let [
-          nimbus (:nimbus cluster)
+          nimbus (.getNimbus cluster)
           id "bogus ID"
          ]
       (is (thrown? NotAliveException (.getTopology nimbus id)))
@@ -1416,22 +1456,27 @@
   )
 )
 
+(defn mkStormBase [launch-time-secs storm-name status]
+  (doto (StormBase.)
+    (.set_name storm-name)
+    (.set_launch_time_secs (int launch-time-secs))
+    (.set_status status)))
+
 (deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
   (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)]
-  (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store]
-    (let [nimbus (:nimbus cluster)
+    (with-open [cluster (.build 
+                          (doto (LocalCluster$Builder. )
+                            (.withClusterState cluster-state)
+                            (.withBlobStore blob-store)))]
+    (let [nimbus (.getNimbus cluster)
           bogus-secs 42
           bogus-type TopologyStatus/ACTIVE
           bogus-bases {
                  "1" nil
-                 "2" (thriftify-storm-base {:launch-time-secs bogus-secs
-                        :storm-name "id2-name"
-                        :status {:type bogus-type}})
+                 "2" (mkStormBase bogus-secs "id2-name" bogus-type)
                  "3" nil
-                 "4" (thriftify-storm-base {:launch-time-secs bogus-secs
-                        :storm-name "id4-name"
-                        :status {:type bogus-type}})
+                 "4" (mkStormBase bogus-secs "id4-name" bogus-type)
                 }
           topo-name "test-topo"
           topo-conf {TOPOLOGY-NAME topo-name
@@ -1499,8 +1544,10 @@
           ))))
 
 (deftest test-file-bogus-download
-  (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
-    (let [nimbus (:nimbus cluster)]
+    (with-open [cluster (.build 
+                          (doto (LocalCluster$Builder. )
+                            (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+    (let [nimbus (.getNimbus cluster)]
       (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus nil)))
       (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "")))
       (is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "/bogus-path/foo")))
@@ -1509,25 +1556,28 @@
 (deftest test-validate-topo-config-on-submit
   (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)]
-    (.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
-    (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
-                         :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
-      (let [nimbus (:nimbus cluster)
-            topology (Thrift/buildTopology {} {})
+    (with-open [cluster (.build 
+                          (doto (LocalCluster$Builder. )
+                            (.withClusterState cluster-state)
+                            (.withBlobStore blob-store)
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+      (.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
+      (let [topology (Thrift/buildTopology {} {})
             bad-config {"topology.isolate.machines" "2"}]
         (is (thrown-cause? InvalidTopologyException
-          (submit-local-topology-with-opts nimbus "test" bad-config topology
+          (.submitTopologyWithOpts cluster "test" bad-config topology
                                            (SubmitOptions.))))))))
 
 (deftest test-stateless-with-scheduled-topology-to-be-killed
   ; tests regression of STORM-856
-  (with-inprocess-zookeeper zk-port
-    (with-local-tmp [nimbus-dir]
+  (with-open [zk (InProcessZookeeper. )]
+    (with-open [tmp-nimbus-dir (TmpPath. )]
+      (let [nimbus-dir (.getPath tmp-nimbus-dir)]
       (letlocals
         (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                      {STORM-ZOOKEEPER-SERVERS ["localhost"]
                       STORM-CLUSTER-MODE "local"
-                      STORM-ZOOKEEPER-PORT zk-port
+                      STORM-ZOOKEEPER-PORT (.getPort zk)
                       STORM-LOCAL-DIR nimbus-dir}))
         (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
         (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
@@ -1537,7 +1587,7 @@
                          {"1" (Thrift/prepareSpoutDetails
                                 (TestPlannerSpout. true) (Integer. 3))}
                          {}))
-        (submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
+        (.submitTopology nimbus "t1" nil (str "{\"" TOPOLOGY-MESSAGE-TIMEOUT-SECS "\": 30}") topology)
         ; make transition for topology t1 to be killed -> nimbus applies this event to cluster state
         (.killTopology nimbus "t1")
         ; shutdown nimbus immediately to achieve nimbus doesn't handle event right now
@@ -1550,18 +1600,19 @@
         (.launchServer nimbus)
         (.shutdown nimbus)
         (.disconnect cluster-state)
-        ))))
+        )))))
 
 (deftest test-topology-action-notifier
-  (with-inprocess-zookeeper zk-port
-    (with-local-tmp [nimbus-dir]
-      (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
+  (with-open [zk (InProcessZookeeper. )]
+    (with-open [tmp-nimbus-dir (TmpPath.)
+                _ (MockedZookeeper. (proxy [Zookeeper] []
+                    (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+      (let [nimbus-dir (.getPath tmp-nimbus-dir)]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                        {STORM-ZOOKEEPER-SERVERS ["localhost"]
                         STORM-CLUSTER-MODE "local"
-                        STORM-ZOOKEEPER-PORT zk-port
+                        STORM-ZOOKEEPER-PORT (.getPort zk)
                         STORM-LOCAL-DIR nimbus-dir
                         NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
           (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1573,7 +1624,7 @@
                            {"1" (Thrift/prepareSpoutDetails
                                   (TestPlannerSpout. true) (Integer. 3))}
                            {}))
-          (submit-local-topology nimbus "test-notification" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
+          (.submitTopology nimbus "test-notification" nil (str "{\"" TOPOLOGY-MESSAGE-TIMEOUT-SECS "\": 30}") topology)
 
           (.deactivate nimbus "test-notification")
 
@@ -1594,23 +1645,23 @@
           )))))
 
 (deftest test-debug-on-component
-  (with-local-cluster [cluster]
-    (let [nimbus (:nimbus cluster)
+  (with-open [cluster (LocalCluster. )]
+    (let [nimbus (.getNimbus cluster)
           topology (Thrift/buildTopology
                      {"spout" (Thrift/prepareSpoutDetails
                                 (TestPlannerSpout. true) (Integer. 3))}
                      {})]
-        (submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology)
+        (.submitTopology cluster "t1" {TOPOLOGY-WORKERS 1} topology)
         (.debug nimbus "t1" "spout" true 100))))
 
 (deftest test-debug-on-global
-  (with-local-cluster [cluster]
-    (let [nimbus (:nimbus cluster)
+  (with-open [cluster (LocalCluster. )]
+    (let [nimbus (.getNimbus cluster)
           topology (Thrift/buildTopology
                      {"spout" (Thrift/prepareSpoutDetails
                                 (TestPlannerSpout. true) (Integer. 3))}
                      {})]
-      (submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology)
+      (.submitTopology cluster "t1" {TOPOLOGY-WORKERS 1} topology)
       (.debug nimbus "t1" "" true 100))))
 
 ;; if the user sends an empty log config, nimbus will say that all 
@@ -1618,9 +1669,12 @@
 (deftest empty-save-config-results-in-all-unchanged-actions
   (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)]
-    (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
-                         :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
-      (let [nimbus (:nimbus cluster)
+    (with-open [cluster (.build 
+                          (doto (LocalCluster$Builder. )
+                            (.withClusterState cluster-state)
+                            (.withBlobStore blob-store)
+                            (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+      (let [nimbus (.getNimbus cluster)
             previous-config (LogConfig.)
             mock-config (LogConfig.)
             expected-config (LogConfig.)]
@@ -1644,9 +1698,12 @@
 (deftest log-level-update-merges-and-flags-existent-log-level
   (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)]
-    (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
-              

<TRUNCATED>