You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:03 UTC

[03/10] storm git commit: STORM-1281: LocalCluster, testing4j and testing.clj to java

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index 2be4a7a..583c25e 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -16,21 +16,21 @@
 (ns org.apache.storm.cluster-test
   (:import [java.util Arrays]
            [org.apache.storm.nimbus NimbusInfo])
-  (:import [org.apache.storm.daemon.common Assignment StormBase SupervisorInfo])
-  (:import [org.apache.storm.generated NimbusSummary TopologyStatus])
+  (:import [org.apache.storm.generated SupervisorInfo StormBase Assignment NimbusSummary TopologyStatus NodeInfo Credentials])
   (:import [org.apache.zookeeper ZooDefs ZooDefs$Ids Watcher$Event$EventType])
   (:import [org.mockito Mockito])
   (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
-  (:import [org.apache.storm.utils Time Utils ZookeeperAuthInfo ConfigUtils])
+  (:import [org.apache.storm.utils Time Time$SimulatedTime Utils ZookeeperAuthInfo ConfigUtils])
   (:import [org.apache.storm.cluster IStateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
   (:import [org.apache.storm.zookeeper Zookeeper])
   (:import [org.apache.storm.callback ZKStateChangedCallback])
+  (:import [org.apache.storm.testing InProcessZookeeper])
   (:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster])
   (:require [conjure.core])
   (:use [conjure core])
   (:use [clojure test])
-  (:use [org.apache.storm config util testing log converter])
+  (:use [org.apache.storm config util log])
   (:use [org.apache.storm.internal thrift]))
 
 (defn mk-config [zk-port]
@@ -53,8 +53,8 @@
   (byte-array (map byte vals)))
 
 (deftest test-basics
-  (with-inprocess-zookeeper zk-port
-    (let [state (mk-state zk-port)]
+  (with-open [zk (InProcessZookeeper. )]
+    (let [state (mk-state (.getPort zk))]
       (.set-data state "/root" (barr 1 2 3) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (Arrays/equals (barr 1 2 3) (.get-data state "/root" false)))
       (is (= nil (.get-data state "/a" false)))
@@ -74,9 +74,9 @@
       )))
 
 (deftest test-multi-state
-  (with-inprocess-zookeeper zk-port
-    (let [state1 (mk-state zk-port)
-          state2 (mk-state zk-port)]
+  (with-open [zk (InProcessZookeeper. )]
+    (let [state1 (mk-state (.getPort zk))
+          state2 (mk-state (.getPort zk))]
       (.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (Arrays/equals (barr 1) (.get-data state1 "/root" false)))
       (is (Arrays/equals (barr 1) (.get-data state2 "/root" false)))
@@ -88,10 +88,10 @@
       )))
 
 (deftest test-ephemeral
-  (with-inprocess-zookeeper zk-port
-    (let [state1 (mk-state zk-port)
-          state2 (mk-state zk-port)
-          state3 (mk-state zk-port)]
+  (with-open [zk (InProcessZookeeper. )]
+    (let [state1 (mk-state (.getPort zk))
+          state2 (mk-state (.getPort zk))
+          state3 (mk-state (.getPort zk))]
       (.set-ephemeral-node state1 "/a" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (is (Arrays/equals (barr 1) (.get-data state1 "/a" false)))
       (is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))
@@ -128,11 +128,11 @@
         ))))
 
 (deftest test-callbacks
-  (with-inprocess-zookeeper zk-port
+  (with-open [zk (InProcessZookeeper. )]
     (let [[state1-last-cb state1-cb] (mk-callback-tester)
-          state1 (mk-state zk-port state1-cb)
+          state1 (mk-state (.getPort zk) state1-cb)
           [state2-last-cb state2-cb] (mk-callback-tester)
-          state2 (mk-state zk-port state2-cb)]
+          state2 (mk-state (.getPort zk) state2-cb)]
       (.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
       (.get-data state2 "/root" true)
       (is (= nil @state1-last-cb))
@@ -178,45 +178,60 @@
       )))
 
 
+(defn mkAssignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources]
+  (doto (Assignment.)
+    (.set_executor_node_port executor->node+port)
+    (.set_executor_start_time_secs executor->start-time-secs)
+    (.set_worker_resources worker->resources)
+    (.set_node_host node->host)
+    (.set_master_code_dir master-code-dir)))
+
+(defn mkStormBase [storm-name launch-time-secs status num-workers]
+  (doto (StormBase.)
+    (.set_name storm-name)
+    (.set_launch_time_secs (int launch-time-secs))
+    (.set_status status)
+    (.set_num_workers (int num-workers))))
+
 (deftest test-storm-cluster-state-basics
-  (with-inprocess-zookeeper zk-port
-    (let [state (mk-storm-state zk-port)
-          assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {} {})
-          assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {} {})
+  (with-open [zk (InProcessZookeeper. )]
+    (let [state (mk-storm-state (.getPort zk))
+          assignment1 (mkAssignment "/aaa" {} {[1] (NodeInfo. "1" #{1001 1})} {} {})
+          assignment2 (mkAssignment "/aaa" {} {[2] (NodeInfo. "2" #{2002})} {} {})
           nimbusInfo1 (NimbusInfo. "nimbus1" 6667 false)
           nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false)
           nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (Time/currentTimeSecs) false "v1")
           nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (Time/currentTimeSecs) false "v2")
-          base1 (StormBase. "/tmp/storm1" 1 {:type TopologyStatus/ACTIVE} 2 {} "" nil nil {})
-          base2 (StormBase. "/tmp/storm2" 2 {:type TopologyStatus/ACTIVE} 2 {} "" nil nil {})]
+          base1 (mkStormBase "/tmp/storm1" 1 TopologyStatus/ACTIVE 2)
+          base2 (mkStormBase "/tmp/storm2" 2 TopologyStatus/ACTIVE 2)]
       (is (= [] (.assignments state nil)))
-      (.setAssignment state "storm1" (thriftify-assignment assignment1))
-      (is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm1" nil))))
-      (is (= nil (clojurify-assignment (.assignmentInfo state "storm3" nil))))
-      (.setAssignment state "storm1" (thriftify-assignment assignment2))
-      (.setAssignment state "storm3" (thriftify-assignment assignment1))
+      (.setAssignment state "storm1" assignment1)
+      (is (= assignment1 (.assignmentInfo state "storm1" nil)))
+      (is (= nil (.assignmentInfo state "storm3" nil)))
+      (.setAssignment state "storm1" assignment2)
+      (.setAssignment state "storm3" assignment1)
       (is (= #{"storm1" "storm3"} (set (.assignments state nil))))
-      (is (= assignment2 (clojurify-assignment (.assignmentInfo state "storm1" nil))))
-      (is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm3" nil))))
+      (is (= assignment2 (.assignmentInfo state "storm1" nil)))
+      (is (= assignment1 (.assignmentInfo state "storm3" nil)))
 
       (is (= [] (.activeStorms state)))
-      (.activateStorm state "storm1" (thriftify-storm-base base1))
+      (.activateStorm state "storm1" base1)
       (is (= ["storm1"] (.activeStorms state)))
-      (is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil))))
-      (is (= nil (clojurify-storm-base (.stormBase state "storm2" nil))))
-      (.activateStorm state "storm2" (thriftify-storm-base base2))
-      (is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil))))
-      (is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil))))
+      (is (= base1 (.stormBase state "storm1" nil)))
+      (is (= nil (.stormBase state "storm2" nil)))
+      (.activateStorm state "storm2" base2)
+      (is (= base1 (.stormBase state "storm1" nil)))
+      (is (= base2 (.stormBase state "storm2" nil)))
       (is (= #{"storm1" "storm2"} (set (.activeStorms state))))
       (.removeStormBase state "storm1")
-      (is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil))))
+      (is (= base2 (.stormBase state "storm2" nil)))
       (is (= #{"storm2"} (set (.activeStorms state))))
 
-      (is (nil? (clojurify-crdentials (.credentials state "storm1" nil))))
-      (.setCredentials state "storm1" (thriftify-credentials {"a" "a"}) {})
-      (is (= {"a" "a"} (clojurify-crdentials (.credentials state "storm1" nil))))
-      (.setCredentials state "storm1" (thriftify-credentials {"b" "b"}) {})
-      (is (= {"b" "b"} (clojurify-crdentials (.credentials state "storm1" nil))))
+      (is (nil? (.credentials state "storm1" nil)))
+      (.setCredentials state "storm1" (doto (Credentials. ) (.set_creds {"a" "a"})) {})
+      (is (= {"a" "a"} (.get_creds (.credentials state "storm1" nil))))
+      (.setCredentials state "storm1" (doto (Credentials. ) (.set_creds {"b" "b"})) {})
+      (is (= {"b" "b"} (.get_creds (.credentials state "storm1" nil))))
 
       (is (= [] (.blobstoreInfo state "")))
       (.setupBlobstore state "key1" nimbusInfo1 (Integer/parseInt "1"))
@@ -254,21 +269,21 @@
     (.toString result)))
 
 (deftest test-storm-cluster-state-errors
-  (with-inprocess-zookeeper zk-port
-    (with-simulated-time
-      (let [state (mk-storm-state zk-port)]
+  (with-open [zk (InProcessZookeeper. )]
+    (with-open [_ (Time$SimulatedTime. )]
+      (let [state (mk-storm-state (.getPort zk))]
         (.reportError state "a" "1" (Utils/localHostname) 6700  (RuntimeException.))
         (validate-errors! state "a" "1" ["RuntimeException"])
-        (advance-time-secs! 1)
+        (Time/advanceTimeSecs 1)
         (.reportError state "a" "1" (Utils/localHostname) 6700 (IllegalArgumentException.))
         (validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"])
         (doseq [i (range 10)]
           (.reportError state "a" "2" (Utils/localHostname) 6700 (RuntimeException.))
-          (advance-time-secs! 2))
+          (Time/advanceTimeSecs 2))
         (validate-errors! state "a" "2" (repeat 10 "RuntimeException"))
         (doseq [i (range 5)]
           (.reportError state "a" "2" (Utils/localHostname) 6700 (IllegalArgumentException.))
-          (advance-time-secs! 2))
+          (Time/advanceTimeSecs 2))
         (validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException")
                                           (repeat 5 "RuntimeException")
                                           ))
@@ -276,18 +291,29 @@
         (.disconnect state)
         ))))
 
+(defn mkSupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map]
+  (doto (SupervisorInfo.)
+    (.set_time_secs time-secs)
+    (.set_hostname hostname)
+    (.set_assignment_id assignment-id)
+    (.set_used_ports used-ports)
+    (.set_meta meta)
+    (.set_scheduler_meta scheduler-meta)
+    (.set_uptime_secs uptime-secs)
+    (.set_version version)
+    (.set_resources_map resources-map)))
 
 (deftest test-supervisor-state
-  (with-inprocess-zookeeper zk-port
-    (let [state1 (mk-storm-state zk-port)
-          state2 (mk-storm-state zk-port)
-          supervisor-info1 (SupervisorInfo. 10 "hostname-1" "id1" [1 2] [] {} 1000 "0.9.2" nil)
-          supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)]
+  (with-open [zk (InProcessZookeeper. )]
+    (let [state1 (mk-storm-state (.getPort zk))
+          state2 (mk-storm-state (.getPort zk))
+          supervisor-info1 (mkSupervisorInfo 10 "hostname-1" "id1" [1 2] [] {} 1000 "0.9.2" nil)
+          supervisor-info2 (mkSupervisorInfo 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)]
       (is (= [] (.supervisors state1 nil)))
-      (.supervisorHeartbeat state2 "2" (thriftify-supervisor-info supervisor-info2))
-      (.supervisorHeartbeat state1 "1" (thriftify-supervisor-info supervisor-info1))
-      (is (= supervisor-info2 (clojurify-supervisor-info (.supervisorInfo state1 "2"))))
-      (is (= supervisor-info1 (clojurify-supervisor-info (.supervisorInfo state1 "1"))))
+      (.supervisorHeartbeat state2 "2" supervisor-info2)
+      (.supervisorHeartbeat state1 "1" supervisor-info1)
+      (is (= supervisor-info2 (.supervisorInfo state1 "2")))
+      (is (= supervisor-info1 (.supervisorInfo state1 "1")))
       (is (= #{"1" "2"} (set (.supervisors state1 nil))))
       (is (= #{"1" "2"} (set (.supervisors state2 nil))))
       (.disconnect state2)
@@ -296,10 +322,10 @@
       )))
 
 (deftest test-cluster-authentication
-  (with-inprocess-zookeeper zk-port
+  (with-open [zk (InProcessZookeeper. )]
     (let [builder (Mockito/mock CuratorFrameworkFactory$Builder)
           conf (merge
-                 (mk-config zk-port)
+                 (mk-config (.getPort zk))
                  {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
                   STORM-ZOOKEEPER-SESSION-TIMEOUT 10
                   STORM-ZOOKEEPER-RETRY-INTERVAL 5
@@ -310,7 +336,7 @@
       (. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
       (. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
       (. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
-      (Utils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
+      (Utils/testSetupBuilder builder (str (.getPort zk) "/") conf (ZookeeperAuthInfo. conf))
       (is (nil?
             (try
               (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index 827f536..7974e49 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -31,9 +31,9 @@
   (:import [org.apache.storm Thrift])
   (:import [org.apache.storm.daemon DrpcServer])
   (:import [org.mockito ArgumentCaptor Mockito Matchers])
-  (:use [org.apache.storm config testing])
+  (:use [org.apache.storm config])
   (:use [org.apache.storm.internal clojure])
-  (:use [org.apache.storm.daemon common drpc])
+  (:use [org.apache.storm.daemon drpc])
   (:use [conjure core]))
 
 (defbolt exclamation-bolt ["result" "return-info"] [tuple collector]

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index dc56f81..6eacf15 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -15,13 +15,12 @@
 ;; limitations under the License.
 (ns org.apache.storm.grouping-test
   (:use [clojure test])
-  (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
+  (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
            [org.apache.storm.generated JavaObject JavaObjectArg Grouping NullStruct])
   (:import [org.apache.storm.grouping LoadMapping])
-  (:use [org.apache.storm testing log config])
+  (:use [org.apache.storm log config])
   (:use [org.apache.storm.internal clojure])
-  (:use [org.apache.storm.daemon common])
-  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm LocalCluster$Builder Testing Thrift])
   (:import [org.apache.storm.utils Utils]
            (org.apache.storm.daemon GrouperFactory)))
 
@@ -79,7 +78,9 @@
     (is (<= load2 max2-prcnt))))
 
 (deftest test-field
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withSupervisors 4)))]
     (let [spout-phint 4
           bolt-phint 6
           topology (Thrift/buildTopology
@@ -90,21 +91,24 @@
                             (Thrift/prepareFieldsGrouping ["word"])}
                            (TestWordBytesCounter.) (Integer. spout-phint))
                      })
-          results (complete-topology
+          results (Testing/completeTopology
                     cluster
                     topology
-                    :mock-sources {"1" (->> [[(.getBytes "a")]
+                    (doto (CompleteTopologyParam.)
+                      (.setMockedSources (MockedSources. {"1" (->> [[(.getBytes "a")]
                                              [(.getBytes "b")]]
                                             (repeat (* spout-phint bolt-phint))
-                                            (apply concat))})]
-      (is (ms= (apply concat
+                                            (apply concat))}))))]
+      (is (Testing/multiseteq (apply concat
                       (for [value '("a" "b")
                             sum (range 1 (inc (* spout-phint bolt-phint)))]
-                        [[value sum]]))
-               (read-tuples results "2"))))))
+                        [[value (int sum)]]))
+               (Testing/readTuples results "2"))))))
 
 (deftest test-field
-  (with-simulated-time-local-cluster [cluster :supervisors 4]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withSupervisors 4)))]
     (let [spout-phint 4
           bolt-phint 6
           topology (Thrift/buildTopology
@@ -115,25 +119,28 @@
                             (Thrift/prepareFieldsGrouping ["word"])}
                            (TestWordBytesCounter.) (Integer. bolt-phint))
                      })
-          results (complete-topology
+          results (Testing/completeTopology
                     cluster
                     topology
-                    :mock-sources {"1" (->> [[(.getBytes "a")]
+                    (doto (CompleteTopologyParam.)
+                      (.setMockedSources (MockedSources. {"1" (->> [[(.getBytes "a")]
                                              [(.getBytes "b")]]
                                             (repeat (* spout-phint bolt-phint))
-                                            (apply concat))})]
-      (is (ms= (apply concat
+                                            (apply concat))}))))]
+      (is (Testing/multiseteq (apply concat
                       (for [value '("a" "b")
                             sum (range 1 (inc (* spout-phint bolt-phint)))]
-                        [[value sum]]))
-               (read-tuples results "2"))))))
+                        [[value (int sum)]]))
+               (Testing/readTuples results "2"))))))
 
 (defbolt id-bolt ["val"] [tuple collector]
   (emit-bolt! collector (.getValues tuple))
   (ack! collector tuple))
 
 (deftest test-custom-groupings
-  (with-simulated-time-local-cluster [cluster]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withSupervisors 4)))]
     (let [topology (Thrift/buildTopology
                     {"1" (Thrift/prepareSpoutDetails
                            (TestWordSpout. true))}
@@ -150,14 +157,15 @@
                            id-bolt
                            (Integer. 6))
                      })
