You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:40 UTC

[17/35] storm git commit: add the plugin to use for manager worker

http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index d3d7344..8f11f8a 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -22,7 +22,8 @@
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout]
            [org.apache.storm.daemon.supervisor SupervisorUtils SyncProcessEvent SupervisorData]
            [java.util ArrayList Arrays HashMap]
-           [org.apache.storm.testing.staticmocking MockedSupervisorUtils])
+           [org.apache.storm.testing.staticmocking MockedSupervisorUtils]
+           [org.apache.storm.daemon.supervisor.workermanager DefaultWorkerManager])
   (:import [org.apache.storm.scheduler ISupervisor])
   (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
   (:import [org.apache.storm.generated RebalanceOptions WorkerResources])
@@ -367,17 +368,19 @@
                           (setWorkerUserWSEImpl [conf worker-id user] nil)
                          (workerRootImpl [conf] "/tmp/workers")
                           (workerArtifactsRootImpl [conf] "/tmp/workers-artifacts"))
+              worker-manager (proxy [DefaultWorkerManager] []
+                               (jlp [stormRoot conf] ""))
               process-proxy (proxy [SyncProcessEvent] []
-                              (jlp [stormRoot conf] "")
                               (writeLogMetadata [stormconf user workerId stormId port conf] nil)
                               (createBlobstoreLinks [conf stormId workerId] nil))]
 
           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                       _ (UtilsInstaller. utils-spy)]
-                (.launchWorker process-proxy mock-supervisor nil
+                (.prepareWorker worker-manager mock-supervisor nil)
+                (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
                                       "" mock-storm-id mock-port
                                       mock-worker-id
-                                      (WorkerResources.) nil nil)
+                                      (WorkerResources.) nil)
                 (. (Mockito/verify utils-spy)
                    (launchProcessImpl (Matchers/eq exp-args)
                                       (Matchers/any)
@@ -405,17 +408,19 @@
                             (addToClasspathImpl [classpath paths] mock-cp)
                             (launchProcessImpl [& _] nil))
                           Mockito/spy)
+              worker-manager (proxy [DefaultWorkerManager] []
+                               (jlp [stormRoot conf] ""))
               process-proxy (proxy [SyncProcessEvent] []
-                              (jlp [stormRoot conf] "")
                               (writeLogMetadata [stormconf user workerId stormId port conf] nil)
                               (createBlobstoreLinks [conf stormId workerId] nil))]
             (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                         _ (UtilsInstaller. utils-spy)]
-                  (.launchWorker process-proxy mock-supervisor nil
+                  (.prepareWorker worker-manager mock-supervisor nil)
+                  (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
                                             "" mock-storm-id
                                             mock-port
                                             mock-worker-id
-                                            (WorkerResources.) nil nil)
+                                            (WorkerResources.) nil)
                   (. (Mockito/verify utils-spy)
                      (launchProcessImpl (Matchers/eq exp-args)
                                         (Matchers/any)
@@ -441,17 +446,19 @@
                               (str Utils/FILE_PATH_SEPARATOR "base"))
                             (launchProcessImpl [& _] nil))
                           Mockito/spy)
+              worker-manager (proxy [DefaultWorkerManager] []
+                               (jlp [stormRoot conf] ""))
               process-proxy (proxy [SyncProcessEvent] []
-                              (jlp [stormRoot conf] "")
                               (writeLogMetadata [stormconf user workerId stormId port conf] nil)
                               (createBlobstoreLinks [conf stormId workerId] nil))]
           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                       _ (UtilsInstaller. utils-spy)]
-                  (.launchWorker process-proxy mock-supervisor nil
+                  (.prepareWorker worker-manager mock-supervisor nil)
+                  (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
                                                "" mock-storm-id
                                               mock-port
                                               mock-worker-id
-                                              (WorkerResources.) nil nil)
+                                              (WorkerResources.) nil)
                   (. (Mockito/verify utils-spy)
                      (launchProcessImpl (Matchers/eq exp-args)
                                         (Matchers/any)
@@ -477,17 +484,19 @@
                               (str Utils/FILE_PATH_SEPARATOR "base"))
                             (launchProcessImpl [& _] nil))
                           Mockito/spy)
+              worker-manager (proxy [DefaultWorkerManager] []
+                               (jlp [stormRoot conf] nil))
               process-proxy (proxy [SyncProcessEvent] []
-                              (jlp [stormRoot conf] nil)
                               (writeLogMetadata [stormconf user workerId stormId port conf] nil)
                               (createBlobstoreLinks [conf stormId workerId] nil))]
           (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                       _ (UtilsInstaller. utils-spy)]
-            (.launchWorker process-proxy mock-supervisor nil
+            (.prepareWorker worker-manager mock-supervisor nil)
+            (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
                                         "" mock-storm-id
                                         mock-port
                                         mock-worker-id
-                                        (WorkerResources.) nil nil)
+                                        (WorkerResources.) nil)
               (. (Mockito/verify utils-spy)
                  (launchProcessImpl (Matchers/any)
                                     (Matchers/eq full-env)
@@ -575,18 +584,20 @@
                               (launchProcessImpl [& _] nil))
                             Mockito/spy)
                 supervisor-utils (Mockito/mock SupervisorUtils)
