You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:40 UTC

[04/23] storm git commit: Updated to read-supervisor-storm-conf

Updated to read-supervisor-storm-conf


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ba70bf1c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ba70bf1c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ba70bf1c

Branch: refs/heads/master
Commit: ba70bf1cfe72aeabec104d8ea664f087b719a08c
Parents: 310128d
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Jan 15 14:28:04 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/supervisor.clj  | 21 +++---
 .../src/clj/org/apache/storm/daemon/worker.clj  |  2 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java | 23 +++++++
 .../clj/org/apache/storm/supervisor_test.clj    | 72 ++++++++++----------
 4 files changed, 72 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ba70bf1c/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 76af578..4296a86 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -472,7 +472,7 @@
 (defn remove-blob-references
   "Remove a reference to a blob when its no longer needed."
   [localizer storm-id conf]
-  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+  (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
         blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
         user (storm-conf TOPOLOGY-SUBMITTER-USER)
         topo-name (storm-conf TOPOLOGY-NAME)]
@@ -495,7 +495,7 @@
   "For each of the downloaded topologies, adds references to the blobs that the topologies are
   using. This is used to reconstruct the cache on restart."
   [localizer storm-id conf]
-  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+  (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
         blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
         user (storm-conf TOPOLOGY-SUBMITTER-USER)
         topo-name (storm-conf TOPOLOGY-NAME)
@@ -613,7 +613,7 @@
   "Update each blob listed in the topology configuration if the latest version of the blob
    has not been downloaded."
   [conf storm-id localizer]
-  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+  (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
         blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
         user (storm-conf TOPOLOGY-SUBMITTER-USER)
         localresources (blobstore-map-to-localresources blobstore-map)]
@@ -724,7 +724,7 @@
                       action ^ProfileAction (:action pro-action)
                       stop? (> (System/currentTimeMillis) (:timestamp pro-action))
                       target-dir (worker-artifacts-root conf storm-id port)
-                      storm-conf (read-supervisor-storm-conf conf storm-id)
+                      storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
                       user (storm-conf TOPOLOGY-SUBMITTER-USER)
                       environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] env {})
                       worker-pid (slurp (worker-artifacts-pid-path conf storm-id port))
@@ -950,7 +950,7 @@
         (FileUtils/forceMkdir (File. stormroot))
         (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
           (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))
-        (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot))
+        (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot))
       (do
         (log-message "Failed to download blob resources for storm-id " storm-id)
         (rmr tmproot)))))