-          results (complete-topology cluster
+          results (Testing/completeTopology cluster
                                      topology
-                                     :mock-sources {"1" [["a"]
+                                     (doto (CompleteTopologyParam.)
+                                       (.setMockedSources (MockedSources. {"1" [["a"]
                                                         ["b"]
                                                         ]}
-                                     )]
-      (is (ms= [["a"] ["a"] ["b"] ["b"]]
-               (read-tuples results "2")))
-      (is (ms= [["a"] ["a"] ["a"] ["b"] ["b"] ["b"]]
-               (read-tuples results "3")))
+                                     ))))]
+      (is (Testing/multiseteq [["a"] ["a"] ["b"] ["b"]]
+               (Testing/readTuples results "2")))
+      (is (Testing/multiseteq [["a"] ["a"] ["a"] ["b"] ["b"] ["b"]]
+               (Testing/readTuples results "3")))
       )))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/local_state_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/local_state_test.clj b/storm-core/test/clj/org/apache/storm/local_state_test.clj
index f180d69..e5baa67 100644
--- a/storm-core/test/clj/org/apache/storm/local_state_test.clj
+++ b/storm-core/test/clj/org/apache/storm/local_state_test.clj
@@ -15,15 +15,18 @@
 ;; limitations under the License.
 (ns org.apache.storm.local-state-test
   (:use [clojure test])
-  (:use [org.apache.storm testing])
+  (:import [org.apache.storm.testing TmpPath])
   (:import [org.apache.storm.utils LocalState]
            [org.apache.storm.generated GlobalStreamId]
            [org.apache.commons.io FileUtils]
            [java.io File]))
 
 (deftest test-local-state
-  (with-local-tmp [dir1 dir2]
-    (let [gs-a (GlobalStreamId. "a" "a")
+  (with-open [dir1-tmp (TmpPath.) 
+              dir2-tmp (TmpPath.)]
+    (let [dir1 (.getPath dir1-tmp)
+          dir2 (.getPath dir2-tmp)
+          gs-a (GlobalStreamId. "a" "a")
           gs-b (GlobalStreamId. "b" "b")
           gs-c (GlobalStreamId. "c" "c")
           gs-d (GlobalStreamId. "d" "d")
@@ -45,8 +48,9 @@
       (is (= gs-d (.get ls2 "b"))))))
 
 (deftest empty-state
-  (with-local-tmp [dir]
-    (let [ls (LocalState. dir)
+  (with-open [tmp-dir (TmpPath.)]
+    (let [dir (.getPath tmp-dir)
+          ls (LocalState. dir)
           gs-a (GlobalStreamId. "a" "a")
           data (FileUtils/openOutputStream (File. dir "12345"))
           version (FileUtils/openOutputStream (File. dir "12345.version"))]

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
index e9bada1..027e848 100644
--- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj
+++ b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
@@ -19,8 +19,7 @@
   (:require [conjure.core])
   (:use [clojure test])
   (:use [conjure core])
-  (:use [org.apache.storm testing]
-        [org.apache.storm.ui helpers])
+  (:use [org.apache.storm.ui helpers])
   (:import [org.apache.storm.daemon DirectoryCleaner]
            [org.apache.storm.utils Utils Time]
            [org.apache.storm.utils.staticmocking UtilsInstaller]

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
index 6a3d3ca..0e29d8c 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
@@ -17,14 +17,17 @@
 (ns org.apache.storm.messaging.netty-integration-test
   (:use [clojure test])
   (:import [org.apache.storm.messaging TransportFactory]
-           [org.apache.storm Thrift])
-  (:import [org.apache.storm.testing TestWordSpout TestGlobalCount])
+           [org.apache.storm Thrift Testing LocalCluster$Builder])
+  (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordSpout TestGlobalCount])
   (:import [org.apache.storm.utils Utils])
-  (:use [org.apache.storm testing util config]))
+  (:use [org.apache.storm util config]))
 
 (deftest test-integration
-  (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
-                                      :daemon-conf {STORM-LOCAL-MODE-ZMQ true 
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withSupervisors 4)
+                                (.withSupervisorSlotPortMin 6710)
+                                (.withDaemonConf {STORM-LOCAL-MODE-ZMQ true 
                                                     STORM-MESSAGING-TRANSPORT  "org.apache.storm.messaging.netty.Context"
                                                     STORM-MESSAGING-NETTY-AUTHENTICATION false
                                                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
@@ -32,7 +35,7 @@
                                                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
                                                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                                                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1}]
+                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1})))]
     (let [topology (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails
                             (TestWordSpout. true) (Integer. 4))}
@@ -40,12 +43,14 @@
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareShuffleGrouping)}
                             (TestGlobalCount.) (Integer. 6))})
-          results (complete-topology cluster
+          results (Testing/completeTopology cluster
                                      topology
-                                     ;; important for test that
-                                     ;; #tuples = multiple of 4 and 6
-                                     :storm-conf {TOPOLOGY-WORKERS 3}
-                                     :mock-sources {"1" [["a"] ["b"]
+                                     (doto (CompleteTopologyParam.)
+                                       ;; important for test that
+                                       ;; #tuples = multiple of 4 and 6
+                                       (.setStormConf {TOPOLOGY-WORKERS 3})
+                                       (.setMockedSources
+                                         (MockedSources. {"1" [["a"] ["b"]
                                                          ["a"] ["b"]
                                                          ["a"] ["b"]
                                                          ["a"] ["b"]
@@ -58,5 +63,5 @@
                                                          ["a"] ["b"]
                                                          ["a"] ["b"]
                                                          ]}
-                                     )]
-        (is (= (* 6 4) (.size (read-tuples results "2")))))))
+                                     ))))]
+        (is (= (* 6 4) (.size (Testing/readTuples results "2")))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
index 5b5914d..51a7f1c 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
@@ -17,7 +17,8 @@
   (:use [clojure test])
   (:import [org.apache.storm.messaging TransportFactory IConnection TaskMessage IConnectionCallback])
   (:import [org.apache.storm.utils Utils])
-  (:use [org.apache.storm testing util config log])
+  (:import [org.apache.storm Testing Testing$Condition])
+  (:use [org.apache.storm util config log])
   (:import [java.util ArrayList]
            (org.apache.storm.daemon.worker WorkerState)))
 
