You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:02 UTC
[02/10] storm git commit: STORM-1281: LocalCluster,
testing4j and testing.clj to java
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index cc760a4..bb949d5 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -16,16 +16,18 @@
(ns org.apache.storm.nimbus-test
(:use [clojure test])
(:require [org.apache.storm [util :as util]])
- (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+ (:import [java.util.function UnaryOperator])
+ (:import [org.apache.storm.testing InProcessZookeeper MockLeaderElector TestWordCounter TestWordSpout TestGlobalCount
TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
[org.apache.storm.blobstore BlobStore]
[org.apache.storm.nimbus InMemoryTopologyActionNotifier]
[org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus]
[org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase]
- [org.apache.storm Thrift MockAutoCred]
+ [org.apache.storm LocalCluster LocalCluster$Builder Thrift MockAutoCred Testing Testing$Condition]
[org.apache.storm.stats BoltExecutorStats StatsUtil]
[org.apache.storm.security.auth IGroupMappingServiceProvider IAuthorizer])
(:import [org.apache.storm.testing.staticmocking MockedZookeeper])
+ (:import [org.apache.storm.testing TmpPath])
(:import [org.apache.storm.scheduler INimbus])
(:import [org.mockito Mockito Matchers])
(:import [org.mockito.exceptions.base MockitoAssertionError])
@@ -38,18 +40,22 @@
(:import [java.util HashMap HashSet Optional])
(:import [java.io File])
(:import [javax.security.auth Subject])
- (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate StormCommonInstaller]
+ (:import [org.apache.storm.utils Time Time$SimulatedTime Utils Utils$UptimeComputer ConfigUtils IPredicate StormCommonInstaller]
[org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
(:import [org.apache.storm.zookeeper Zookeeper])
(:import [org.apache.commons.io FileUtils])
(:import [org.json.simple JSONValue])
(:import [org.apache.storm.daemon StormCommon])
(:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
- (:use [org.apache.storm testing util config log converter])
+ (:use [org.apache.storm util config log])
(:require [conjure.core])
(:use [conjure core]))
+(defn- mk-nimbus
+ [conf inimbus blob-store leader-elector group-mapper cluster-state]
+ (Nimbus. conf inimbus cluster-state nil blob-store leader-elector group-mapper))
+
(defn- from-json
[^String str]
(if str
@@ -58,28 +64,30 @@
nil))
(defn storm-component->task-info [cluster storm-name]
- (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)
- nimbus (:nimbus cluster)]
+ (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+ nimbus (.getNimbus cluster)]
(-> (.getUserTopology nimbus storm-id)
(#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
(Utils/reverseMap)
clojurify-structure)))
(defn getCredentials [cluster storm-name]
- (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)]
- (clojurify-crdentials (.credentials (:storm-cluster-state cluster) storm-id nil))))
+ (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+ creds (.credentials (.getClusterState cluster) storm-id nil)]
+ (if creds (into {} (.get_creds creds)))))
(defn storm-component->executor-info [cluster storm-name]
- (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)
- nimbus (:nimbus cluster)
+ (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+ nimbus (.getNimbus cluster)
storm-conf (from-json (.getTopologyConf nimbus storm-id))
topology (.getUserTopology nimbus storm-id)
task->component (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf))
- state (:storm-cluster-state cluster)
+ state (.getClusterState cluster)
get-component (comp task->component first)]
- (->> (clojurify-assignment (.assignmentInfo state storm-id nil))
- :executor->node+port
- keys
+ (->> (.assignmentInfo state storm-id nil)
+ .get_executor_node_port
+ .keySet
+ clojurify-structure
(map (fn [e] {e (get-component e)}))
(apply merge)
(Utils/reverseMap)
@@ -87,26 +95,25 @@
(defn storm-num-workers [state storm-name]
(let [storm-id (StormCommon/getStormId state storm-name)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
- (count (clojurify-structure (Utils/reverseMap (:executor->node+port assignment))))
- ))
+ assignment (.assignmentInfo state storm-id nil)]
+ (.size (Utils/reverseMap (.get_executor_node_port assignment)))))
(defn topology-nodes [state storm-name]
(let [storm-id (StormCommon/getStormId state storm-name)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
+ assignment (.assignmentInfo state storm-id nil)]
(->> assignment
- :executor->node+port
- vals
- (map first)
+ .get_executor_node_port
+ .values
+ (map (fn [np] (.get_node np)))
set
)))
(defn topology-slots [state storm-name]
(let [storm-id (StormCommon/getStormId state storm-name)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
+ assignment (.assignmentInfo state storm-id nil)]
(->> assignment
- :executor->node+port
- vals
+ .get_executor_node_port
+ .values
set
)))
@@ -114,12 +121,12 @@
; map-val is a temporary kluge for clojure.
(defn topology-node-distribution [state storm-name]
(let [storm-id (StormCommon/getStormId state storm-name)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
+ assignment (.assignmentInfo state storm-id nil)]
(->> assignment
- :executor->node+port
- vals
+ .get_executor_node_port
+ .values
set
- (group-by first)
+ (group-by (fn [np] (.get_node np)))
(map-val count)
(map (fn [[_ amt]] {amt 1}))
(apply merge-with +)
@@ -129,20 +136,22 @@
(count (topology-nodes state storm-name)))
(defn executor-assignment [cluster storm-id executor-id]
- (let [state (:storm-cluster-state cluster)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
- ((:executor->node+port assignment) executor-id)
+ (let [state (.getClusterState cluster)
+ assignment (.assignmentInfo state storm-id nil)]
+ (.get (.get_executor_node_port assignment) executor-id)
))
(defn executor-start-times [cluster storm-id]
- (let [state (:storm-cluster-state cluster)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
- (:executor->start-time-secs assignment)))
+ (let [state (.getClusterState cluster)
+ assignment (.assignmentInfo state storm-id nil)]
+ (clojurify-structure (.get_executor_start_time_secs assignment))))
(defn do-executor-heartbeat [cluster storm-id executor]
- (let [state (:storm-cluster-state cluster)
- executor->node+port (:executor->node+port (clojurify-assignment (.assignmentInfo state storm-id nil)))
- [node port] (get executor->node+port executor)
+ (let [state (.getClusterState cluster)
+ executor->node+port (.get_executor_node_port (.assignmentInfo state storm-id nil))
+ np (.get executor->node+port executor)
+ node (.get_node np)
+ port (first (.get_port np))
curr-beat (StatsUtil/convertZkWorkerHb (.getWorkerHeartbeat state storm-id node port))
stats (if (get curr-beat "executor-stats")
(get curr-beat "executor-stats")
@@ -156,28 +165,28 @@
(StatsUtil/thriftifyZkWorkerHb (StatsUtil/mkZkWorkerHb storm-id stats (int 10))))))
(defn slot-assignments [cluster storm-id]
- (let [state (:storm-cluster-state cluster)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
- (clojurify-structure (Utils/reverseMap (:executor->node+port assignment)))))
+ (let [state (.getClusterState cluster)
+ assignment (.assignmentInfo state storm-id nil)]
+ (clojurify-structure (Utils/reverseMap (.get_executor_node_port assignment)))))
(defn task-ids [cluster storm-id]
- (let [nimbus (:nimbus cluster)]
+ (let [nimbus (.getNimbus cluster)]
(-> (.getUserTopology nimbus storm-id)
(#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
clojurify-structure
keys)))
(defn topology-executors [cluster storm-id]
- (let [state (:storm-cluster-state cluster)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
- ret-keys (keys (:executor->node+port assignment))
+ (let [state (.getClusterState cluster)
+ assignment (.assignmentInfo state storm-id nil)
+ ret-keys (keys (.get_executor_node_port assignment))
_ (log-message "ret-keys: " (pr-str ret-keys)) ]
ret-keys
))
(defn check-distribution [items distribution]
- (let [counts (map count items)]
- (is (ms= counts distribution))))
+ (let [counts (map long (map count items))]
+ (is (Testing/multiseteq counts (map long distribution)))))
(defn disjoint? [& sets]
(let [combined (apply concat sets)]
@@ -188,14 +197,14 @@
clojurify-structure (StormCommon/executorIdToTasks executor-id))
(defnk check-consistency [cluster storm-name :assigned? true]
- (let [state (:storm-cluster-state cluster)
+ (let [state (.getClusterState cluster)
storm-id (StormCommon/getStormId state storm-name)
task-ids (task-ids cluster storm-id)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
- executor->node+port (:executor->node+port assignment)
+ assignment (.assignmentInfo state storm-id nil)
+ executor->node+port (.get_executor_node_port assignment)
task->node+port (StormCommon/taskToNodeport executor->node+port)
assigned-task-ids (mapcat executor->tasks (keys executor->node+port))
- all-nodes (set (map first (vals executor->node+port)))]
+ all-nodes (set (map (fn [np] (.get_node np)) (.values executor->node+port)))]
(when assigned?
(is (= (sort task-ids) (sort assigned-task-ids)))
(doseq [t task-ids]
@@ -203,17 +212,17 @@
(doseq [[e s] executor->node+port]
(is (not-nil? s)))
- ;;(map str (-> (Thread/currentThread) .getStackTrace))
- (is (= all-nodes (set (keys (:node->host assignment)))))
+ (is (= all-nodes (set (keys (.get_node_host assignment)))))
(doseq [[e s] executor->node+port]
- (is (not-nil? ((:executor->start-time-secs assignment) e))))
+ (is (not-nil? (.get (.get_executor_start_time_secs assignment) e))))
))
(deftest test-bogusId
- (with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
- :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
- (let [state (:storm-cluster-state cluster)
- nimbus (:nimbus cluster)]
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSupervisors 4)
+ (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+ (let [state (.getClusterState cluster)
+ nimbus (.getNimbus cluster)]
(is (thrown? NotAliveException (.getTopologyConf nimbus "bogus-id")))
(is (thrown? NotAliveException (.getTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getUserTopology nimbus "bogus-id")))
@@ -222,10 +231,11 @@
)))
(deftest test-assignment
- (with-simulated-time-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
- :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
- (let [state (:storm-cluster-state cluster)
- nimbus (:nimbus cluster)
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 4)
+ (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+ (let [state (.getClusterState cluster)
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. false) (Integer. 3))}
@@ -254,8 +264,8 @@
(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 4))})
- _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
- _ (advance-cluster-time cluster 11)
+ _ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
+ _ (.advanceClusterTime cluster 11)
task-info (storm-component->task-info cluster "mystorm")]
(check-consistency cluster "mystorm")
;; 3 should be assigned once (if it were optimized, we'd have
@@ -265,8 +275,8 @@
(is (= 4 (count (task-info "2"))))
(is (= 1 (count (task-info "3"))))
(is (= 4 (storm-num-workers state "mystorm")))
- (submit-local-topology nimbus "storm2" {TOPOLOGY-WORKERS 20} topology2)
- (advance-cluster-time cluster 11)
+ (.submitTopology cluster "storm2" {TOPOLOGY-WORKERS 20} topology2)
+ (.advanceClusterTime cluster 11)
(check-consistency cluster "storm2")
(is (= 2 (count (.assignments state nil))))
(let [task-info (storm-component->task-info cluster "storm2")]
@@ -297,17 +307,17 @@
(deftest test-auto-credentials
- (with-simulated-time-local-cluster [cluster :supervisors 6
- :ports-per-supervisor 3
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 6)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
NIMBUS-CREDENTIAL-RENEW-FREQ-SECS 10
NIMBUS-CREDENTIAL-RENEWERS (list "org.apache.storm.MockAutoCred")
NIMBUS-AUTO-CRED-PLUGINS (list "org.apache.storm.MockAutoCred")
- }]
- (let [state (:storm-cluster-state cluster)
- nimbus (:nimbus cluster)
+ })))]
+ (let [state (.getClusterState cluster)
topology-name "test-auto-cred-storm"
submitOptions (SubmitOptions. TopologyInitialStatus/INACTIVE)
- (.set_creds submitOptions (Credentials. (HashMap.)))
@@ -322,14 +332,14 @@
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.))})
- _ (submit-local-topology-with-opts nimbus topology-name {TOPOLOGY-WORKERS 4
+ _ (.submitTopologyWithOpts cluster topology-name {TOPOLOGY-WORKERS 4
TOPOLOGY-AUTO-CREDENTIALS (list "org.apache.storm.MockAutoCred")
} topology submitOptions)
credentials (getCredentials cluster topology-name)]
; check that the credentials have nimbus auto generated cred
(is (= (.get credentials MockAutoCred/NIMBUS_CRED_KEY) MockAutoCred/NIMBUS_CRED_VAL))
;advance cluster time so the renewers can execute
- (advance-cluster-time cluster 20)
+ (.advanceClusterTime cluster 20)
;check that renewed credentials replace the original credential.
(is (= (.get (getCredentials cluster topology-name) MockAutoCred/NIMBUS_CRED_KEY) MockAutoCred/NIMBUS_CRED_RENEW_VAL))
(is (= (.get (getCredentials cluster topology-name) MockAutoCred/GATEWAY_CRED_KEY) MockAutoCred/GATEWAY_CRED_RENEW_VAL)))))
@@ -347,19 +357,19 @@
~(first lexpr))))
(deftest test-isolated-assignment
- (with-simulated-time-local-cluster [cluster :supervisors 6
- :ports-per-supervisor 3
- :inimbus (isolation-nimbus)
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 6)
+ (.withINimbus (isolation-nimbus))
+ (.withDaemonConf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
STORM-SCHEDULER "org.apache.storm.scheduler.IsolationScheduler"
ISOLATION-SCHEDULER-MACHINES {"tester1" 3 "tester2" 2}
NIMBUS-MONITOR-FREQ-SECS 10
- }]
+ })))]
(letlocals
- (bind state (:storm-cluster-state cluster))
- (bind nimbus (:nimbus cluster))
+ (bind state (.getClusterState cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. false) (Integer. 3))}
@@ -372,14 +382,14 @@
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.))}))
- (submit-local-topology nimbus "noniso" {TOPOLOGY-WORKERS 4} topology)
- (advance-cluster-time cluster 11)
+ (.submitTopology cluster "noniso" {TOPOLOGY-WORKERS 4} topology)
+ (.advanceClusterTime cluster 11)
(is (= 4 (topology-num-nodes state "noniso")))
(is (= 4 (storm-num-workers state "noniso")))
- (submit-local-topology nimbus "tester1" {TOPOLOGY-WORKERS 6} topology)
- (submit-local-topology nimbus "tester2" {TOPOLOGY-WORKERS 6} topology)
- (advance-cluster-time cluster 11)
+ (.submitTopology cluster "tester1" {TOPOLOGY-WORKERS 6} topology)
+ (.submitTopology cluster "tester2" {TOPOLOGY-WORKERS 6} topology)
+ (.advanceClusterTime cluster 11)
(bind task-info-tester1 (storm-component->task-info cluster "tester1"))
(bind task-info-tester2 (storm-component->task-info cluster "tester2"))
@@ -401,7 +411,7 @@
(bind tester1-slots (topology-slots state "tester1"))
(bind tester2-slots (topology-slots state "tester2"))
(bind noniso-slots (topology-slots state "noniso"))
- (advance-cluster-time cluster 20)
+ (.advanceClusterTime cluster 20)
(is (= tester1-slots (topology-slots state "tester1")))
(is (= tester2-slots (topology-slots state "tester2")))
(is (= noniso-slots (topology-slots state "noniso")))
@@ -409,9 +419,11 @@
)))
(deftest test-zero-executor-or-tasks
- (with-simulated-time-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
- (let [state (:storm-cluster-state cluster)
- nimbus (:nimbus cluster)
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 6)
+ (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+ (let [state (.getClusterState cluster)
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. false) (Integer. 3)
@@ -426,8 +438,8 @@
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) nil
{TOPOLOGY-TASKS 5})})
- _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
- _ (advance-cluster-time cluster 11)
+ _ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
+ _ (.advanceClusterTime cluster 11)
task-info (storm-component->task-info cluster "mystorm")]
(check-consistency cluster "mystorm")
(is (= 0 (count (task-info "1"))))
@@ -438,9 +450,10 @@
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(deftest test-executor-assignments
- (with-simulated-time-local-cluster[cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
- (let [nimbus (:nimbus cluster)
- topology (Thrift/buildTopology
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+ (let [topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3)
{TOPOLOGY-TASKS 5})}
@@ -453,8 +466,8 @@
{(Utils/getGlobalStreamId "2" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 3))})
- _ (submit-local-topology nimbus "mystorm" {TOPOLOGY-WORKERS 4} topology)
- _ (advance-cluster-time cluster 11)
+ _ (.submitTopology cluster "mystorm" {TOPOLOGY-WORKERS 4} topology)
+ _ (.advanceClusterTime cluster 11)
task-info (storm-component->task-info cluster "mystorm")
executor-info (->> (storm-component->executor-info cluster "mystorm")
(map-val #(map executor->tasks %)))]
@@ -470,10 +483,12 @@
)))
(deftest test-over-parallelism-assignment
- (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
- :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
- (let [state (:storm-cluster-state cluster)
- nimbus (:nimbus cluster)
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 2)
+ (.withPortsPerSupervisor 5)
+ (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+ (let [state (.getClusterState cluster)
topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 21))}
@@ -489,8 +504,8 @@
{(Utils/getGlobalStreamId "1" nil)
(Thrift/prepareNoneGrouping)}
(TestPlannerBolt.) (Integer. 10))})
- _ (submit-local-topology nimbus "test" {TOPOLOGY-WORKERS 7} topology)
- _ (advance-cluster-time cluster 11)
+ _ (.submitTopology cluster "test" {TOPOLOGY-WORKERS 7} topology)
+ _ (.advanceClusterTime cluster 11)
task-info (storm-component->task-info cluster "test")]
(check-consistency cluster "test")
(is (= 21 (count (task-info "1"))))
@@ -502,81 +517,84 @@
(deftest test-topo-history
(let [group-mapper (Mockito/mock IGroupMappingServiceProvider)]
- (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
- :group-mapper group-mapper
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 2)
+ (.withPortsPerSupervisor 5)
+ (.withGroupMapper group-mapper)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-ADMINS ["admin-user"]
NIMBUS-TASK-TIMEOUT-SECS 30
NIMBUS-MONITOR-FREQ-SECS 10
- TOPOLOGY-ACKER-EXECUTORS 0}]
+ TOPOLOGY-ACKER-EXECUTORS 0})))]
(.thenReturn (Mockito/when (.getGroups group-mapper (Mockito/anyObject))) #{"alice-group"})
(letlocals
- (bind conf (:daemon-conf cluster))
+ (bind conf (.getDaemonConf cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 4))}
{}))
- (bind state (:storm-cluster-state cluster))
- (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
+ (bind state (.getClusterState cluster))
+ (.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
(bind storm-id (StormCommon/getStormId state "test"))
- (advance-cluster-time cluster 5)
- (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
- (.killTopology (:nimbus cluster) "test")
+ (.advanceClusterTime cluster 5)
+ (is (not-nil? (.stormBase state storm-id nil)))
+ (is (not-nil? (.assignmentInfo state storm-id nil)))
+ (.killTopology (.getNimbus cluster) "test")
;; check that storm is deactivated but alive
- (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type)))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
- (advance-cluster-time cluster 35)
+ (is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id nil))))
+ (is (not-nil? (.assignmentInfo state storm-id nil)))
+ (.advanceClusterTime cluster 35)
;; kill topology read on group
- (submit-local-topology (:nimbus cluster) "killgrouptest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-GROUPS ["alice-group"]} topology)
+ (.submitTopology cluster "killgrouptest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-GROUPS ["alice-group"]} topology)
(bind storm-id-killgroup (StormCommon/getStormId state "killgrouptest"))
- (advance-cluster-time cluster 5)
- (is (not-nil? (clojurify-storm-base (.stormBase state storm-id-killgroup nil))))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil))))
- (.killTopology (:nimbus cluster) "killgrouptest")
+ (.advanceClusterTime cluster 5)
+ (is (not-nil? (.stormBase state storm-id-killgroup nil)))
+ (is (not-nil? (.assignmentInfo state storm-id-killgroup nil)))
+ (.killTopology (.getNimbus cluster) "killgrouptest")
;; check that storm is deactivated but alive
- (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id-killgroup nil)) :status :type)))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil))))
- (advance-cluster-time cluster 35)
+ (is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id-killgroup nil))))
+ (is (not-nil? (.assignmentInfo state storm-id-killgroup nil)))
+ (.advanceClusterTime cluster 35)
;; kill topology can't read
- (submit-local-topology (:nimbus cluster) "killnoreadtest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
+ (.submitTopology cluster "killnoreadtest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
(bind storm-id-killnoread (StormCommon/getStormId state "killnoreadtest"))
- (advance-cluster-time cluster 5)
- (is (not-nil? (clojurify-storm-base (.stormBase state storm-id-killnoread nil))))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil))))
- (.killTopology (:nimbus cluster) "killnoreadtest")
+ (.advanceClusterTime cluster 5)
+ (is (not-nil? (.stormBase state storm-id-killnoread nil)))
+ (is (not-nil? (.assignmentInfo state storm-id-killnoread nil)))
+ (.killTopology (.getNimbus cluster) "killnoreadtest")
;; check that storm is deactivated but alive
- (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id-killnoread nil)) :status :type)))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil))))
- (advance-cluster-time cluster 35)
+ (is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id-killnoread nil))))
+ (is (not-nil? (.assignmentInfo state storm-id-killnoread nil)))
+ (.advanceClusterTime cluster 35)
;; active topology can read
- (submit-local-topology (:nimbus cluster) "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
- (advance-cluster-time cluster 11)
+ (.submitTopology cluster "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
+ (.advanceClusterTime cluster 11)
(bind storm-id2 (StormCommon/getStormId state "2test"))
- (is (not-nil? (clojurify-storm-base (.stormBase state storm-id2 nil))))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id2 nil))))
+ (is (not-nil? (.stormBase state storm-id2 nil)))
+ (is (not-nil? (.assignmentInfo state storm-id2 nil)))
;; active topology can not read
- (submit-local-topology (:nimbus cluster) "testnoread" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice"]} topology)
- (advance-cluster-time cluster 11)
+ (.submitTopology cluster "testnoread" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice"]} topology)
+ (.advanceClusterTime cluster 11)
(bind storm-id3 (StormCommon/getStormId state "testnoread"))
- (is (not-nil? (clojurify-storm-base (.stormBase state storm-id3 nil))))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id3 nil))))
+ (is (not-nil? (.stormBase state storm-id3 nil)))
+ (is (not-nil? (.assignmentInfo state storm-id3 nil)))
;; active topology can read based on group
- (submit-local-topology (:nimbus cluster) "testreadgroup" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-GROUPS ["alice-group"]} topology)
- (advance-cluster-time cluster 11)
+ (.submitTopology cluster "testreadgroup" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-GROUPS ["alice-group"]} topology)
+ (.advanceClusterTime cluster 11)
(bind storm-id4 (StormCommon/getStormId state "testreadgroup"))
- (is (not-nil? (clojurify-storm-base (.stormBase state storm-id4 nil))))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id4 nil))))
+ (is (not-nil? (.stormBase state storm-id4 nil)))
+ (is (not-nil? (.assignmentInfo state storm-id4 nil)))
;; at this point have 1 running, 1 killed topo
- (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (:nimbus cluster) (System/getProperty "user.name")))))]
+ (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) (System/getProperty "user.name")))))]
(log-message "Checking user " (System/getProperty "user.name") " " hist-topo-ids)
(is (= 4 (count hist-topo-ids)))
(is (= storm-id2 (get hist-topo-ids 0)))
(is (= storm-id-killgroup (get hist-topo-ids 1)))
(is (= storm-id (get hist-topo-ids 2)))
(is (= storm-id4 (get hist-topo-ids 3))))
- (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (:nimbus cluster) "alice"))))]
+ (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "alice"))))]
(log-message "Checking user alice " hist-topo-ids)
(is (= 5 (count hist-topo-ids)))
(is (= storm-id2 (get hist-topo-ids 0)))
@@ -584,7 +602,7 @@
(is (= storm-id (get hist-topo-ids 2)))
(is (= storm-id3 (get hist-topo-ids 3)))
(is (= storm-id4 (get hist-topo-ids 4))))
- (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (:nimbus cluster) "admin-user"))))]
+ (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "admin-user"))))]
(log-message "Checking user admin-user " hist-topo-ids)
(is (= 6 (count hist-topo-ids)))
(is (= storm-id2 (get hist-topo-ids 0)))
@@ -593,122 +611,128 @@
(is (= storm-id (get hist-topo-ids 3)))
(is (= storm-id3 (get hist-topo-ids 4)))
(is (= storm-id4 (get hist-topo-ids 5))))
- (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (:nimbus cluster) "group-only-user"))))]
+ (let [hist-topo-ids (vec (sort (.get_topo_ids (.getTopologyHistory (.getNimbus cluster) "group-only-user"))))]
(log-message "Checking user group-only-user " hist-topo-ids)
(is (= 2 (count hist-topo-ids)))
(is (= storm-id-killgroup (get hist-topo-ids 0)))
(is (= storm-id4 (get hist-topo-ids 1))))))))
(deftest test-kill-storm
- (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 2)
+ (.withPortsPerSupervisor 5)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-TASK-TIMEOUT-SECS 30
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
- TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
- (bind conf (:daemon-conf cluster))
+ (bind conf (.getDaemonConf cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 14))}
{}))
- (bind state (:storm-cluster-state cluster))
- (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
+ (bind state (.getClusterState cluster))
+ (.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
(bind storm-id (StormCommon/getStormId state "test"))
- (advance-cluster-time cluster 15)
- (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
- (.killTopology (:nimbus cluster) "test")
+ (.advanceClusterTime cluster 15)
+ (is (not-nil? (.stormBase state storm-id nil)))
+ (is (not-nil? (.assignmentInfo state storm-id nil)))
+ (.killTopology (.getNimbus cluster) "test")
;; check that storm is deactivated but alive
- (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type)))
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
- (advance-cluster-time cluster 18)
+ (is (= TopologyStatus/KILLED (.get_status (.stormBase state storm-id nil))))
+ (is (not-nil? (.assignmentInfo state storm-id nil)))
+ (.advanceClusterTime cluster 18)
;; check that storm is deactivated but alive
(is (= 1 (count (.heartbeatStorms state))))
- (advance-cluster-time cluster 3)
- (is (nil? (clojurify-storm-base (.stormBase state storm-id nil))))
- (is (nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
+ (.advanceClusterTime cluster 3)
+ (is (nil? (.stormBase state storm-id nil)))
+ (is (nil? (.assignmentInfo state storm-id nil)))
;; cleanup happens on monitoring thread
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(is (empty? (.heartbeatStorms state)))
;; TODO: check that code on nimbus was cleaned up locally...
- (is (thrown? NotAliveException (.killTopology (:nimbus cluster) "lalala")))
- (submit-local-topology (:nimbus cluster) "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology)
- (advance-cluster-time cluster 11)
- (is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
- (advance-cluster-time cluster 11)
+ (is (thrown? NotAliveException (.killTopology (.getNimbus cluster) "lalala")))
+ (.submitTopology cluster "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} topology)
+ (.advanceClusterTime cluster 11)
+ (is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} topology)))
+ (.advanceClusterTime cluster 11)
(bind storm-id (StormCommon/getStormId state "2test"))
- (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
- (.killTopology (:nimbus cluster) "2test")
- (is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
- (advance-cluster-time cluster 11)
+ (is (not-nil? (.stormBase state storm-id nil)))
+ (.killTopology (.getNimbus cluster) "2test")
+ (is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} topology)))
+ (.advanceClusterTime cluster 11)
(is (= 1 (count (.heartbeatStorms state))))
- (advance-cluster-time cluster 6)
- (is (nil? (clojurify-storm-base (.stormBase state storm-id nil))))
- (is (nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 6)
+ (is (nil? (.stormBase state storm-id nil)))
+ (is (nil? (.assignmentInfo state storm-id nil)))
+ (.advanceClusterTime cluster 11)
(is (= 0 (count (.heartbeatStorms state))))
- (submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
+ (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
(bind storm-id3 (StormCommon/getStormId state "test3"))
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(.removeStorm state storm-id3)
- (is (nil? (clojurify-storm-base (.stormBase state storm-id3 nil))))
- (is (nil? (clojurify-assignment (.assignmentInfo state storm-id3 nil))))
+ (is (nil? (.stormBase state storm-id3 nil)))
+ (is (nil? (.assignmentInfo state storm-id3 nil)))
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(is (= 0 (count (.heartbeatStorms state))))
;; this guarantees that monitor thread won't trigger for 10 more seconds
- (advance-time-secs! 11)
- (wait-until-cluster-waiting cluster)
+ (Time/advanceTimeSecs 11)
+ (.waitForIdle cluster)
- (submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
+ (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
(bind storm-id3 (StormCommon/getStormId state "test3"))
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(bind executor-id (first (topology-executors cluster storm-id3)))
(do-executor-heartbeat cluster storm-id3 executor-id)
- (.killTopology (:nimbus cluster) "test3")
- (advance-cluster-time cluster 6)
+ (.killTopology (.getNimbus cluster) "test3")
+ (.advanceClusterTime cluster 6)
(is (= 1 (count (.heartbeatStorms state))))
- (advance-cluster-time cluster 5)
+ (.advanceClusterTime cluster 5)
(is (= 0 (count (.heartbeatStorms state))))
;; test kill with opts
- (submit-local-topology (:nimbus cluster) "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} topology)
- (advance-cluster-time cluster 11)
- (.killTopologyWithOpts (:nimbus cluster) "test4" (doto (KillOptions.) (.set_wait_secs 10)))
+ (.submitTopology cluster "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} topology)
+ (.advanceClusterTime cluster 11)
+ (.killTopologyWithOpts (.getNimbus cluster) "test4" (doto (KillOptions.) (.set_wait_secs 10)))
(bind storm-id4 (StormCommon/getStormId state "test4"))
- (advance-cluster-time cluster 9)
- (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id4 nil))))
- (advance-cluster-time cluster 2)
- (is (nil? (clojurify-assignment (.assignmentInfo state storm-id4 nil))))
+ (.advanceClusterTime cluster 9)
+ (is (not-nil? (.assignmentInfo state storm-id4 nil)))
+ (.advanceClusterTime cluster 2)
+ (is (nil? (.assignmentInfo state storm-id4 nil)))
)))
(deftest test-reassignment
- (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 2)
+ (.withPortsPerSupervisor 5)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
TOPOLOGY-ACKER-EXECUTORS 0
- TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
- (bind conf (:daemon-conf cluster))
+ (bind conf (.getDaemonConf cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 2))}
{}))
- (bind state (:storm-cluster-state cluster))
- (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
- (advance-cluster-time cluster 11)
+ (bind state (.getClusterState cluster))
+ (.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
+ (.advanceClusterTime cluster 11)
(check-consistency cluster "test")
(bind storm-id (StormCommon/getStormId state "test"))
(bind [executor-id1 executor-id2] (topology-executors cluster storm-id))
@@ -717,7 +741,7 @@
(bind _ (log-message "ass1, t0: " (pr-str ass1)))
(bind _ (log-message "ass2, t0: " (pr-str ass2)))
- (advance-cluster-time cluster 30)
+ (.advanceClusterTime cluster 30)
(bind _ (log-message "ass1, t30, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t30, pre beat: " (pr-str ass2)))
(do-executor-heartbeat cluster storm-id executor-id1)
@@ -725,7 +749,7 @@
(bind _ (log-message "ass1, t30, post beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t30, post beat: " (pr-str ass2)))
- (advance-cluster-time cluster 13)
+ (.advanceClusterTime cluster 13)
(bind _ (log-message "ass1, t43, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t43, pre beat: " (pr-str ass2)))
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
@@ -734,7 +758,7 @@
(bind _ (log-message "ass1, t43, post beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t43, post beat: " (pr-str ass2)))
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(bind _ (log-message "ass1, t54, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t54, pre beat: " (pr-str ass2)))
(do-executor-heartbeat cluster storm-id executor-id1)
@@ -745,7 +769,7 @@
; have to wait an extra 10 seconds because nimbus may not
; resynchronize its heartbeat time till monitor-time secs after
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(bind _ (log-message "ass1, t65, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t65, pre beat: " (pr-str ass2)))
(do-executor-heartbeat cluster storm-id executor-id1)
@@ -754,7 +778,7 @@
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(check-consistency cluster "test")
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(bind _ (log-message "ass1, t76, pre beat: " (pr-str ass1)))
(bind _ (log-message "ass2, t76, pre beat: " (pr-str ass2)))
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
@@ -764,98 +788,100 @@
(bind _ (log-message "ass2, t76, post beat: " (pr-str ass2)))
(check-consistency cluster "test")
- (advance-cluster-time cluster 31)
+ (.advanceClusterTime cluster 31)
(is (not= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2))) ; tests launch timeout
(check-consistency cluster "test")
(bind ass1 (executor-assignment cluster storm-id executor-id1))
- (bind active-supervisor (first ass2))
- (kill-supervisor cluster active-supervisor)
+ (bind active-supervisor (.get_node ass2))
+ (.killSupervisor cluster active-supervisor)
(doseq [i (range 12)]
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)
- (advance-cluster-time cluster 10)
+ (.advanceClusterTime cluster 10)
)
;; tests that it doesn't reassign executors if they're heartbeating even if supervisor times out
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
(check-consistency cluster "test")
- (advance-cluster-time cluster 30)
+ (.advanceClusterTime cluster 30)
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(is (not-nil? ass1))
(is (not-nil? ass2))
- (is (not= active-supervisor (first (executor-assignment cluster storm-id executor-id2))))
- (is (not= active-supervisor (first (executor-assignment cluster storm-id executor-id1))))
+ (is (not= active-supervisor (.get_node (executor-assignment cluster storm-id executor-id2))))
+ (is (not= active-supervisor (.get_node (executor-assignment cluster storm-id executor-id1))))
(check-consistency cluster "test")
(doseq [supervisor-id (.supervisors state nil)]
- (kill-supervisor cluster supervisor-id))
+ (.killSupervisor cluster supervisor-id))
- (advance-cluster-time cluster 90)
+ (.advanceClusterTime cluster 90)
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
(is (nil? ass1))
(is (nil? ass2))
(check-consistency cluster "test" :assigned? false)
- (add-supervisor cluster)
- (advance-cluster-time cluster 11)
+ (.addSupervisor cluster)
+ (.advanceClusterTime cluster 11)
(check-consistency cluster "test")
)))
(deftest test-reassignment-to-constrained-cluster
- (with-simulated-time-local-cluster [cluster :supervisors 0
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 0)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
TOPOLOGY-ACKER-EXECUTORS 0
- TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
- (add-supervisor cluster :ports 1 :id "a")
- (add-supervisor cluster :ports 1 :id "b")
- (bind conf (:daemon-conf cluster))
+ (.addSupervisor cluster 1 "a")
+ (.addSupervisor cluster 1 "b")
+ (bind conf (.getDaemonConf cluster))
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 2))}
{}))
- (bind state (:storm-cluster-state cluster))
- (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
- (advance-cluster-time cluster 11)
+ (bind state (.getClusterState cluster))
+ (.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
+ (.advanceClusterTime cluster 11)
(check-consistency cluster "test")
(bind storm-id (StormCommon/getStormId state "test"))
(bind [executor-id1 executor-id2] (topology-executors cluster storm-id))
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))
- (advance-cluster-time cluster 30)
+ (.advanceClusterTime cluster 30)
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)
- (advance-cluster-time cluster 13)
+ (.advanceClusterTime cluster 13)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
- (kill-supervisor cluster "b")
+ (.killSupervisor cluster "b")
(do-executor-heartbeat cluster storm-id executor-id1)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)
(check-consistency cluster "test")
@@ -866,42 +892,45 @@
(check-distribution (vals slot-executors) distribution))
(defn check-num-nodes [slot-executors num-nodes]
- (let [nodes (->> slot-executors keys (map first) set)]
+ (let [nodes (->> slot-executors keys (map (fn [np] (.get_node np))) set)]
(is (= num-nodes (count nodes)))
))
(deftest test-reassign-squeezed-topology
- (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 1
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 1)
+ (.withPortsPerSupervisor 1)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
- TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 9))}
{}))
- (bind state (:storm-cluster-state cluster))
- (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4} topology) ; distribution should be 2, 2, 2, 3 ideally
- (advance-cluster-time cluster 11)
+ (bind state (.getClusterState cluster))
+ (.submitTopology cluster "test" {TOPOLOGY-WORKERS 4} topology) ; distribution should be 2, 2, 2, 3 ideally
+ (.advanceClusterTime cluster 11)
(bind storm-id (StormCommon/getStormId state "test"))
(bind slot-executors (slot-assignments cluster storm-id))
(check-executor-distribution slot-executors [9])
(check-consistency cluster "test")
- (add-supervisor cluster :ports 2)
- (advance-cluster-time cluster 11)
+ (.addSupervisor cluster 2)
+ (.advanceClusterTime cluster 11)
(bind slot-executors (slot-assignments cluster storm-id))
(bind executor->start (executor-start-times cluster storm-id))
(check-executor-distribution slot-executors [3 3 3])
(check-consistency cluster "test")
- (add-supervisor cluster :ports 8)
+ (.addSupervisor cluster 8)
;; this actually works for any time > 0, since zookeeper fires an event causing immediate reassignment
;; doesn't work for time = 0 because it's not waiting for cluster yet, so test might happen before reassignment finishes
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(bind slot-executors2 (slot-assignments cluster storm-id))
(bind executor->start2 (executor-start-times cluster storm-id))
(check-executor-distribution slot-executors2 [2 2 2 3])
@@ -922,47 +951,49 @@
)))
(deftest test-rebalance
- (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 3
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 1)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
TOPOLOGY-ACKER-EXECUTORS 0
- TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
- (bind state (:storm-cluster-state cluster))
- (submit-local-topology (:nimbus cluster)
+ (bind state (.getClusterState cluster))
+ (.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 60} topology)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(bind storm-id (StormCommon/getStormId state "test"))
- (add-supervisor cluster :ports 3)
- (add-supervisor cluster :ports 3)
+ (.addSupervisor cluster 3)
+ (.addSupervisor cluster 3)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(bind slot-executors (slot-assignments cluster storm-id))
;; check that all workers are on one machine
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 1)
- (.rebalance (:nimbus cluster) "test" (RebalanceOptions.))
+ (.rebalance (.getNimbus cluster) "test" (RebalanceOptions.))
- (advance-cluster-time cluster 30)
+ (.advanceClusterTime cluster 30)
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 1)
- (advance-cluster-time cluster 30)
+ (.advanceClusterTime cluster 30)
(bind slot-executors (slot-assignments cluster storm-id))
(check-executor-distribution slot-executors [1 1 1])
(check-num-nodes slot-executors 3)
(is (thrown? InvalidTopologyException
- (.rebalance (:nimbus cluster) "test"
+ (.rebalance (.getNimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" (int 0)})
))))
@@ -970,23 +1001,25 @@
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(deftest test-rebalance-change-parallelism
- (with-simulated-time-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 4)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-ACKER-EXECUTORS 0
- TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 6)
{TOPOLOGY-TASKS 12})}
{}))
- (bind state (:storm-cluster-state cluster))
- (submit-local-topology (:nimbus cluster)
+ (bind state (.getClusterState cluster))
+ (.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(bind storm-id (StormCommon/getStormId state "test"))
(bind checker (fn [distribution]
(check-executor-distribution
@@ -994,30 +1027,30 @@
distribution)))
(checker [2 2 2])
- (.rebalance (:nimbus cluster) "test"
+ (.rebalance (.getNimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_workers (int 6))
))
- (advance-cluster-time cluster 29)
+ (.advanceClusterTime cluster 29)
(checker [2 2 2])
- (advance-cluster-time cluster 3)
+ (.advanceClusterTime cluster 3)
(checker [1 1 1 1 1 1])
- (.rebalance (:nimbus cluster) "test"
+ (.rebalance (.getNimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" (int 1)})
))
- (advance-cluster-time cluster 29)
+ (.advanceClusterTime cluster 29)
(checker [1 1 1 1 1 1])
- (advance-cluster-time cluster 3)
+ (.advanceClusterTime cluster 3)
(checker [1])
- (.rebalance (:nimbus cluster) "test"
+ (.rebalance (.getNimbus cluster) "test"
(doto (RebalanceOptions.)
(.set_num_executors {"1" (int 8)})
(.set_num_workers 4)
))
- (advance-cluster-time cluster 32)
+ (.advanceClusterTime cluster 32)
(checker [2 2 2 2])
(check-consistency cluster "test")
@@ -1033,9 +1066,9 @@
(let [assignments (.assignments state nil)]
(log-message "Assignemts: " assignments)
(let [id->node->ports (into {} (for [id assignments
- :let [executor->node+port (:executor->node+port (clojurify-assignment (.assignmentInfo state id nil)))
+ :let [executor->node+port (.get_executor_node_port (.assignmentInfo state id nil))
node+ports (set (.values executor->node+port))
- node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [[node port] node+ports] {node [port]}))]]
+ node->ports (apply merge-with (fn [a b] (distinct (concat a b))) (for [np node+ports] {(.get_node np) [(first (.get_port np))]}))]]
{id node->ports}))
_ (log-message "id->node->ports: " id->node->ports)
all-nodes (apply merge-with (fn [a b]
@@ -1047,12 +1080,15 @@
)))
(deftest test-rebalance-constrained-cluster
- (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 4
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withSupervisors 1)
+ (.withPortsPerSupervisor 4)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
NIMBUS-MONITOR-FREQ-SECS 10
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
TOPOLOGY-ACKER-EXECUTORS 0
- TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
+ TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
@@ -1066,43 +1102,44 @@
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
- (bind state (:storm-cluster-state cluster))
- (submit-local-topology (:nimbus cluster)
+ (bind state (.getClusterState cluster))
+ (.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology)
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"test2"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology2)
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"test3"
{TOPOLOGY-WORKERS 3
TOPOLOGY-MESSAGE-TIMEOUT-SECS 90} topology3)
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(check-for-collisions state)
- (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.)
+ (.rebalance (.getNimbus cluster) "test" (doto (RebalanceOptions.)
(.set_num_workers 4)
(.set_wait_secs 0)
))
- (advance-cluster-time cluster 11)
+ (.advanceClusterTime cluster 11)
(check-for-collisions state)
- (advance-cluster-time cluster 30)
+ (.advanceClusterTime cluster 30)
(check-for-collisions state)
)))
(deftest test-submit-invalid
- (with-simulated-time-local-cluster [cluster
- :daemon-conf {SUPERVISOR-ENABLE false
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withSimulatedTime)
+ (.withDaemonConf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
TOPOLOGY-EVENTLOGGER-EXECUTORS 0
NIMBUS-EXECUTORS-PER-TOPOLOGY 8
- NIMBUS-SLOTS-PER-TOPOLOGY 8}]
+ NIMBUS-SLOTS-PER-TOPOLOGY 8})))]
(letlocals
(bind topology (Thrift/buildTopology
{"1" (Thrift/prepareSpoutDetails
@@ -1110,7 +1147,7 @@
{TOPOLOGY-TASKS 1})}
{}))
(is (thrown? InvalidTopologyException
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"test/aaa"
{}
topology)))
@@ -1119,9 +1156,9 @@
(TestPlannerSpout. true) (Integer. 16)
{TOPOLOGY-TASKS 16})}
{}))
- (bind state (:storm-cluster-state cluster))
+ (bind state (.getClusterState cluster))
(is (thrown? InvalidTopologyException
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 3}
topology)))
@@ -1131,67 +1168,67 @@
{TOPOLOGY-TASKS 5})}
{}))
(is (thrown? InvalidTopologyException
- (submit-local-topology (:nimbus cluster)
+ (.submitTopology cluster
"test"
{TOPOLOGY-WORKERS 16}
- topology)))
- (is (nil? (submit-local-topology (:nimbus cluster)
- "test"
- {TOPOLOGY-WORKERS 8}
- topology))))))
+ topology))))))
(deftest test-clean-inbox
"Tests that the inbox correctly cleans jar files."
- (with-simulated-time
- (with-local-tmp [dir-location]
- (let [dir (File. dir-location)
- mk-file (fn [name seconds-ago]
- (let [f (File. (str dir-location "/" name))
- t (- (Time/currentTimeMillis) (* seconds-ago 1000))]
- (FileUtils/touch f)
- (.setLastModified f t)))
- assert-files-in-dir (fn [compare-file-names]
- (let [file-names (map #(.getName %) (file-seq dir))]
- (is (= (sort compare-file-names)
- (sort (filter #(.endsWith % ".jar") file-names))
- ))))]
- ;; Make three files a.jar, b.jar, c.jar.
- ;; a and b are older than c and should be deleted first.
- (advance-time-secs! 100)
- (doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
- (apply mk-file fs))
- (assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
- (Nimbus/cleanInbox dir-location 10)
- (assert-files-in-dir ["c.jar"])
- ;; Cleanit again, c.jar should stay
- (advance-time-secs! 5)
- (Nimbus/cleanInbox dir-location 10)
- (assert-files-in-dir ["c.jar"])
- ;; Advance time, clean again, c.jar should be deleted.
- (advance-time-secs! 5)
- (Nimbus/cleanInbox dir-location 10)
- (assert-files-in-dir [])
- ))))
+ (with-open [_ (Time$SimulatedTime.)
+ tmp-path (TmpPath. )]
+ (let [dir-location (.getPath tmp-path)
+ dir (File. dir-location)
+ mk-file (fn [name seconds-ago]
+ (let [f (File. (str dir-location "/" name))
+ t (- (Time/currentTimeMillis) (* seconds-ago 1000))]
+ (FileUtils/touch f)
+ (.setLastModified f t)))
+ assert-files-in-dir (fn [compare-file-names]
+ (let [file-names (map #(.getName %) (file-seq dir))]
+ (is (= (sort compare-file-names)
+ (sort (filter #(.endsWith % ".jar") file-names))
+ ))))]
+ ;; Make three files a.jar, b.jar, c.jar.
+ ;; a and b are older than c and should be deleted first.
+ (Time/advanceTimeSecs 100)
+ (doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
+ (apply mk-file fs))
+ (assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
+ (Nimbus/cleanInbox dir-location 10)
+ (assert-files-in-dir ["c.jar"])
+ ;; Clean it again, c.jar should stay
+ (Time/advanceTimeSecs 5)
+ (Nimbus/cleanInbox dir-location 10)
+ (assert-files-in-dir ["c.jar"])
+ ;; Advance time, clean again, c.jar should be deleted.
+ (Time/advanceTimeSecs 5)
+ (Nimbus/cleanInbox dir-location 10)
+ (assert-files-in-dir [])
+ )))
(defn wait-for-status [nimbus name status]
- (while-timeout 5000
- (let [topo-summary (first (filter (fn [topo] (= name (.get_name topo))) (.get_topologies (.getClusterInfo nimbus))))
- topo-status (if topo-summary (.get_status topo-summary) "NOT-RUNNING")]
- (log-message "WAITING FOR "name" TO BE " status " CURRENT " topo-status)
- (not= topo-status status))
- (Thread/sleep 100)))
+ (Testing/whileTimeout 5000
+ (reify Testing$Condition
+ (exec [this]
+ (let [topo-summary (first (filter (fn [topo] (= name (.get_name topo))) (.get_topologies (.getClusterInfo nimbus))))
+ topo-status (if topo-summary (.get_status topo-summary) "NOT-RUNNING")]
+ (log-message "WAITING FOR "name" TO BE " status " CURRENT " topo-status)
+ (not= topo-status status))))
+ (fn [] (Thread/sleep 100))))
(deftest test-leadership
"Tests that leader actions can only be performed by master and non leader fails to perform the same actions."
- (with-inprocess-zookeeper zk-port
- (with-local-tmp [nimbus-dir]
- (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
+ (with-open [zk (InProcessZookeeper. )]
+ (with-open [tmp-nimbus-dir (TmpPath.)
+ _ (MockedZookeeper. (proxy [Zookeeper] []
+ (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+ (let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
- STORM-ZOOKEEPER-PORT zk-port
+ STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir}))
(bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
@@ -1202,7 +1239,7 @@
{}))
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector :is-leader false))))]
+ (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. false))))]
(letlocals
(bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1210,7 +1247,7 @@
(.launchServer non-leader-nimbus)
;first we verify that the master nimbus can perform all actions, even with another nimbus present.
- (submit-local-topology nimbus "t1" {} topology)
+ (.submitTopology nimbus "t1" nil "{}" topology)
;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
(.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
(wait-for-status nimbus "t1" "ACTIVE")
@@ -1221,9 +1258,10 @@
;now we verify that non master nimbus can not perform any of the actions.
(is (thrown? RuntimeException
- (submit-local-topology non-leader-nimbus
+ (.submitTopology non-leader-nimbus
"failing"
- {}
+ nil
+ "{}"
topology)))
(is (thrown? RuntimeException
@@ -1245,15 +1283,14 @@
(.disconnect cluster-state))))))
(deftest test-nimbus-iface-submitTopologyWithOpts-checks-authorization
- (with-local-cluster [cluster
- :daemon-conf {NIMBUS-AUTHORIZER
- "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}]
+ (with-open [cluster (.build (doto (LocalCluster$Builder. )
+ (.withDaemonConf {NIMBUS-AUTHORIZER
+ "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
(let [
- nimbus (:nimbus cluster)
topology (Thrift/buildTopology {} {})
]
(is (thrown? AuthorizationException
- (submit-local-topology-with-opts nimbus "mystorm" {} topology
+ (.submitTopologyWithOpts cluster "mystorm" {} topology
(SubmitOptions. TopologyInitialStatus/INACTIVE))
))
)
@@ -1263,9 +1300,12 @@
(deftest test-nimbus-iface-methods-check-authorization
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
- :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}]
- (let [nimbus (:nimbus cluster)
+ (with-open [cluster (.build
+ (doto (LocalCluster$Builder. )
+ (.withClusterState cluster-state)
+ (.withBlobStore blob-store)
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
+ (let [nimbus (.getNimbus cluster)
topology-name "test"
topology-id "test-id"]
(.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
@@ -1278,13 +1318,14 @@
(deftest test-nimbus-check-authorization-params
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)
- mk-nimbus (fn
- [conf inimbus blob-store leader-elector group-mapper cluster-state]
- (Mockito/spy (mk-nimbus conf inimbus blob-store leader-elector group-mapper cluster-state)))]
- (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store :mk-nimbus mk-nimbus
- :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
- (let [nimbus (:nimbus cluster)
+ blob-store (Mockito/mock BlobStore)]
+ (with-open [cluster (.build
+ (doto (LocalCluster$Builder. )
+ (.withClusterState cluster-state)
+ (.withBlobStore blob-store)
+ (.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (let [nimbus (.getNimbus cluster)
topology-name "test-nimbus-check-autho-params"
topology-id "fake-id"
topology (Thrift/buildTopology {} {})
@@ -1334,13 +1375,14 @@
(deftest test-check-authorization-getSupervisorPageInfo
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)
- mk-nimbus (fn
- [conf inimbus blob-store leader-elector group-mapper cluster-state]
- (Mockito/spy (mk-nimbus conf inimbus blob-store leader-elector group-mapper cluster-state)))]
- (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store :mk-nimbus mk-nimbus
- :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
- (let [nimbus (:nimbus cluster)
+ blob-store (Mockito/mock BlobStore)]
+ (with-open [cluster (.build
+ (doto (LocalCluster$Builder. )
+ (.withClusterState cluster-state)
+ (.withBlobStore blob-store)
+ (.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (let [nimbus (.getNimbus cluster)
expected-name "test-nimbus-check-autho-params"
expected-conf {TOPOLOGY-NAME expected-name
TOPOLOGY-WORKERS 1
@@ -1354,7 +1396,6 @@
(.set_spouts {})
(.set_bolts {})
(.set_state_spouts {}))
- clojurified-assignment (clojurify-assignment assignment)
topo-assignment {expected-name assignment}
check-auth-state (atom [])
mock-check-authorization (fn [nimbus storm-name storm-conf operation]
@@ -1371,17 +1412,16 @@
(.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) expected-conf)
(.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
(.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment)
- (stubbing [clojurify-assignment clojurified-assignment]
- (.getSupervisorPageInfo nimbus "super1" nil true)
+ (.getSupervisorPageInfo nimbus "super1" nil true)
- ;; afterwards, it should get called twice
- (.checkAuthorization (Mockito/verify (:nimbus cluster)) expected-name expected-conf "getSupervisorPageInfo")
- (.checkAuthorization (Mockito/verify (:nimbus cluster)) expected-name expected-conf "getTopology"))))))
+ ;; afterwards, it should get called twice
+ (.checkAuthorization (Mockito/verify nimbus) expected-name expected-conf "getSupervisorPageInfo")
+ (.checkAuthorization (Mockito/verify nimbus) expected-name expected-conf "getTopology")))))
(deftest test-nimbus-iface-getTopology-methods-throw-correctly
- (with-local-cluster [cluster]
+ (with-open [cluster (LocalCluster. )]
(let [
- nimbus (:nimbus cluster)
+ nimbus (.getNimbus cluster)
id "bogus ID"
]
(is (thrown? NotAliveException (.getTopology nimbus id)))
@@ -1416,22 +1456,27 @@
)
)
+(defn mkStormBase [launch-time-secs storm-name status]
+ (doto (StormBase.)
+ (.set_name storm-name)
+ (.set_launch_time_secs (int launch-time-secs))
+ (.set_status status)))
+
(deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store]
- (let [nimbus (:nimbus cluster)
+ (with-open [cluster (.build
+ (doto (LocalCluster$Builder. )
+ (.withClusterState cluster-state)
+ (.withBlobStore blob-store)))]
+ (let [nimbus (.getNimbus cluster)
bogus-secs 42
bogus-type TopologyStatus/ACTIVE
bogus-bases {
"1" nil
- "2" (thriftify-storm-base {:launch-time-secs bogus-secs
- :storm-name "id2-name"
- :status {:type bogus-type}})
+ "2" (mkStormBase bogus-secs "id2-name" bogus-type)
"3" nil
- "4" (thriftify-storm-base {:launch-time-secs bogus-secs
- :storm-name "id4-name"
- :status {:type bogus-type}})
+ "4" (mkStormBase bogus-secs "id4-name" bogus-type)
}
topo-name "test-topo"
topo-conf {TOPOLOGY-NAME topo-name
@@ -1499,8 +1544,10 @@
))))
(deftest test-file-bogus-download
- (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
- (let [nimbus (:nimbus cluster)]
+ (with-open [cluster (.build
+ (doto (LocalCluster$Builder. )
+ (.withDaemonConf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0})))]
+ (let [nimbus (.getNimbus cluster)]
(is (thrown-cause? AuthorizationException (.beginFileDownload nimbus nil)))
(is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "")))
(is (thrown-cause? AuthorizationException (.beginFileDownload nimbus "/bogus-path/foo")))
@@ -1509,25 +1556,28 @@
(deftest test-validate-topo-config-on-submit
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
- (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
- :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
- (let [nimbus (:nimbus cluster)
- topology (Thrift/buildTopology {} {})
+ (with-open [cluster (.build
+ (doto (LocalCluster$Builder. )
+ (.withClusterState cluster-state)
+ (.withBlobStore blob-store)
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
+ (let [topology (Thrift/buildTopology {} {})
bad-config {"topology.isolate.machines" "2"}]
(is (thrown-cause? InvalidTopologyException
- (submit-local-topology-with-opts nimbus "test" bad-config topology
+ (.submitTopologyWithOpts cluster "test" bad-config topology
(SubmitOptions.))))))))
(deftest test-stateless-with-scheduled-topology-to-be-killed
; tests regression of STORM-856
- (with-inprocess-zookeeper zk-port
- (with-local-tmp [nimbus-dir]
+ (with-open [zk (InProcessZookeeper. )]
+ (with-open [tmp-nimbus-dir (TmpPath. )]
+ (let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
- STORM-ZOOKEEPER-PORT zk-port
+ STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir}))
(bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
@@ -1537,7 +1587,7 @@
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
- (submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
+ (.submitTopology nimbus "t1" nil (str "{\"" TOPOLOGY-MESSAGE-TIMEOUT-SECS "\": 30}") topology)
; make transition for topology t1 to be killed -> nimbus applies this event to cluster state
(.killTopology nimbus "t1")
; shutdown nimbus immediately to achieve nimbus doesn't handle event right now
@@ -1550,18 +1600,19 @@
(.launchServer nimbus)
(.shutdown nimbus)
(.disconnect cluster-state)
- ))))
+ )))))
(deftest test-topology-action-notifier
- (with-inprocess-zookeeper zk-port
- (with-local-tmp [nimbus-dir]
- (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
+ (with-open [zk (InProcessZookeeper. )]
+ (with-open [tmp-nimbus-dir (TmpPath.)
+ _ (MockedZookeeper. (proxy [Zookeeper] []
+ (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+ (let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-CLUSTER-MODE "local"
- STORM-ZOOKEEPER-PORT zk-port
+ STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir
NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
(bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1573,7 +1624,7 @@
{"1" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{}))
- (submit-local-topology nimbus "test-notification" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
+ (.submitTopology nimbus "test-notification" nil (str "{\"" TOPOLOGY-MESSAGE-TIMEOUT-SECS "\": 30}") topology)
(.deactivate nimbus "test-notification")
@@ -1594,23 +1645,23 @@
)))))
(deftest test-debug-on-component
- (with-local-cluster [cluster]
- (let [nimbus (:nimbus cluster)
+ (with-open [cluster (LocalCluster. )]
+ (let [nimbus (.getNimbus cluster)
topology (Thrift/buildTopology
{"spout" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{})]
- (submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology)
+ (.submitTopology cluster "t1" {TOPOLOGY-WORKERS 1} topology)
(.debug nimbus "t1" "spout" true 100))))
(deftest test-debug-on-global
- (with-local-cluster [cluster]
- (let [nimbus (:nimbus cluster)
+ (with-open [cluster (LocalCluster. )]
+ (let [nimbus (.getNimbus cluster)
topology (Thrift/buildTopology
{"spout" (Thrift/prepareSpoutDetails
(TestPlannerSpout. true) (Integer. 3))}
{})]
- (submit-local-topology nimbus "t1" {TOPOLOGY-WORKERS 1} topology)
+ (.submitTopology cluster "t1" {TOPOLOGY-WORKERS 1} topology)
(.debug nimbus "t1" "" true 100))))
;; if the user sends an empty log config, nimbus will say that all
@@ -1618,9 +1669,12 @@
(deftest empty-save-config-results-in-all-unchanged-actions
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
- :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
- (let [nimbus (:nimbus cluster)
+ (with-open [cluster (.build
+ (doto (LocalCluster$Builder. )
+ (.withClusterState cluster-state)
+ (.withBlobStore blob-store)
+ (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
+ (let [nimbus (.getNimbus cluster)
previous-config (LogConfig.)
mock-config (LogConfig.)
expected-config (LogConfig.)]
@@ -1644,9 +1698,12 @@
(deftest log-level-update-merges-and-flags-existent-log-level
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)]
- (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
-
<TRUNCATED>