@@ -962,7 +962,10 @@
     (when (not (.exists (.getParentFile file)))
       (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
         (do (FileUtils/forceMkdir (.getParentFile file))
-            (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) (.getCanonicalPath (.getParentFile file))))
+            (setup-storm-code-dir
+              conf
+              (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
+              (.getCanonicalPath (.getParentFile file))))
         (.mkdirs (.getParentFile file))))
     (let [writer (java.io.FileWriter. file)
           yaml (Yaml.)]
@@ -1013,7 +1016,7 @@
   "Create symlinks in worker launch directory for all blobs"
   [conf storm-id worker-id]
   (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
-        storm-conf (read-supervisor-storm-conf conf storm-id)
+        storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
         workerroot (worker-root conf worker-id)
         blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
         blob-file-names (get-blob-file-names blobstore-map)
@@ -1053,7 +1056,7 @@
           jlp (jlp stormroot conf)
           stormjar (ConfigUtils/supervisorStormJarPath stormroot)
           _ (log-message "zliu stormjar: " stormjar)
-          storm-conf (read-supervisor-storm-conf conf storm-id)
+          storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
           topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
                            [cp]
                            [])
@@ -1158,7 +1161,7 @@
       (finally
         (.shutdown blob-store)))
     (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
-    (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+    (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot)
     (let [classloader (.getContextClassLoader (Thread/currentThread))
           resources-jar (resources-jar)
           url (.getResource classloader RESOURCES-SUBDIR)

http://git-wip-us.apache.org/repos/asf/storm/blob/ba70bf1c/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index d4d66dc..92093ae 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -590,7 +590,7 @@
   (def latest-log-config (atom {}))
   (def original-log-levels (atom {}))
 
-  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+  (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
         storm-conf (override-login-config-with-system-property storm-conf)
         acls (Utils/getWorkerACL storm-conf)
         cluster-state (cluster/mk-distributed-cluster-state conf :auth-conf storm-conf :acls acls :context (ClusterStateContext. DaemonType/WORKER))

http://git-wip-us.apache.org/repos/asf/storm/blob/ba70bf1c/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index b657ec4..a489e74 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -430,7 +430,30 @@ public class ConfigUtils {
         return new LocalState((masterLocalDir(conf) + FILE_SEPARATOR + "history"));
     }
 
+    // For testing only: while an instance is open, readSupervisorStormConf
+    // returns the given conf instead of loading it from the conf path. From Java:
+    // try (SetMockedSupervisorStormConf mocked = new SetMockedSupervisorStormConf(conf)) {
+    //    run test ...
+    // }
+    //
+    // From Clojure:
+    // (with-open [mock (SetMockedSupervisorStormConf. conf)]
+    //     run test ...)
+    public static class SetMockedSupervisorStormConf implements Closeable {
+        public SetMockedSupervisorStormConf(Map conf) {
+            mockedSupervisorStormConf = conf;
+        }
+
+        @Override
+        public void close() {
+            mockedSupervisorStormConf = null;
+        }
+    }
+    private static Map mockedSupervisorStormConf = null;
     public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException {
+        if (mockedSupervisorStormConf != null) {
+            return mockedSupervisorStormConf;
+        }
         String stormRoot = supervisorStormDistRoot(conf, stormId);
         String confPath = supervisorStormConfPath(stormRoot);
         return readSupervisorStormConfGivenPath(conf, confPath);

http://git-wip-us.apache.org/repos/asf/storm/blob/ba70bf1c/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 3b5877c..8a07006 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -316,13 +316,13 @@
                                     mock-cp)
               _ (log-message "zliu testing 1, exp-args is: " exp-args)
               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
-                                      WORKER-CHILDOPTS string-opts}}]
-          (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
-              (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                   topo-string-opts}
-                     add-to-classpath mock-cp
+                                      WORKER-CHILDOPTS string-opts}}
+              mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+                                            topo-string-opts}]
+          (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+              (stubbing [add-to-classpath mock-cp
                      launch-process nil
-                         supervisor-stormdist-root nil
                      set-worker-user! nil
                      supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
@@ -342,15 +342,15 @@
               exp-args (exp-args-fn list-opts topo-list-opts mock-cp)
               _ (log-message "zliu testing 2, exp-args is: " exp-args)
               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
-                                      WORKER-CHILDOPTS list-opts}}]
-            (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
-                (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                   topo-list-opts}
-                     add-to-classpath mock-cp
+                                      WORKER-CHILDOPTS list-opts}}
+              mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+                                            topo-list-opts}]
+            (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+                (stubbing [add-to-classpath mock-cp
                      launch-process nil
                      set-worker-user! nil
                      supervisor/jlp nil
-                           supervisor-stormdist-root nil
                      supervisor/write-log-metadata! nil
                      supervisor/create-blobstore-links nil
                      worker-artifacts-root "/tmp/workers-artifacts"]
@@ -366,13 +366,13 @@
         (let [topo-cp (str file-path-separator "any" file-path-separator "path")
               exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
               _ (log-message "zliu testing 3, exp-args is: " exp-args ", str is " (pr-str exp-args))
-              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
-          (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
-                (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
-                     supervisor/jlp nil
+              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
+              mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}]
+          (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+                (stubbing [supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
                      set-worker-user! nil
-                           supervisor-stormdist-root nil
                      supervisor/write-log-metadata! nil
                      launch-process nil
                      current-classpath (str file-path-separator "base")
@@ -390,14 +390,14 @@
               full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
               exp-args (exp-args-fn [] [] mock-cp)
               _ (log-message "zliu testing  4, exp-args is: " exp-args)
-              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
-          (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
-            (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
-                     supervisor/jlp nil
+              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
+              mocked-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}]
+          (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+            (stubbing [supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
                      launch-process nil
                      set-worker-user! nil
-                       supervisor-stormdist-root nil
                      supervisor/write-log-metadata! nil
                      current-classpath (str file-path-separator "base")
                      supervisor/create-blobstore-links nil]
@@ -471,15 +471,15 @@
                                         STORM-LOCAL-DIR storm-local
                                         STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
                                         SUPERVISOR-RUN-WORKER-AS-USER true
-                                        WORKER-CHILDOPTS string-opts}}]
-            (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
-              (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                   topo-string-opts
-                                                   TOPOLOGY-SUBMITTER-USER "me"}
-                       add-to-classpath mock-cp
+                                        WORKER-CHILDOPTS string-opts}}
+                mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+                                              topo-string-opts
+                                              TOPOLOGY-SUBMITTER-USER "me"}]
+            (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+              (stubbing [add-to-classpath mock-cp
                        launch-process nil
                        set-worker-user! nil
-                         supervisor-stormdist-root nil
                        supervisor/java-cmd "java"
                        supervisor/jlp nil
                        supervisor/write-log-metadata! nil]
@@ -503,14 +503,14 @@
                                         STORM-LOCAL-DIR storm-local
                                         STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
                                         SUPERVISOR-RUN-WORKER-AS-USER true
-                                        WORKER-CHILDOPTS list-opts}}]
-            (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
-              (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                   topo-list-opts
-                                                   TOPOLOGY-SUBMITTER-USER "me"}
-                       add-to-classpath mock-cp
+                                        WORKER-CHILDOPTS list-opts}}
+                                        mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+                                                                      topo-list-opts
+                                                                      TOPOLOGY-SUBMITTER-USER "me"}]
+            (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+              (stubbing [add-to-classpath mock-cp
                        launch-process nil
-                         supervisor-stormdist-root nil
                        set-worker-user! nil
                        supervisor/java-cmd "java"
                        supervisor/jlp nil