You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 17:17:21 UTC
[11/27] storm git commit: Merge branch 'master' into ClusterUtils
http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index b89b7bb,9c31ddf..3ebdbcd
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -21,20 -21,20 +21,23 @@@
(:require [clojure [string :as string] [set :as set]])
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout])
(:import [org.apache.storm.scheduler ISupervisor])
- (:import [org.apache.storm.utils ConfigUtils])
+ (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
(:import [org.apache.storm.generated RebalanceOptions])
- (:import [org.apache.storm.testing.staticmocking MockedConfigUtils MockedCluster])
- (:import [org.mockito Matchers Mockito])
++ (:import [org.apache.storm.testing.staticmocking MockedCluster])
(:import [java.util UUID])
- (:import [org.mockito Mockito])
++ (:import [org.mockito Mockito Matchers])
+ (:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [java.io File])
(:import [java.nio.file Files])
- (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
- (:import [org.apache.storm.utils Utils IPredicate]
++ (:import [org.apache.storm.utils Utils IPredicate])
++ (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
+ [org.apache.storm.utils.staticmocking ConfigUtilsInstaller
+ UtilsInstaller])
(:import [java.nio.file.attribute FileAttribute])
- (:use [org.apache.storm config testing util timer log])
+ (:use [org.apache.storm config testing util timer log converter])
(:use [org.apache.storm.daemon common])
(:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
- [org.apache.storm [thrift :as thrift] [cluster :as cluster]])
+ [org.apache.storm [thrift :as thrift]])
(:use [conjure core])
(:require [clojure.java.io :as io]))
@@@ -43,9 -43,10 +46,10 @@@
[cluster supervisor-id port]
(let [state (:storm-cluster-state cluster)
slot-assigns (for [storm-id (.assignments state nil)]
- (let [executors (-> (.assignment-info state storm-id nil)
+ (let [executors (-> (clojurify-assignment (.assignmentInfo state storm-id nil))
:executor->node+port
- reverse-map
+ (Utils/reverseMap)
+ clojurify-structure
(get [supervisor-id port] ))]
(when executors [storm-id executors])
))
@@@ -565,198 -632,203 +635,201 @@@
fake-isupervisor (reify ISupervisor
(getSupervisorId [this] nil)
(getAssignmentId [this] nil))
+ fake-cu (proxy [ConfigUtils] []
+ (supervisorStateImpl [conf] nil)
+ (supervisorLocalDirImpl [conf] nil))
+ fake-utils (proxy [Utils] []
+ (localHostnameImpl [] nil)
+ (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
- (upTime [] 0))))]
++ (upTime [] 0))))
+ cluster-utils (Mockito/mock ClusterUtils)]
- (with-open [_ (proxy [MockedConfigUtils] []
- (supervisorStateImpl [conf] nil)
- (supervisorLocalDirImpl [conf] nil))
+ (with-open [_ (ConfigUtilsInstaller. fake-cu)
- _ (UtilsInstaller. fake-utils)]
- (stubbing [cluster/mk-storm-cluster-state nil
- mk-timer nil]
++ _ (UtilsInstaller. fake-utils)
+ mocked-cluster (MockedCluster. cluster-utils)]
- (stubbing [uptime-computer nil
- ; cluster/mk-storm-cluster-state nil
- local-hostname nil
- mk-timer nil]
(supervisor/supervisor-data auth-conf nil fake-isupervisor)
- (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
- ; (verify-call-times-for cluster/mk-storm-cluster-state 1)
- ; (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
- ; expected-acls)
- )))))
- (verify-call-times-for cluster/mk-storm-cluster-state 1)
- (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
- expected-acls)))))
++ (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))))))
- (deftest test-write-log-metadata
- (testing "supervisor writes correct data to logs metadata file"
- (let [exp-owner "alice"
- exp-worker-id "42"
- exp-storm-id "0123456789"
- exp-port 4242
- exp-logs-users ["bob" "charlie" "daryl"]
- exp-logs-groups ["read-only-group" "special-group"]
- storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
- TOPOLOGY-USERS ["charlie" "bob"]
- TOPOLOGY-GROUPS ["special-group"]
- LOGS-GROUPS ["read-only-group"]
- LOGS-USERS ["daryl"]}
- exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
- "worker-id" exp-worker-id
- LOGS-USERS exp-logs-users
- LOGS-GROUPS exp-logs-groups}
- conf {}]
- (mocking [supervisor/write-log-metadata-to-yaml-file!]
- (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
- exp-storm-id exp-port conf)
- (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
- exp-storm-id exp-port exp-data conf)))))
+ (deftest test-write-log-metadata
+ (testing "supervisor writes correct data to logs metadata file"
+ (let [exp-owner "alice"
+ exp-worker-id "42"
+ exp-storm-id "0123456789"
+ exp-port 4242
+ exp-logs-users ["bob" "charlie" "daryl"]
+ exp-logs-groups ["read-only-group" "special-group"]
+ storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+ TOPOLOGY-USERS ["charlie" "bob"]
+ TOPOLOGY-GROUPS ["special-group"]
+ LOGS-GROUPS ["read-only-group"]
+ LOGS-USERS ["daryl"]}
+ exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
+ "worker-id" exp-worker-id
+ LOGS-USERS exp-logs-users
+ LOGS-GROUPS exp-logs-groups}
+ conf {}]
+ (mocking [supervisor/write-log-metadata-to-yaml-file!]
+ (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
+ exp-storm-id exp-port conf)
+ (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+ exp-storm-id exp-port exp-data conf)))))
- (deftest test-worker-launcher-requires-user
- (testing "worker-launcher throws on blank user"
- (mocking [launch-process]
- (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
- #"(?i).*user cannot be blank.*"
- (supervisor/worker-launcher {} nil ""))))))
+ (deftest test-worker-launcher-requires-user
+ (testing "worker-launcher throws on blank user"
+ (let [utils-proxy (proxy [Utils] []
+ (launchProcessImpl [& _] nil))]
+ (with-open [_ (UtilsInstaller. utils-proxy)]
+ (is (try
+ (supervisor/worker-launcher {} nil "")
+ false
+ (catch Throwable t
+ (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
+ (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
- (defn found? [sub-str input-str]
- (if (string? input-str)
- (contrib-str/substring? sub-str (str input-str))
- (boolean (some #(contrib-str/substring? sub-str %) input-str))))
+ (defn found? [sub-str input-str]
+ (if (string? input-str)
+ (contrib-str/substring? sub-str (str input-str))
+ (boolean (some #(contrib-str/substring? sub-str %) input-str))))
- (defn not-found? [sub-str input-str]
+ (defn not-found? [sub-str input-str]
(complement (found? sub-str input-str)))
- (deftest test-substitute-childopts-happy-path-string
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-happy-path-string
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-happy-path-list
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-happy-path-list
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-happy-path-list-arraylist
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-happy-path-list-arraylist
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-topology-id-alone
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-topology-id-alone
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-no-keys
- (testing "worker-launcher has no ids to replace in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-no-keys
+ (testing "worker-launcher has no ids to replace in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-nil-childopts
- (testing "worker-launcher has nil childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts nil
- expected-childopts nil
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-nil-childopts
+ (testing "worker-launcher has nil childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts nil
+ expected-childopts nil
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-nil-ids
- (testing "worker-launcher has nil ids"
- (let [worker-id nil
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-nil-ids
+ (testing "worker-launcher has nil ids"
+ (let [worker-id nil
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-retry-read-assignments
- (with-simulated-time-local-cluster [cluster
- :supervisors 0
- :ports-per-supervisor 2
- :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
- NIMBUS-MONITOR-FREQ-SECS 10
- TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
- TOPOLOGY-ACKER-EXECUTORS 0}]
- (letlocals
- (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
- (bind topology1 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind topology2 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind state (:storm-cluster-state cluster))
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology1"
- {TOPOLOGY-WORKERS 2}
- topology1
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology2"
- {TOPOLOGY-WORKERS 2}
- topology2
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- ))
- (is (empty? (:launched changed)))
- (bind options (RebalanceOptions.))
- (.set_wait_secs options 0)
- (bind changed (capture-changed-workers
- (.rebalance (:nimbus cluster) "topology2" options)
- (advance-cluster-time cluster 10)
- (heartbeat-workers cluster "sup1" [1 2 3 4])
- (advance-cluster-time cluster 10)
- ))
- (validate-launched-once (:launched changed)
- {"sup1" [1 2]}
- (get-storm-id (:storm-cluster-state cluster) "topology1"))
- (validate-launched-once (:launched changed)
- {"sup1" [3 4]}
- (get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
+ (deftest test-retry-read-assignments
+ (with-simulated-time-local-cluster [cluster
+ :supervisors 0
+ :ports-per-supervisor 2
+ :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0}]
+ (letlocals
+ (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+ (bind topology1 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind topology2 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (bind changed (capture-changed-workers
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology1"
+ {TOPOLOGY-WORKERS 2}
+ topology1
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology2"
+ {TOPOLOGY-WORKERS 2}
+ topology2
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+ (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+ ))
+ (is (empty? (:launched changed)))
+ (bind options (RebalanceOptions.))
+ (.set_wait_secs options 0)
+ (bind changed (capture-changed-workers
+ (.rebalance (:nimbus cluster) "topology2" options)
+ (advance-cluster-time cluster 10)
+ (heartbeat-workers cluster "sup1" [1 2 3 4])
+ (advance-cluster-time cluster 10)
+ ))
+ (validate-launched-once (:launched changed)
+ {"sup1" [1 2]}
+ (get-storm-id (:storm-cluster-state cluster) "topology1"))
+ (validate-launched-once (:launched changed)
+ {"sup1" [3 4]}
+ (get-storm-id (:storm-cluster-state cluster) "topology2"))
- ))))
++ )))