@@ -60,7 +61,9 @@
 
 (defn- wait-for-not-nil
   [atm]
-  (while-timeout TEST-TIMEOUT-MS (nil? @atm) (Thread/sleep 10)))
+  (Testing/whileTimeout Testing/TEST_TIMEOUT_MS
+    (reify Testing$Condition (exec [this] (nil? @atm)))
+    (fn [] (Thread/sleep 10))))
 
 (defn- test-basic-fn [storm-conf]
   (log-message "1. Should send and receive a basic message")
@@ -112,7 +115,7 @@
         _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         _ (.sendLoadMetrics server {(int 1) 0.0 (int 2) 1.0})
-        _ (while-timeout TEST-TIMEOUT-MS (empty? (.getLoad client [(int 1) (int 2)])) (Thread/sleep 10))
+        _ (Testing/whileTimeout Testing/TEST_TIMEOUT_MS (reify Testing$Condition (exec [this] (empty? (.getLoad client [(int 1) (int 2)])))) (fn [] (Thread/sleep 10)))
         load (.getLoad client [(int 1) (int 2)])]
     (is (= 0.0 (.getBoltLoad (.get load (int 1)))))
     (is (= 1.0 (.getBoltLoad (.get load (int 2)))))
@@ -244,7 +247,9 @@
       (let [req_msg (str num)]
         (.send client task (.getBytes req_msg))))
 