+                worker-manager (proxy [DefaultWorkerManager] []
+                                 (jlp [stormRoot conf] ""))
                 process-proxy (proxy [SyncProcessEvent] []
-                                (jlp [stormRoot conf] "")
                                 (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
             (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                         _ (UtilsInstaller. utils-spy)
                         _ (MockedSupervisorUtils. supervisor-utils)]
               (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
-              (.launchWorker process-proxy mock-supervisor nil
+              (.prepareWorker worker-manager mock-supervisor nil)
+              (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
                                           "" mock-storm-id
                                           mock-port
                                           mock-worker-id
-                                          (WorkerResources.) nil nil)
+                                          (WorkerResources.) nil)
                 (. (Mockito/verify utils-spy)
                    (launchProcessImpl (Matchers/eq exp-launch)
                                       (Matchers/any)
@@ -621,18 +632,20 @@
                               (launchProcessImpl [& _] nil))
                             Mockito/spy)
                 supervisor-utils (Mockito/mock SupervisorUtils)
+                worker-manager (proxy [DefaultWorkerManager] []
+                                 (jlp [stormRoot conf] ""))
                 process-proxy (proxy [SyncProcessEvent] []
-                                (jlp [stormRoot conf] "")
                                 (writeLogMetadata [stormconf user workerId stormId port conf] nil))]
             (with-open [_ (ConfigUtilsInstaller. cu-proxy)
                         _ (UtilsInstaller. utils-spy)
                         _ (MockedSupervisorUtils. supervisor-utils)]
               (. (Mockito/when (.javaCmdImpl supervisor-utils (Mockito/any))) (thenReturn (str "java")))
-              (.launchWorker process-proxy mock-supervisor nil
+              (.prepareWorker worker-manager mock-supervisor nil)
+              (.launchDistributedWorker process-proxy worker-manager mock-supervisor nil
                                           "" mock-storm-id
                                           mock-port
                                           mock-worker-id
-                                          (WorkerResources.) nil nil)
+                                          (WorkerResources.) nil)
                 (. (Mockito/verify utils-spy)
                  (launchProcessImpl (Matchers/eq exp-launch)
                                     (Matchers/any)
@@ -664,7 +677,8 @@
     (let [scheme "digest"
           digest "storm:thisisapoorpassword"
           auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
-                     STORM-ZOOKEEPER-AUTH-PAYLOAD digest}
+                     STORM-ZOOKEEPER-AUTH-PAYLOAD digest
+                     STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"}
           expected-acls (SupervisorUtils/supervisorZkAcls)
           fake-isupervisor (reify ISupervisor
                              (getSupervisorId [this] nil)
@@ -714,7 +728,7 @@
                           (launchProcessImpl [& _] nil))]
         (with-open [_ (UtilsInstaller. utils-proxy)]
           (is (try
-                (SupervisorUtils/workerLauncher {} nil (ArrayList.) {} nil nil nil)
+                (SupervisorUtils/processLauncher {} nil (ArrayList.) {} nil nil nil)
                 false
                 (catch Throwable t
                   (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
@@ -736,8 +750,8 @@
             mem-onheap (int 512)
             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-            process-event (SyncProcessEvent.)
-            childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+            worker-manager (DefaultWorkerManager.)
+            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
         (is (= expected-childopts childopts-with-ids)))))
 
   (deftest test-substitute-childopts-happy-path-list
@@ -748,8 +762,8 @@
             mem-onheap (int 512)
             childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-            process-event (SyncProcessEvent.)
-            childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+            worker-manager (DefaultWorkerManager.)
+            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
         (is (= expected-childopts childopts-with-ids)))))
 
   (deftest test-substitute-childopts-happy-path-list-arraylist
@@ -760,8 +774,8 @@
             mem-onheap (int 512)
             childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-            process-event (SyncProcessEvent.)
-            childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+            worker-manager (DefaultWorkerManager.)
+            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
         (is (= expected-childopts childopts-with-ids)))))
 
   (deftest test-substitute-childopts-topology-id-alone
@@ -772,8 +786,8 @@
             mem-onheap (int 512)
             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
-            process-event (SyncProcessEvent.)
-            childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+            worker-manager (DefaultWorkerManager.)
+            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
         (is (= expected-childopts childopts-with-ids)))))
 
   (deftest test-substitute-childopts-no-keys
@@ -784,8 +798,8 @@
             mem-onheap (int 512)
             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
-            process-event (SyncProcessEvent.)
-            childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+            worker-manager (DefaultWorkerManager.)
+            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
         (is (= expected-childopts childopts-with-ids)))))
 
   (deftest test-substitute-childopts-nil-childopts
@@ -796,8 +810,8 @@
             mem-onheap (int 512)
             childopts nil
             expected-childopts '[]
-            process-event (SyncProcessEvent.)
-            childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+            worker-manager (DefaultWorkerManager.)
+            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
         (is (= expected-childopts childopts-with-ids)))))
 
   (deftest test-substitute-childopts-nil-ids
@@ -808,8 +822,8 @@
             mem-onheap (int 512)
             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
-            process-event (SyncProcessEvent.)
-            childopts-with-ids (vec (.substituteChildopts process-event childopts worker-id topology-id port mem-onheap))]
+            worker-manager (DefaultWorkerManager.)
+            childopts-with-ids (vec (.substituteChildopts worker-manager childopts worker-id topology-id port mem-onheap))]
         (is (= expected-childopts childopts-with-ids)))))
 
   (deftest test-retry-read-assignments