You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:03 UTC
[03/10] storm git commit: STORM-1281: LocalCluster, testing4j and testing.clj to java
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index 2be4a7a..583c25e 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -16,21 +16,21 @@
(ns org.apache.storm.cluster-test
(:import [java.util Arrays]
[org.apache.storm.nimbus NimbusInfo])
- (:import [org.apache.storm.daemon.common Assignment StormBase SupervisorInfo])
- (:import [org.apache.storm.generated NimbusSummary TopologyStatus])
+ (:import [org.apache.storm.generated SupervisorInfo StormBase Assignment NimbusSummary TopologyStatus NodeInfo Credentials])
(:import [org.apache.zookeeper ZooDefs ZooDefs$Ids Watcher$Event$EventType])
(:import [org.mockito Mockito])
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
- (:import [org.apache.storm.utils Time Utils ZookeeperAuthInfo ConfigUtils])
+ (:import [org.apache.storm.utils Time Time$SimulatedTime Utils ZookeeperAuthInfo ConfigUtils])
(:import [org.apache.storm.cluster IStateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
(:import [org.apache.storm.zookeeper Zookeeper])
(:import [org.apache.storm.callback ZKStateChangedCallback])
+ (:import [org.apache.storm.testing InProcessZookeeper])
(:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster])
(:require [conjure.core])
(:use [conjure core])
(:use [clojure test])
- (:use [org.apache.storm config util testing log converter])
+ (:use [org.apache.storm config util log])
(:use [org.apache.storm.internal thrift]))
(defn mk-config [zk-port]
@@ -53,8 +53,8 @@
(byte-array (map byte vals)))
(deftest test-basics
- (with-inprocess-zookeeper zk-port
- (let [state (mk-state zk-port)]
+ (with-open [zk (InProcessZookeeper. )]
+ (let [state (mk-state (.getPort zk))]
(.set-data state "/root" (barr 1 2 3) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1 2 3) (.get-data state "/root" false)))
(is (= nil (.get-data state "/a" false)))
@@ -74,9 +74,9 @@
)))
(deftest test-multi-state
- (with-inprocess-zookeeper zk-port
- (let [state1 (mk-state zk-port)
- state2 (mk-state zk-port)]
+ (with-open [zk (InProcessZookeeper. )]
+ (let [state1 (mk-state (.getPort zk))
+ state2 (mk-state (.getPort zk))]
(.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1) (.get-data state1 "/root" false)))
(is (Arrays/equals (barr 1) (.get-data state2 "/root" false)))
@@ -88,10 +88,10 @@
)))
(deftest test-ephemeral
- (with-inprocess-zookeeper zk-port
- (let [state1 (mk-state zk-port)
- state2 (mk-state zk-port)
- state3 (mk-state zk-port)]
+ (with-open [zk (InProcessZookeeper. )]
+ (let [state1 (mk-state (.getPort zk))
+ state2 (mk-state (.getPort zk))
+ state3 (mk-state (.getPort zk))]
(.set-ephemeral-node state1 "/a" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(is (Arrays/equals (barr 1) (.get-data state1 "/a" false)))
(is (Arrays/equals (barr 1) (.get-data state2 "/a" false)))
@@ -128,11 +128,11 @@
))))
(deftest test-callbacks
- (with-inprocess-zookeeper zk-port
+ (with-open [zk (InProcessZookeeper. )]
(let [[state1-last-cb state1-cb] (mk-callback-tester)
- state1 (mk-state zk-port state1-cb)
+ state1 (mk-state (.getPort zk) state1-cb)
[state2-last-cb state2-cb] (mk-callback-tester)
- state2 (mk-state zk-port state2-cb)]
+ state2 (mk-state (.getPort zk) state2-cb)]
(.set-data state1 "/root" (barr 1) ZooDefs$Ids/OPEN_ACL_UNSAFE)
(.get-data state2 "/root" true)
(is (= nil @state1-last-cb))
@@ -178,45 +178,60 @@
)))
+(defn mkAssignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources]
+ (doto (Assignment.)
+ (.set_executor_node_port executor->node+port)
+ (.set_executor_start_time_secs executor->start-time-secs)
+ (.set_worker_resources worker->resources)
+ (.set_node_host node->host)
+ (.set_master_code_dir master-code-dir)))
+
+(defn mkStormBase [storm-name launch-time-secs status num-workers]
+ (doto (StormBase.)
+ (.set_name storm-name)
+ (.set_launch_time_secs (int launch-time-secs))
+ (.set_status status)
+ (.set_num_workers (int num-workers))))
+
(deftest test-storm-cluster-state-basics
- (with-inprocess-zookeeper zk-port
- (let [state (mk-storm-state zk-port)
- assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {} {})
- assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {} {})
+ (with-open [zk (InProcessZookeeper. )]
+ (let [state (mk-storm-state (.getPort zk))
+ assignment1 (mkAssignment "/aaa" {} {[1] (NodeInfo. "1" #{1001 1})} {} {})
+ assignment2 (mkAssignment "/aaa" {} {[2] (NodeInfo. "2" #{2002})} {} {})
nimbusInfo1 (NimbusInfo. "nimbus1" 6667 false)
nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false)
nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (Time/currentTimeSecs) false "v1")
nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (Time/currentTimeSecs) false "v2")
- base1 (StormBase. "/tmp/storm1" 1 {:type TopologyStatus/ACTIVE} 2 {} "" nil nil {})
- base2 (StormBase. "/tmp/storm2" 2 {:type TopologyStatus/ACTIVE} 2 {} "" nil nil {})]
+ base1 (mkStormBase "/tmp/storm1" 1 TopologyStatus/ACTIVE 2)
+ base2 (mkStormBase "/tmp/storm2" 2 TopologyStatus/ACTIVE 2)]
(is (= [] (.assignments state nil)))
- (.setAssignment state "storm1" (thriftify-assignment assignment1))
- (is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm1" nil))))
- (is (= nil (clojurify-assignment (.assignmentInfo state "storm3" nil))))
- (.setAssignment state "storm1" (thriftify-assignment assignment2))
- (.setAssignment state "storm3" (thriftify-assignment assignment1))
+ (.setAssignment state "storm1" assignment1)
+ (is (= assignment1 (.assignmentInfo state "storm1" nil)))
+ (is (= nil (.assignmentInfo state "storm3" nil)))
+ (.setAssignment state "storm1" assignment2)
+ (.setAssignment state "storm3" assignment1)
(is (= #{"storm1" "storm3"} (set (.assignments state nil))))
- (is (= assignment2 (clojurify-assignment (.assignmentInfo state "storm1" nil))))
- (is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm3" nil))))
+ (is (= assignment2 (.assignmentInfo state "storm1" nil)))
+ (is (= assignment1 (.assignmentInfo state "storm3" nil)))
(is (= [] (.activeStorms state)))
- (.activateStorm state "storm1" (thriftify-storm-base base1))
+ (.activateStorm state "storm1" base1)
(is (= ["storm1"] (.activeStorms state)))
- (is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil))))
- (is (= nil (clojurify-storm-base (.stormBase state "storm2" nil))))
- (.activateStorm state "storm2" (thriftify-storm-base base2))
- (is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil))))
- (is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil))))
+ (is (= base1 (.stormBase state "storm1" nil)))
+ (is (= nil (.stormBase state "storm2" nil)))
+ (.activateStorm state "storm2" base2)
+ (is (= base1 (.stormBase state "storm1" nil)))
+ (is (= base2 (.stormBase state "storm2" nil)))
(is (= #{"storm1" "storm2"} (set (.activeStorms state))))
(.removeStormBase state "storm1")
- (is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil))))
+ (is (= base2 (.stormBase state "storm2" nil)))
(is (= #{"storm2"} (set (.activeStorms state))))
- (is (nil? (clojurify-crdentials (.credentials state "storm1" nil))))
- (.setCredentials state "storm1" (thriftify-credentials {"a" "a"}) {})
- (is (= {"a" "a"} (clojurify-crdentials (.credentials state "storm1" nil))))
- (.setCredentials state "storm1" (thriftify-credentials {"b" "b"}) {})
- (is (= {"b" "b"} (clojurify-crdentials (.credentials state "storm1" nil))))
+ (is (nil? (.credentials state "storm1" nil)))
+ (.setCredentials state "storm1" (doto (Credentials. ) (.set_creds {"a" "a"})) {})
+ (is (= {"a" "a"} (.get_creds (.credentials state "storm1" nil))))
+ (.setCredentials state "storm1" (doto (Credentials. ) (.set_creds {"b" "b"})) {})
+ (is (= {"b" "b"} (.get_creds (.credentials state "storm1" nil))))
(is (= [] (.blobstoreInfo state "")))
(.setupBlobstore state "key1" nimbusInfo1 (Integer/parseInt "1"))
@@ -254,21 +269,21 @@
(.toString result)))
(deftest test-storm-cluster-state-errors
- (with-inprocess-zookeeper zk-port
- (with-simulated-time
- (let [state (mk-storm-state zk-port)]
+ (with-open [zk (InProcessZookeeper. )]
+ (with-open [_ (Time$SimulatedTime. )]
+ (let [state (mk-storm-state (.getPort zk))]
(.reportError state "a" "1" (Utils/localHostname) 6700 (RuntimeException.))
(validate-errors! state "a" "1" ["RuntimeException"])
- (advance-time-secs! 1)
+ (Time/advanceTimeSecs 1)
(.reportError state "a" "1" (Utils/localHostname) 6700 (IllegalArgumentException.))
(validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"])
(doseq [i (range 10)]
(.reportError state "a" "2" (Utils/localHostname) 6700 (RuntimeException.))
- (advance-time-secs! 2))
+ (Time/advanceTimeSecs 2))
(validate-errors! state "a" "2" (repeat 10 "RuntimeException"))
(doseq [i (range 5)]
(.reportError state "a" "2" (Utils/localHostname) 6700 (IllegalArgumentException.))
- (advance-time-secs! 2))
+ (Time/advanceTimeSecs 2))
(validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException")
(repeat 5 "RuntimeException")
))
@@ -276,18 +291,29 @@
(.disconnect state)
))))
+(defn mkSupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map]
+ (doto (SupervisorInfo.)
+ (.set_time_secs time-secs)
+ (.set_hostname hostname)
+ (.set_assignment_id assignment-id)
+ (.set_used_ports used-ports)
+ (.set_meta meta)
+ (.set_scheduler_meta scheduler-meta)
+ (.set_uptime_secs uptime-secs)
+ (.set_version version)
+ (.set_resources_map resources-map)))
(deftest test-supervisor-state
- (with-inprocess-zookeeper zk-port
- (let [state1 (mk-storm-state zk-port)
- state2 (mk-storm-state zk-port)
- supervisor-info1 (SupervisorInfo. 10 "hostname-1" "id1" [1 2] [] {} 1000 "0.9.2" nil)
- supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)]
+ (with-open [zk (InProcessZookeeper. )]
+ (let [state1 (mk-storm-state (.getPort zk))
+ state2 (mk-storm-state (.getPort zk))
+ supervisor-info1 (mkSupervisorInfo 10 "hostname-1" "id1" [1 2] [] {} 1000 "0.9.2" nil)
+ supervisor-info2 (mkSupervisorInfo 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)]
(is (= [] (.supervisors state1 nil)))
- (.supervisorHeartbeat state2 "2" (thriftify-supervisor-info supervisor-info2))
- (.supervisorHeartbeat state1 "1" (thriftify-supervisor-info supervisor-info1))
- (is (= supervisor-info2 (clojurify-supervisor-info (.supervisorInfo state1 "2"))))
- (is (= supervisor-info1 (clojurify-supervisor-info (.supervisorInfo state1 "1"))))
+ (.supervisorHeartbeat state2 "2" supervisor-info2)
+ (.supervisorHeartbeat state1 "1" supervisor-info1)
+ (is (= supervisor-info2 (.supervisorInfo state1 "2")))
+ (is (= supervisor-info1 (.supervisorInfo state1 "1")))
(is (= #{"1" "2"} (set (.supervisors state1 nil))))
(is (= #{"1" "2"} (set (.supervisors state2 nil))))
(.disconnect state2)
@@ -296,10 +322,10 @@
)))
(deftest test-cluster-authentication
- (with-inprocess-zookeeper zk-port
+ (with-open [zk (InProcessZookeeper. )]
(let [builder (Mockito/mock CuratorFrameworkFactory$Builder)
conf (merge
- (mk-config zk-port)
+ (mk-config (.getPort zk))
{STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
STORM-ZOOKEEPER-SESSION-TIMEOUT 10
STORM-ZOOKEEPER-RETRY-INTERVAL 5
@@ -310,7 +336,7 @@
(. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
(. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
(. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
- (Utils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
+ (Utils/testSetupBuilder builder (str (.getPort zk) "/") conf (ZookeeperAuthInfo. conf))
(is (nil?
(try
(. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index 827f536..7974e49 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -31,9 +31,9 @@
(:import [org.apache.storm Thrift])
(:import [org.apache.storm.daemon DrpcServer])
(:import [org.mockito ArgumentCaptor Mockito Matchers])
- (:use [org.apache.storm config testing])
+ (:use [org.apache.storm config])
(:use [org.apache.storm.internal clojure])
- (:use [org.apache.storm.daemon common drpc])
+ (:use [org.apache.storm.daemon drpc])
(:use [conjure core]))
(defbolt exclamation-bolt ["result" "return-info"] [tuple collector]
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index dc56f81..6eacf15 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -15,13 +15,12 @@
;; limitations under the License.
(ns org.apache.storm.grouping-test
(:use [clojure test])
- (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
+ (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
[org.apache.storm.generated JavaObject JavaObjectArg Grouping NullStruct])
(:import [org.apache.storm.grouping LoadMapping])
- (:use [org.apache.storm testing log config])
+ (:use [org.apache.storm log config])
(:use [org.apache.storm.internal clojure])
- (:use [org.apache.storm.daemon common])
- (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm LocalCluster$Builder Testing Thrift])
(:import [org.apache.storm.utils Utils]
(org.apache.storm.daemon GrouperFactory)))
@@ -79,7 +78,9 @@
(is (<= load2 max2-prcnt))))
(deftest test-field
- (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withSupervisors 4)))]
(let [spout-phint 4
bolt-phint 6
topology (Thrift/buildTopology
@@ -90,21 +91,24 @@
(Thrift/prepareFieldsGrouping ["word"])}
(TestWordBytesCounter.) (Integer. spout-phint))
})
- results (complete-topology
+ results (Testing/completeTopology
cluster
topology
- :mock-sources {"1" (->> [[(.getBytes "a")]
+ (doto (CompleteTopologyParam.)
+ (.setMockedSources (MockedSources. {"1" (->> [[(.getBytes "a")]
[(.getBytes "b")]]
(repeat (* spout-phint bolt-phint))
- (apply concat))})]
- (is (ms= (apply concat
+ (apply concat))}))))]
+ (is (Testing/multiseteq (apply concat
(for [value '("a" "b")
sum (range 1 (inc (* spout-phint bolt-phint)))]
- [[value sum]]))
- (read-tuples results "2"))))))
+ [[value (int sum)]]))
+ (Testing/readTuples results "2"))))))
(deftest test-field
- (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withSupervisors 4)))]
(let [spout-phint 4
bolt-phint 6
topology (Thrift/buildTopology
@@ -115,25 +119,28 @@
(Thrift/prepareFieldsGrouping ["word"])}
(TestWordBytesCounter.) (Integer. bolt-phint))
})
- results (complete-topology
+ results (Testing/completeTopology
cluster
topology
- :mock-sources {"1" (->> [[(.getBytes "a")]
+ (doto (CompleteTopologyParam.)
+ (.setMockedSources (MockedSources. {"1" (->> [[(.getBytes "a")]
[(.getBytes "b")]]
(repeat (* spout-phint bolt-phint))
- (apply concat))})]
- (is (ms= (apply concat
+ (apply concat))}))))]
+ (is (Testing/multiseteq (apply concat
(for [value '("a" "b")
sum (range 1 (inc (* spout-phint bolt-phint)))]
- [[value sum]]))
- (read-tuples results "2"))))))
+ [[value (int sum)]]))
+ (Testing/readTuples results "2"))))))
(defbolt id-bolt ["val"] [tuple collector]
(emit-bolt! collector (.getValues tuple))
(ack! collector tuple))
(deftest test-custom-groupings
- (with-simulated-time-local-cluster [cluster]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withSupervisors 4)))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestWordSpout. true))}
@@ -150,14 +157,15 @@
id-bolt
(Integer. 6))
})
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
topology
- :mock-sources {"1" [["a"]
+ (doto (CompleteTopologyParam.)
+ (.setMockedSources (MockedSources. {"1" [["a"]
["b"]
]}
- )]
- (is (ms= [["a"] ["a"] ["b"] ["b"]]
- (read-tuples results "2")))
- (is (ms= [["a"] ["a"] ["a"] ["b"] ["b"] ["b"]]
- (read-tuples results "3")))
+ ))))]
+ (is (Testing/multiseteq [["a"] ["a"] ["b"] ["b"]]
+ (Testing/readTuples results "2")))
+ (is (Testing/multiseteq [["a"] ["a"] ["a"] ["b"] ["b"] ["b"]]
+ (Testing/readTuples results "3")))
)))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/local_state_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/local_state_test.clj b/storm-core/test/clj/org/apache/storm/local_state_test.clj
index f180d69..e5baa67 100644
--- a/storm-core/test/clj/org/apache/storm/local_state_test.clj
+++ b/storm-core/test/clj/org/apache/storm/local_state_test.clj
@@ -15,15 +15,18 @@
;; limitations under the License.
(ns org.apache.storm.local-state-test
(:use [clojure test])
- (:use [org.apache.storm testing])
+ (:import [org.apache.storm.testing TmpPath])
(:import [org.apache.storm.utils LocalState]
[org.apache.storm.generated GlobalStreamId]
[org.apache.commons.io FileUtils]
[java.io File]))
(deftest test-local-state
- (with-local-tmp [dir1 dir2]
- (let [gs-a (GlobalStreamId. "a" "a")
+ (with-open [dir1-tmp (TmpPath.)
+ dir2-tmp (TmpPath.)]
+ (let [dir1 (.getPath dir1-tmp)
+ dir2 (.getPath dir2-tmp)
+ gs-a (GlobalStreamId. "a" "a")
gs-b (GlobalStreamId. "b" "b")
gs-c (GlobalStreamId. "c" "c")
gs-d (GlobalStreamId. "d" "d")
@@ -45,8 +48,9 @@
(is (= gs-d (.get ls2 "b"))))))
(deftest empty-state
- (with-local-tmp [dir]
- (let [ls (LocalState. dir)
+ (with-open [tmp-dir (TmpPath.)]
+ (let [dir (.getPath tmp-dir)
+ ls (LocalState. dir)
gs-a (GlobalStreamId. "a" "a")
data (FileUtils/openOutputStream (File. dir "12345"))
version (FileUtils/openOutputStream (File. dir "12345.version"))]
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
index e9bada1..027e848 100644
--- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj
+++ b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
@@ -19,8 +19,7 @@
(:require [conjure.core])
(:use [clojure test])
(:use [conjure core])
- (:use [org.apache.storm testing]
- [org.apache.storm.ui helpers])
+ (:use [org.apache.storm.ui helpers])
(:import [org.apache.storm.daemon DirectoryCleaner]
[org.apache.storm.utils Utils Time]
[org.apache.storm.utils.staticmocking UtilsInstaller]
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
index 6a3d3ca..0e29d8c 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
@@ -17,14 +17,17 @@
(ns org.apache.storm.messaging.netty-integration-test
(:use [clojure test])
(:import [org.apache.storm.messaging TransportFactory]
- [org.apache.storm Thrift])
- (:import [org.apache.storm.testing TestWordSpout TestGlobalCount])
+ [org.apache.storm Thrift Testing LocalCluster$Builder])
+ (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordSpout TestGlobalCount])
(:import [org.apache.storm.utils Utils])
- (:use [org.apache.storm testing util config]))
+ (:use [org.apache.storm util config]))
(deftest test-integration
- (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
- :daemon-conf {STORM-LOCAL-MODE-ZMQ true
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withSupervisors 4)
+ (.withSupervisorSlotPortMin 6710)
+ (.withDaemonConf {STORM-LOCAL-MODE-ZMQ true
STORM-MESSAGING-TRANSPORT "org.apache.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
@@ -32,7 +35,7 @@
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
- STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1}]
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1})))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestWordSpout. true) (Integer. 4))}
@@ -40,12 +43,14 @@
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareShuffleGrouping)}
(TestGlobalCount.) (Integer. 6))})
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
topology
- ;; important for test that
- ;; #tuples = multiple of 4 and 6
- :storm-conf {TOPOLOGY-WORKERS 3}
- :mock-sources {"1" [["a"] ["b"]
+ (doto (CompleteTopologyParam.)
+ ;; important for test that
+ ;; #tuples = multiple of 4 and 6
+ (.setStormConf {TOPOLOGY-WORKERS 3})
+ (.setMockedSources
+ (MockedSources. {"1" [["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
@@ -58,5 +63,5 @@
["a"] ["b"]
["a"] ["b"]
]}
- )]
- (is (= (* 6 4) (.size (read-tuples results "2")))))))
+ ))))]
+ (is (= (* 6 4) (.size (Testing/readTuples results "2")))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
index 5b5914d..51a7f1c 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
@@ -17,7 +17,8 @@
(:use [clojure test])
(:import [org.apache.storm.messaging TransportFactory IConnection TaskMessage IConnectionCallback])
(:import [org.apache.storm.utils Utils])
- (:use [org.apache.storm testing util config log])
+ (:import [org.apache.storm Testing Testing$Condition])
+ (:use [org.apache.storm util config log])
(:import [java.util ArrayList]
(org.apache.storm.daemon.worker WorkerState)))
@@ -60,7 +61,9 @@
(defn- wait-for-not-nil
[atm]
- (while-timeout TEST-TIMEOUT-MS (nil? @atm) (Thread/sleep 10)))
+ (Testing/whileTimeout Testing/TEST_TIMEOUT_MS
+ (reify Testing$Condition (exec [this] (nil? @atm)))
+ (fn [] (Thread/sleep 10))))
(defn- test-basic-fn [storm-conf]
(log-message "1. Should send and receive a basic message")
@@ -112,7 +115,7 @@
_ (wait-until-ready [server client])
_ (.send client task (.getBytes req_msg))
_ (.sendLoadMetrics server {(int 1) 0.0 (int 2) 1.0})
- _ (while-timeout TEST-TIMEOUT-MS (empty? (.getLoad client [(int 1) (int 2)])) (Thread/sleep 10))
+ _ (Testing/whileTimeout Testing/TEST_TIMEOUT_MS (reify Testing$Condition (exec [this] (empty? (.getLoad client [(int 1) (int 2)])))) (fn [] (Thread/sleep 10)))
load (.getLoad client [(int 1) (int 2)])]
(is (= 0.0 (.getBoltLoad (.get load (int 1)))))
(is (= 1.0 (.getBoltLoad (.get load (int 2)))))
@@ -244,7 +247,9 @@
(let [req_msg (str num)]
(.send client task (.getBytes req_msg))))
- (while-timeout TEST-TIMEOUT-MS (< (.size resp) (- num-messages 1)) (log-message (.size resp) " " num-messages) (Thread/sleep 10))
+ (Testing/whileTimeout Testing/TEST_TIMEOUT_MS
+ (reify Testing$Condition (exec [this] (< (.size resp) (- num-messages 1))))
+ (fn [] (log-message (.size resp) " " num-messages) (Thread/sleep 10)))
(doseq [num (range 1 num-messages)]
(let [req_msg (str num)
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging_test.clj b/storm-core/test/clj/org/apache/storm/messaging_test.clj
index 402ea7f..c984336 100644
--- a/storm-core/test/clj/org/apache/storm/messaging_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging_test.clj
@@ -15,20 +15,22 @@
;; limitations under the License.
(ns org.apache.storm.messaging-test
(:use [clojure test])
- (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
- (:use [org.apache.storm testing config])
- (:use [org.apache.storm.daemon common])
- (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
+ (:use [org.apache.storm config])
+ (:import [org.apache.storm Testing Thrift LocalCluster$Builder])
(:import [org.apache.storm.utils Utils]))
(deftest test-local-transport
- (doseq [transport-on? [false true]]
- (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2
- :daemon-conf {TOPOLOGY-WORKERS 2
- STORM-LOCAL-MODE-ZMQ
- (if transport-on? true false)
- STORM-MESSAGING-TRANSPORT
- "org.apache.storm.messaging.netty.Context"}]
+ (doseq [transport-on? [false true]]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withSupervisors 1)
+ (.withPortsPerSupervisor 2)
+ (.withDaemonConf {TOPOLOGY-WORKERS 2
+ STORM-LOCAL-MODE-ZMQ
+ (if transport-on? true false)
+ STORM-MESSAGING-TRANSPORT
+ "org.apache.storm.messaging.netty.Context"})))]
(let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestWordSpout. true) (Integer. 2))}
@@ -37,11 +39,12 @@
(Thrift/prepareShuffleGrouping)}
(TestGlobalCount.) (Integer. 6))
})
- results (complete-topology cluster
+ results (Testing/completeTopology cluster
topology
- ;; important for test that
- ;; #tuples = multiple of 4 and 6
- :mock-sources {"1" [["a"] ["b"]
+ (doto (CompleteTopologyParam.)
+ ;; important for test that
+ ;; #tuples = multiple of 4 and 6
+ (.setMockedSources (MockedSources. {"1" [["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
["a"] ["b"]
@@ -54,14 +57,5 @@
["a"] ["b"]
["a"] ["b"]
]}
- )]
- (is (= (* 6 4) (.size (read-tuples results "2"))))))))
-
-(extend-type TestEventLogSpout
- CompletableSpout
- (exhausted? [this]
- (-> this .completed))
- (cleanup [this]
- (.cleanup this))
- (startup [this]
- ))
+ ))))]
+ (is (= (* 6 4) (.size (Testing/readTuples results "2"))))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index c993d5b..edea3ec 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -17,17 +17,17 @@
(:use [clojure test])
(:import [org.apache.storm.topology TopologyBuilder])
(:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus])
- (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+ (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount FeederSpout
TestAggregatesCounter TestConfBolt AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout])
(:import [org.apache.storm.task ShellBolt])
(:import [org.apache.storm.spout ShellSpout])
(:import [org.apache.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo])
(:import [org.apache.storm.metric.api.rpc CountShellMetric])
(:import [org.apache.storm.utils Utils])
+ (:import [org.apache.storm Testing Testing$Condition LocalCluster$Builder])
- (:use [org.apache.storm testing config])
+ (:use [org.apache.storm config])
(:use [org.apache.storm.internal clojure])
- (:use [org.apache.storm.daemon common])
(:use [org.apache.storm.util])
(:import [org.apache.storm Thrift])
(:import [org.apache.storm.utils Utils]
@@ -50,8 +50,9 @@
))))))
(defn assert-loop [afn ids]
- (while-timeout TEST-TIMEOUT-MS (not (every? afn ids))
- (Thread/sleep 1)))
+ (Testing/whileTimeout Testing/TEST_TIMEOUT_MS
+ (reify Testing$Condition (exec [this] (not (every? afn ids))))
+ (fn [] (Thread/sleep 1))))
(defn assert-acked [tracker & ids]
(assert-loop #(.isAcked tracker %) ids))
@@ -70,16 +71,16 @@
(ack! collector tuple)))))
(defn wait-for-atleast-N-buckets! [N comp-id metric-name cluster]
- (while-timeout TEST-TIMEOUT-MS
+ (Testing/whileTimeout Testing/TEST_TIMEOUT_MS
+ (reify Testing$Condition (exec [this]
(let [taskid->buckets (clojurify-structure (FakeMetricConsumer/getTaskIdToBuckets comp-id metric-name))]
(or
(and (not= N 0) (nil? taskid->buckets))
- (not-every? #(<= N %) (map (comp count second) taskid->buckets))))
- ;;(log-message "Waiting for at least " N " timebuckets to appear in FakeMetricsConsumer for component id " comp-id " and metric name " metric-name " metrics " FakeMetricConsumer/getTaskIdToBuckets)
- (if cluster
- (advance-cluster-time cluster 1)
- (Thread/sleep 10))))
-
+ (not-every? #(<= N %) (map (comp count second) taskid->buckets))))))
+ (fn []
+ (if cluster
+ (.advanceClusterTime cluster 1)
+ (Thread/sleep 10)))))
(defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
(-> (FakeMetricConsumer/getTaskIdToBuckets comp-id metric-name)
@@ -98,65 +99,67 @@
`(is (not-empty (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name))))
(deftest test-custom-metric
- (with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000
- }]
- (let [feeder (feeder-spout ["field1"])
+ })))]
+ (let [feeder (FeederSpout. ["field1"])
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails feeder)}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareGlobalGrouping)}
count-acks)})]
- (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+ (.submitTopology cluster "metrics-tester" {} topology)
(.feed feeder ["a"] 1)
- (advance-cluster-time cluster 6)
+ (.advanceClusterTime cluster 6)
(assert-buckets! "2" "my-custom-metric" [1] cluster)
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0] cluster)
- (advance-cluster-time cluster 20)
+ (.advanceClusterTime cluster 20)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(deftest test-custom-metric-with-multi-tasks
- (with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
"storm.zookeeper.connection.timeout" 30000
"storm.zookeeper.session.timeout" 60000
- }]
- (let [feeder (feeder-spout ["field1"])
+ })))]
+ (let [feeder (FeederSpout. ["field1"])
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails feeder)}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareAllGrouping)}
count-acks (Integer. 1) {TOPOLOGY-TASKS 2})})]
- (submit-local-topology (:nimbus cluster) "metrics-tester-with-multitasks" {} topology)
+ (.submitTopology cluster "metrics-tester-with-multitasks" {} topology)
(.feed feeder ["a"] 1)
- (advance-cluster-time cluster 6)
+ (.advanceClusterTime cluster 6)
(assert-buckets! "2" "my-custom-metric" [1] cluster)
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0] cluster)
- (advance-cluster-time cluster 20)
+ (.advanceClusterTime cluster 20)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2] cluster))))
(defn mk-shell-bolt-with-metrics-spec
@@ -165,34 +168,35 @@
(PythonShellMetricsBolt. command file)))
(deftest test-custom-metric-with-multilang-py
- (with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
- "storm.zookeeper.connection.timeout" 30000
- "storm.zookeeper.session.timeout" 60000
- }]
- (let [feeder (feeder-spout ["field1"])
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
+ "storm.zookeeper.connection.timeout" 30000
+ "storm.zookeeper.session.timeout" 60000
+ })))]
+ (let [feeder (FeederSpout. ["field1"])
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails feeder)}
{"2" (mk-shell-bolt-with-metrics-spec
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareGlobalGrouping)}
"python" "tester_bolt_metrics.py")})]
- (submit-local-topology (:nimbus cluster) "shell-metrics-tester" {} topology)
+ (.submitTopology cluster "shell-metrics-tester" {} topology)
(.feed feeder ["a"] 1)
- (advance-cluster-time cluster 6)
+ (.advanceClusterTime cluster 6)
(assert-buckets! "2" "my-custom-shell-metric" [1] cluster)
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(assert-buckets! "2" "my-custom-shell-metric" [1 0] cluster)
- (advance-cluster-time cluster 20)
+ (.advanceClusterTime cluster 20)
(assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0] cluster)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(assert-buckets! "2" "my-custom-shell-metric" [1 0 0 0 0 0 2] cluster)
)))
@@ -201,48 +205,51 @@
(Thrift/prepareSpoutDetails (PythonShellMetricsSpout. command file)))
(deftest test-custom-metric-with-spout-multilang-py
- (with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
- [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
- "storm.zookeeper.connection.timeout" 30000
- "storm.zookeeper.session.timeout" 60000}]
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
+ "storm.zookeeper.connection.timeout" 30000
+ "storm.zookeeper.session.timeout" 60000
+ })))]
(let [topology (Thrift/buildTopology
{"1" (mk-shell-spout-with-metrics-spec "python" "tester_spout_metrics.py")}
{"2" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareAllGrouping)}
count-acks)})]
- (submit-local-topology (:nimbus cluster) "shell-spout-metrics-tester" {} topology)
+ (.submitTopology cluster "shell-spout-metrics-tester" {} topology)
- (advance-cluster-time cluster 7)
+ (.advanceClusterTime cluster 7)
(assert-buckets! "1" "my-custom-shellspout-metric" [2] cluster)
)))
(deftest test-builtin-metrics-1
- (with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
TOPOLOGY-STATS-SAMPLE-RATE 1.0
- TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
- (let [feeder (feeder-spout ["field1"])
+ TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60})))]
+ (let [feeder (FeederSpout. ["field1"])
topology (Thrift/buildTopology
{"myspout" (Thrift/prepareSpoutDetails feeder)}
{"mybolt" (Thrift/prepareBoltDetails
{(Utils/getGlobalStreamId "myspout" nil)
(Thrift/prepareShuffleGrouping)}
acking-bolt)})]
- (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+ (.submitTopology cluster "metrics-tester" {} topology)
(.feed feeder ["a"] 1)
- (advance-cluster-time cluster 61)
+ (.advanceClusterTime cluster 61)
(assert-buckets! "myspout" "__ack-count/default" [1] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1] cluster)
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
- (advance-cluster-time cluster 120)
+ (.advanceClusterTime cluster 120)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 0 0] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 0 0] cluster)
@@ -251,7 +258,7 @@
(.feed feeder ["b"] 1)
(.feed feeder ["c"] 1)
- (advance-cluster-time cluster 60)
+ (.advanceClusterTime cluster 60)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0 2] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 0 0 2] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2] cluster)
@@ -260,12 +267,13 @@
(deftest test-builtin-metrics-2
- (with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
TOPOLOGY-STATS-SAMPLE-RATE 1.0
- TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}]
- (let [feeder (feeder-spout ["field1"])
+ TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5})))]
+ (let [feeder (FeederSpout. ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (Thrift/buildTopology
@@ -274,13 +282,13 @@
{(Utils/getGlobalStreamId "myspout" nil)
(Thrift/prepareShuffleGrouping)}
ack-every-other)})]
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"metrics-tester"
{}
topology)
(.feed feeder ["a"] 1)
- (advance-cluster-time cluster 6)
+ (.advanceClusterTime cluster 6)
(assert-buckets! "myspout" "__fail-count/default" [] cluster)
(assert-buckets! "myspout" "__ack-count/default" [1] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1] cluster)
@@ -290,7 +298,7 @@
(assert-acked tracker 1)
(.feed feeder ["b"] 2)
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(assert-buckets! "myspout" "__fail-count/default" [] cluster)
(assert-buckets! "myspout" "__ack-count/default" [1 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 1] cluster)
@@ -298,7 +306,7 @@
(assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0] cluster)
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1] cluster)
- (advance-cluster-time cluster 15)
+ (.advanceClusterTime cluster 15)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0] cluster)
@@ -306,7 +314,7 @@
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0] cluster)
(.feed feeder ["c"] 3)
- (advance-cluster-time cluster 15)
+ (.advanceClusterTime cluster 15)
(assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0] cluster)
(assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0] cluster)
@@ -314,13 +322,14 @@
(assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0] cluster))))
(deftest test-builtin-metrics-3
- (with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
TOPOLOGY-STATS-SAMPLE-RATE 1.0
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5
- TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
- (let [feeder (feeder-spout ["field1"])
+ TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
+ (let [feeder (FeederSpout. ["field1"])
tracker (AckFailMapTracker.)
_ (.setAckFailDelegate feeder tracker)
topology (Thrift/buildTopology
@@ -329,14 +338,14 @@
{(Utils/getGlobalStreamId "myspout" nil)
(Thrift/prepareGlobalGrouping)}
ack-every-other)})]
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"timeout-tester"
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
topology)
(.feed feeder ["a"] 1)
(.feed feeder ["b"] 2)
(.feed feeder ["c"] 3)
- (advance-cluster-time cluster 9)
+ (.advanceClusterTime cluster 9)
(assert-buckets! "myspout" "__ack-count/default" [2] cluster)
(assert-buckets! "myspout" "__emit-count/default" [3] cluster)
(assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
@@ -345,7 +354,7 @@
(assert-acked tracker 1 3)
(is (not (.isFailed tracker 2)))
- (advance-cluster-time cluster 30)
+ (.advanceClusterTime cluster 30)
(assert-failed tracker 2)
(assert-buckets! "myspout" "__fail-count/default" [1] cluster)
(assert-buckets! "myspout" "__ack-count/default" [2 0 0 0] cluster)
@@ -355,23 +364,24 @@
(assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0] cluster))))
(deftest test-system-bolt
- (with-simulated-time-local-cluster
- [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ (with-open [cluster (.build (doto (LocalCluster$Builder.)
+ (.withSimulatedTime)
+ (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
- TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
- (let [feeder (feeder-spout ["field1"])
+ TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60})))]
+ (let [feeder (FeederSpout. ["field1"])
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails feeder)}
{})]
- (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+ (.submitTopology cluster "metrics-tester" {} topology)
(.feed feeder ["a"] 1)
- (advance-cluster-time cluster 70)
+ (.advanceClusterTime cluster 70)
(assert-buckets! "__system" "newWorkerEvent" [1] cluster)
(assert-metric-data-exists! "__system" "uptimeSecs")
(assert-metric-data-exists! "__system" "startTimeSecs")
- (advance-cluster-time cluster 180)
+ (.advanceClusterTime cluster 180)
(assert-buckets! "__system" "newWorkerEvent" [1 0 0 0] cluster)
)))