-    (while-timeout TEST-TIMEOUT-MS (< (.size resp) (- num-messages 1)) (log-message (.size resp) " " num-messages) (Thread/sleep 10))
+    (Testing/whileTimeout Testing/TEST_TIMEOUT_MS
+      (reify Testing$Condition (exec [this] (< (.size resp) (- num-messages 1))))
+      (fn [] (log-message (.size resp) " " num-messages) (Thread/sleep 10)))
 
     (doseq [num  (range 1 num-messages)]
       (let [req_msg (str num)

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging_test.clj b/storm-core/test/clj/org/apache/storm/messaging_test.clj
index 402ea7f..c984336 100644
--- a/storm-core/test/clj/org/apache/storm/messaging_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging_test.clj
@@ -15,20 +15,22 @@
 ;; limitations under the License.
 (ns org.apache.storm.messaging-test
   (:use [clojure test])
-  (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
-  (:use [org.apache.storm testing config])
-  (:use [org.apache.storm.daemon common])
-  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
+  (:use [org.apache.storm config])
+  (:import [org.apache.storm Testing Thrift LocalCluster$Builder])
   (:import [org.apache.storm.utils Utils]))
 
 (deftest test-local-transport
-  (doseq [transport-on? [false true]] 
-    (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2
-                                        :daemon-conf {TOPOLOGY-WORKERS 2
-                                                      STORM-LOCAL-MODE-ZMQ 
-                                                      (if transport-on? true false) 
-                                                      STORM-MESSAGING-TRANSPORT 
-                                                      "org.apache.storm.messaging.netty.Context"}]
+  (doseq [transport-on? [false true]]
+    (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                  (.withSimulatedTime)
+                                  (.withSupervisors 1)
+                                  (.withPortsPerSupervisor 2)
+                                  (.withDaemonConf {TOPOLOGY-WORKERS 2
+                                                    STORM-LOCAL-MODE-ZMQ 
+                                                    (if transport-on? true false) 
+                                                    STORM-MESSAGING-TRANSPORT 
+                                                    "org.apache.storm.messaging.netty.Context"})))]
       (let [topology (Thrift/buildTopology
                        {"1" (Thrift/prepareSpoutDetails
                               (TestWordSpout. true) (Integer. 2))}
@@ -37,11 +39,12 @@
                                (Thrift/prepareShuffleGrouping)}
                               (TestGlobalCount.) (Integer. 6))
                         })
-            results (complete-topology cluster
+            results (Testing/completeTopology cluster
                                        topology
-                                       ;; important for test that
-                                       ;; #tuples = multiple of 4 and 6
-                                       :mock-sources {"1" [["a"] ["b"]
+                                       (doto (CompleteTopologyParam.)
+                                         ;; important for test that
+                                         ;; #tuples = multiple of 4 and 6
+                                         (.setMockedSources (MockedSources. {"1" [["a"] ["b"]
                                                            ["a"] ["b"]
                                                            ["a"] ["b"]
                                                            ["a"] ["b"]
@@ -54,14 +57,5 @@
                                                            ["a"] ["b"]
                                                            ["a"] ["b"]
                                                            ]}
-                                       )]
-        (is (= (* 6 4) (.size (read-tuples results "2"))))))))
-
-(extend-type TestEventLogSpout
-  CompletableSpout
-  (exhausted? [this]
-    (-> this .completed))
-  (cleanup [this]
-    (.cleanup this))
-  (startup [this]
-    ))
+                                       ))))]
+        (is (= (* 6 4) (.size (Testing/readTuples results "2"))))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index c993d5b..edea3ec 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -17,17 +17,17 @@
   (:use [clojure test])
   (:import [org.apache.storm.topology TopologyBuilder])
   (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus])
