You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:40 UTC
[17/35] storm git commit: add the plugin to use for the worker manager
http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index d3d7344..8f11f8a 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -22,7 +22,8 @@
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout]
[org.apache.storm.daemon.supervisor SupervisorUtils SyncProcessEvent SupervisorData]
[java.util ArrayList Arrays HashMap]
- [org.apache.storm.testing.staticmocking MockedSupervisorUtils])
+ [org.apache.storm.testing.staticmocking MockedSupervisorUtils]
+ [org.apache.storm.daemon.supervisor.workermanager DefaultWorkerManager])
(:import [org.apache.storm.scheduler ISupervisor])
(:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
(:import [org.apache.storm.generated RebalanceOptions WorkerResources])
@@ -367,17 +368,19 @@
(setWorkerUserWSEImpl [conf worker-id user] nil)
(workerRootImpl [conf] "/tmp/workers")
(workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+ worker-manager (proxy [DefaultWorkerManager] []
+ (jlp [stormRoot conf] ""))
process-proxy (proxy [SyncProcessEvent] []
- (jlp [stormRoot conf] "")
(writeLogMetadata [stormconf user workerId stormId port conf] nil)
(createBlobstoreLinks [conf stormId workerId] nil))]
(with-open [_ (ConfigUtilsInstaller. cu-proxy)
_ (UtilsInstaller. utils-spy)]
- (.launchWorker process-proxy mock-supervisor nil
+ (.prepareWorker worker-manager mock-supervisor nil)
+ (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
"" mock-storm-id mock-port
mock-worker-id
- (WorkerResources.) nil nil)
+ (WorkerResources.) nil)
(. (Mockito/verify utils-spy)
(launchProcessImpl (Matchers/eq exp-args)
(Matchers/any)
@@ -405,17 +408,19 @@
(addToClasspathImpl [classpath paths] mock-cp)
(launchProcessImpl [& _] nil))
Mockito/spy)
+ worker-manager (proxy [DefaultWorkerManager] []
+ (jlp [stormRoot conf] ""))
process-proxy (proxy [SyncProcessEvent] []
- (jlp [stormRoot conf] "")
(writeLogMetadata [stormconf user workerId stormId port conf] nil)
(createBlobstoreLinks [conf stormId workerId] nil))]
(with-open [_ (ConfigUtilsInstaller. cu-proxy)
_ (UtilsInstaller. utils-spy)]
- (.launchWorker process-proxy mock-supervisor nil
+ (.prepareWorker worker-manager mock-supervisor nil)
+ (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
"" mock-storm-id
mock-port
mock-worker-id
- (WorkerResources.) nil nil)
+ (WorkerResources.) nil)
(. (Mockito/verify utils-spy)
(launchProcessImpl (Matchers/eq exp-args)
(Matchers/any)
@@ -441,17 +446,19 @@
(str Utils/FILE_PATH_SEPARATOR "base"))
(launchProcessImpl [& _] nil))
Mockito/spy)
+ worker-manager (proxy [DefaultWorkerManager] []
+ (jlp [stormRoot conf] ""))
process-proxy (proxy [SyncProcessEvent] []
- (jlp [stormRoot conf] "")
(writeLogMetadata [stormconf user workerId stormId port conf] nil)
(createBlobstoreLinks [conf stormId workerId] nil))]
(with-open [_ (ConfigUtilsInstaller. cu-proxy)
_ (UtilsInstaller. utils-spy)]
- (.launchWorker process-proxy mock-supervisor nil
+ (.prepareWorker worker-manager mock-supervisor nil)
+ (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
"" mock-storm-id
mock-port
mock-worker-id
- (WorkerResources.) nil nil)
+ (WorkerResources.) nil)
(. (Mockito/verify utils-spy)
(launchProcessImpl (Matchers/eq exp-args)
(Matchers/any)
@@ -477,17 +484,19 @@
(str Utils/FILE_PATH_SEPARATOR "base"))
(launchProcessImpl [& _] nil))
Mockito/spy)
+ worker-manager (proxy [DefaultWorkerManager] []
+ (jlp [stormRoot conf] nil))
process-proxy (proxy [SyncProcessEvent] []
- (jlp [stormRoot conf] nil)
(writeLogMetadata [stormconf user workerId stormId port conf] nil)
(createBlobstoreLinks [conf stormId workerId] nil))]
(with-open [_ (ConfigUtilsInstaller. cu-proxy)
_ (UtilsInstaller. utils-spy)]
- (.launchWorker process-proxy mock-supervisor nil
+ (.prepareWorker worker-manager mock-supervisor nil)
+ (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
"" mock-storm-id
mock-port
mock-worker-id
- (WorkerResources.) nil nil)
+ (WorkerResources.) nil)
(. (Mockito/verify utils-spy)
(launchProcessImpl (Matchers/any)
(Matchers/eq full-env)
@@ -575,18 +584,20 @@
(launchProcessImpl [& _] nil))
Mockito/spy)
supervisor-utils (Mockito/mock SupervisorUtils)
+ worker-manager (proxy [DefaultWorkerManager] []
+ (jlp [stormRoot conf] ""))
process-proxy (proxy [SyncProcessEvent] []
- (jlp [stormRoot conf] "")
(writeLogMetadata [stormconf user workerId stormId port conf] nil))]
(with-open [_ (ConfigUtilsInstaller. cu-proxy)
_ (UtilsInstaller. utils-spy)
_ (MockedSupervisorUtils. supervisor-utils)]
(. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
- (.launchWorker process-proxy mock-supervisor nil
+ (.prepareWorker worker-manager mock-supervisor nil)
+ (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
"" mock-storm-id
mock-port
mock-worker-id
- (WorkerResources.) nil nil)
+ (WorkerResources.) nil)
(. (Mockito/verify utils-spy)
(launchProcessImpl (Matchers/eq exp-launch)
(Matchers/any)
@@ -621,18 +632,20 @@
(launchProcessImpl [& _] nil))
Mockito/spy)
supervisor-utils (Mockito/mock SupervisorUtils)
+ worker-manager (proxy [DefaultWorkerManager] []
+ (jlp [stormRoot conf] ""))
process-proxy (proxy [SyncProcessEvent] []
- (jlp [stormRoot conf] "")
(writeLogMetadata [stormconf user workerId stormId port conf] nil))]
(with-open [_ (ConfigUtilsInstaller. cu-proxy)
_ (UtilsInstaller. utils-spy)
_ (MockedSupervisorUtils. supervisor-utils)]
(. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
- (.launchWorker process-proxy mock-supervisor nil
+ (.prepareWorker worker-manager mock-supervisor nil)
+ (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
"" mock-storm-id
mock-port
mock-worker-id
- (WorkerResources.) nil nil)
+ (WorkerResources.) nil)
(. (Mockito/verify utils-spy)
(launchProcessImpl (Matchers/eq exp-launch)
(Matchers/any)
@@ -664,7 +677,8 @@
(let [scheme "digest"
digest "storm:thisisapoorpassword"
auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
- STORM-ZOOKEEPER-AUTH-PAYLOAD digest}
+ STORM-ZOOKEEPER-AUTH-PAYLOAD digest
+ STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"}
expected-acls (SupervisorUtils/supervisorZkAcls)
fake-isupervisor (reify ISupervisor
(getSupervisorId [this] nil)
@@ -714,7 +728,7 @@
(launchProcessImpl [& _] nil))]
(with-open [_ (UtilsInstaller. utils-proxy)]
(is (try
- (SupervisorUtils/workerLauncher {} nil (ArrayList.) {} nil nil nil)
+ (SupervisorUtils/processLauncher {} nil (ArrayList.) {} nil nil nil)
false
(catch Throwable t
(and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
@@ -736,8 +750,8 @@
mem-onheap (int 512)
childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- process-event (SyncProcessEvent.)
- childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+ worker-manager (DefaultWorkerManager.)
+ childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
(is (= expected-childopts childopts-with-ids)))))
(deftest test-substitute-childopts-happy-path-list
@@ -748,8 +762,8 @@
mem-onheap (int 512)
childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- process-event (SyncProcessEvent.)
- childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+ worker-manager (DefaultWorkerManager.)
+ childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
(is (= expected-childopts childopts-with-ids)))))
(deftest test-substitute-childopts-happy-path-list-arraylist
@@ -760,8 +774,8 @@
mem-onheap (int 512)
childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- process-event (SyncProcessEvent.)
- childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+ worker-manager (DefaultWorkerManager.)
+ childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
(is (= expected-childopts childopts-with-ids)))))
(deftest test-substitute-childopts-topology-id-alone
@@ -772,8 +786,8 @@
mem-onheap (int 512)
childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
- process-event (SyncProcessEvent.)
- childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+ worker-manager (DefaultWorkerManager.)
+ childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
(is (= expected-childopts childopts-with-ids)))))
(deftest test-substitute-childopts-no-keys
@@ -784,8 +798,8 @@
mem-onheap (int 512)
childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
- process-event (SyncProcessEvent.)
- childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+ worker-manager (DefaultWorkerManager.)
+ childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
(is (= expected-childopts childopts-with-ids)))))
(deftest test-substitute-childopts-nil-childopts
@@ -796,8 +810,8 @@
mem-onheap (int 512)
childopts nil
expected-childopts '[]
- process-event (SyncProcessEvent.)
- childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+ worker-manager (DefaultWorkerManager.)
+ childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
(is (= expected-childopts childopts-with-ids)))))
(deftest test-substitute-childopts-nil-ids
@@ -808,8 +822,8 @@
mem-onheap (int 512)
childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
- process-event (SyncProcessEvent.)
- childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+ worker-manager (DefaultWorkerManager.)
+ childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
(is (= expected-childopts childopts-with-ids)))))
(deftest test-retry-read-assignments