-  (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+  (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount FeederSpout
             TestAggregatesCounter TestConfBolt AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout])
   (:import [org.apache.storm.task ShellBolt])
   (:import [org.apache.storm.spout ShellSpout])
   (:import [org.apache.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo])
   (:import [org.apache.storm.metric.api.rpc CountShellMetric])
   (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm Testing Testing$Condition LocalCluster$Builder])
   
-  (:use [org.apache.storm testing config])
+  (:use [org.apache.storm config])
   (:use [org.apache.storm.internal clojure])
-  (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm.util])
   (:import [org.apache.storm Thrift])
   (:import [org.apache.storm.utils Utils]
@@ -50,8 +50,9 @@
             ))))))
 
 (defn assert-loop [afn ids]
-  (while-timeout TEST-TIMEOUT-MS (not (every? afn ids))
-    (Thread/sleep 1)))
+  (Testing/whileTimeout Testing/TEST_TIMEOUT_MS
+    (reify Testing$Condition (exec [this] (not (every? afn ids))))
+    (fn [] (Thread/sleep 1))))
 
 (defn assert-acked [tracker & ids]
   (assert-loop #(.isAcked tracker %) ids))
@@ -70,16 +71,16 @@
               (ack! collector tuple)))))
 
 (defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
-  (while-timeout TEST-TIMEOUT-MS
+  (Testing/whileTimeout Testing/TEST_TIMEOUT_MS
+    (reify Testing$Condition (exec [this]
       (let [taskid->buckets (clojurify-structure (FakeMetricConsumer/getTaskIdToBuckets comp-id metric-name))]
         (or
          (and (not= N 0) (nil? taskid->buckets))
-         (not-every? #(<= N %) (map (comp count second) taskid->buckets))))
-      ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " FakeMetricConsumer/getTaskIdToBuckets)
-    (if cluster
-      (advance-cluster-time cluster 1)
-      (Thread/sleep 10))))
-    
+         (not-every? #(<= N %) (map (comp count second) taskid->buckets))))))
+    (fn []
+      (if cluster
+        (.advanceClusterTime cluster 1)
+        (Thread/sleep 10)))))
 
 (defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
   (-> (FakeMetricConsumer/getTaskIdToBuckets comp-id metric-name)
@@ -98,65 +99,67 @@
   `(is (not-empty (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name))))
 
 (deftest test-custom-metric
-  (with-simulated-time-local-cluster
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            "storm.zookeeper.connection.timeout" 30000
                            "storm.zookeeper.session.timeout" 60000
-                           }]
-    (let [feeder (feeder-spout ["field1"])
+                           })))]
+    (let [feeder (FeederSpout. ["field1"])
           topology (Thrift/buildTopology
                     {"1" (Thrift/prepareSpoutDetails feeder)}
                     {"2" (Thrift/prepareBoltDetails
                            {(Utils/getGlobalStreamId "1" nil)
                             (Thrift/prepareGlobalGrouping)}
                            count-acks)})]
-      (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+      (.submitTopology cluster "metrics-tester" {} topology)
 
       (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 6)
+      (.advanceClusterTime cluster 6)
       (assert-buckets! "2" "my-custom-metric" [1] cluster)
             
-      (advance-cluster-time cluster 5)
+      (.advanceClusterTime cluster 5)
       (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
 
-      (advance-cluster-time cluster 20)
+      (.advanceClusterTime cluster 20)
       (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
       
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)               
-      (advance-cluster-time cluster 5)
+      (.advanceClusterTime cluster 5)
       (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
 
 (deftest test-custom-metric-with-multi-tasks
-  (with-simulated-time-local-cluster
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            "storm.zookeeper.connection.timeout" 30000
                            "storm.zookeeper.session.timeout" 60000
-                           }]
-    (let [feeder (feeder-spout ["field1"])
+                           })))]
+    (let [feeder (FeederSpout. ["field1"])
           topology (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails feeder)}
                      {"2" (Thrift/prepareBoltDetails
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareAllGrouping)}
                             count-acks (Integer. 1) {TOPOLOGY-TASKS 2})})]
-      (submit-local-topology (:nimbus cluster) "metrics-tester-with-multitasks" {} topology)
+      (.submitTopology cluster "metrics-tester-with-multitasks" {} topology)
 
       (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 6)
+      (.advanceClusterTime cluster 6)
       (assert-buckets! "2" "my-custom-metric" [1] cluster)
 
-      (advance-cluster-time cluster 5)
+      (.advanceClusterTime cluster 5)
       (assert-buckets! "2" "my-custom-metric" [1 0] cluster)
 
-      (advance-cluster-time cluster 20)
+      (.advanceClusterTime cluster 20)
       (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
 
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)
-      (advance-cluster-time cluster 5)
+      (.advanceClusterTime cluster 5)
       (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
 
 (defn mk-shell-bolt-with-metrics-spec
@@ -165,34 +168,35 @@
          (PythonShellMetricsBolt. command file)))
 
 (deftest test-custom-metric-with-multilang-py
-  (with-simulated-time-local-cluster 
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                       [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
-                       "storm.zookeeper.connection.timeout" 30000
-                       "storm.zookeeper.session.timeout" 60000
-                       }]
-    (let [feeder (feeder-spout ["field1"])
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
+                           "storm.zookeeper.connection.timeout" 30000
+                           "storm.zookeeper.session.timeout" 60000
+                           })))]
+    (let [feeder (FeederSpout. ["field1"])
           topology (Thrift/buildTopology
                      {"1" (Thrift/prepareSpoutDetails feeder)}
                      {"2" (mk-shell-bolt-with-metrics-spec
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareGlobalGrouping)}
                             "python" "tester_bolt_metrics.py")})]
-      (submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
+      (.submitTopology cluster "shell-metrics-tester" {} topology)
 
       (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 6)
+      (.advanceClusterTime cluster 6)
       (assert-buckets! "2" "my-custom-shell-metric" [1] cluster)
             
-      (advance-cluster-time cluster 5)
+      (.advanceClusterTime cluster 5)
       (assert-buckets! "2" "my-custom-shell-metric" [1 0] cluster)
 
-      (advance-cluster-time cluster 20)
+      (.advanceClusterTime cluster 20)
       (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0] cluster)
       
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)               
-      (advance-cluster-time cluster 5)
+      (.advanceClusterTime cluster 5)
       (assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2] cluster)
       )))
 
@@ -201,48 +205,51 @@
     (Thrift/prepareSpoutDetails (PythonShellMetricsSpout. command file)))
 
 (deftest test-custom-metric-with-spout-multilang-py
-  (with-simulated-time-local-cluster 
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                       [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
-                       "storm.zookeeper.connection.timeout" 30000
-                       "storm.zookeeper.session.timeout" 60000}]
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
+                           "storm.zookeeper.connection.timeout" 30000
+                           "storm.zookeeper.session.timeout" 60000
+                           })))]
     (let [topology (Thrift/buildTopology
                      {"1" (mk-shell-spout-with-metrics-spec "python" "tester_spout_metrics.py")}
                      {"2" (Thrift/prepareBoltDetails
                             {(Utils/getGlobalStreamId "1" nil)
                              (Thrift/prepareAllGrouping)}
                             count-acks)})]
-      (submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
+      (.submitTopology cluster "shell-spout-metrics-tester" {} topology)
 
-      (advance-cluster-time cluster 7)
+      (.advanceClusterTime cluster 7)
       (assert-buckets! "1" "my-custom-shellspout-metric" [2] cluster)
       )))
 
 
 (deftest test-builtin-metrics-1
-  (with-simulated-time-local-cluster
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER                    
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
-                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
-    (let [feeder (feeder-spout ["field1"])
+                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60})))]
+    (let [feeder (FeederSpout. ["field1"])
           topology (Thrift/buildTopology
                     {"myspout" (Thrift/prepareSpoutDetails feeder)}
                     {"mybolt" (Thrift/prepareBoltDetails
                                 {(Utils/getGlobalStreamId "myspout" nil)
                                  (Thrift/prepareShuffleGrouping)}
                                 acking-bolt)})]
-      (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+      (.submitTopology cluster "metrics-tester" {} topology)
       
       (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 61)
+      (.advanceClusterTime cluster 61)
       (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)            
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
 
-      (advance-cluster-time cluster 120)
+      (.advanceClusterTime cluster 120)
       (assert-buckets! "myspout" "__ack-count/default" [1 0 0] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1 0 0] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [1 0 0] cluster)
@@ -251,7 +258,7 @@
 
       (.feed feeder ["b"] 1)
       (.feed feeder ["c"] 1)
-      (advance-cluster-time cluster 60)
+      (.advanceClusterTime cluster 60)
       (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2] cluster)      
@@ -260,12 +267,13 @@
 
 
 (deftest test-builtin-metrics-2
-  (with-simulated-time-local-cluster
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
-                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}]
-    (let [feeder (feeder-spout ["field1"])
+                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5})))]
+    (let [feeder (FeederSpout. ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
           topology (Thrift/buildTopology
@@ -274,13 +282,13 @@
                                 {(Utils/getGlobalStreamId "myspout" nil)
                                  (Thrift/prepareShuffleGrouping)}
                                 ack-every-other)})]
-      (submit-local-topology (:nimbus cluster)
+      (.submitTopology cluster
                              "metrics-tester"
                              {}
                              topology)
       
       (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 6)
+      (.advanceClusterTime cluster 6)
       (assert-buckets! "myspout" "__fail-count/default" [] cluster)
       (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
@@ -290,7 +298,7 @@
       (assert-acked tracker 1)
 
       (.feed feeder ["b"] 2)      
-      (advance-cluster-time cluster 5)
+      (.advanceClusterTime cluster 5)
       (assert-buckets! "myspout" "__fail-count/default" [] cluster)
       (assert-buckets! "myspout" "__ack-count/default" [1 0] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1 1] cluster)
@@ -298,7 +306,7 @@
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0] cluster)
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1] cluster)
 
-      (advance-cluster-time cluster 15)      
+      (.advanceClusterTime cluster 15)      
       (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0] cluster)
@@ -306,7 +314,7 @@
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0] cluster)
       
       (.feed feeder ["c"] 3)            
-      (advance-cluster-time cluster 15)      
+      (.advanceClusterTime cluster 15)      
       (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0] cluster)
@@ -314,13 +322,14 @@
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0] cluster))))
 
 (deftest test-builtin-metrics-3
-  (with-simulated-time-local-cluster
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5
-                           TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
-    (let [feeder (feeder-spout ["field1"])
+                           TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
+    (let [feeder (FeederSpout. ["field1"])
           tracker (AckFailMapTracker.)
           _ (.setAckFailDelegate feeder tracker)
           topology (Thrift/buildTopology
@@ -329,14 +338,14 @@
                                 {(Utils/getGlobalStreamId "myspout" nil)
                                  (Thrift/prepareGlobalGrouping)}
                                 ack-every-other)})]
-      (submit-local-topology (:nimbus cluster)
+      (.submitTopology cluster
                              "timeout-tester"
                              {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
                              topology)
       (.feed feeder ["a"] 1)
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)
-      (advance-cluster-time cluster 9)
+      (.advanceClusterTime cluster 9)
       (assert-buckets! "myspout" "__ack-count/default" [2] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [3] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
@@ -345,7 +354,7 @@
       (assert-acked tracker 1 3)
       
       (is (not (.isFailed tracker 2)))
-      (advance-cluster-time cluster 30)
+      (.advanceClusterTime cluster 30)
       (assert-failed tracker 2)
       (assert-buckets! "myspout" "__fail-count/default" [1] cluster)
       (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0] cluster)
@@ -355,23 +364,24 @@
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0] cluster))))
 
 (deftest test-system-bolt
-  (with-simulated-time-local-cluster
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+  (with-open [cluster (.build (doto (LocalCluster$Builder.)
+                                (.withSimulatedTime)
+                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
-                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
-    (let [feeder (feeder-spout ["field1"])
+                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60})))]
+    (let [feeder (FeederSpout. ["field1"])
           topology (Thrift/buildTopology
                     {"1" (Thrift/prepareSpoutDetails feeder)}
                     {})]      
-      (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+      (.submitTopology cluster "metrics-tester" {} topology)
 
       (.feed feeder ["a"] 1)
-      (advance-cluster-time cluster 70)
+      (.advanceClusterTime cluster 70)
       (assert-buckets! "__system" "newWorkerEvent" [1] cluster)
       (assert-metric-data-exists! "__system" "uptimeSecs")
       (assert-metric-data-exists! "__system" "startTimeSecs")
 
-      (advance-cluster-time cluster 180)
+      (.advanceClusterTime cluster 180)
       (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0] cluster)
       )))