You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:48 UTC

[12/23] storm git commit: Updated the callers of supervisor-stormjar-path

Updated the callers of supervisor-stormjar-path


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/635488db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/635488db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/635488db

Branch: refs/heads/master
Commit: 635488db5e0670f1e4348fab5e3fd7d3970c5e63
Parents: 3d2796d
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Jan 15 12:50:00 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/config.clj  |  7 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  2 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  | 36 ++++++---
 .../src/clj/org/apache/storm/daemon/task.clj    |  4 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java | 83 +++++++++++++++++---
 .../clj/org/apache/storm/supervisor_test.clj    | 58 ++++++++------
 6 files changed, 135 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index d65c439..28b06f5 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -89,7 +89,9 @@
 
 (defn absolute-storm-local-dir [conf]
   (let [storm-home (System/getProperty "storm.home")
-        path (conf STORM-LOCAL-DIR)]
+        _ (log-message "zliu clojure storm.home is " storm-home)
+        path (conf STORM-LOCAL-DIR)
+        _ (log-message "zliu clojure path is " path)]
     (if path
       (if (is-absolute-path? path) path (str storm-home file-path-separator path))
       (str storm-home file-path-separator "storm-local"))))
@@ -168,7 +170,8 @@
 
 (defn supervisor-local-dir
   [conf]
-  (let [ret (str (absolute-storm-local-dir conf) file-path-separator "supervisor")]
+  (let [ret (str (absolute-storm-local-dir conf) file-path-separator "supervisor")
+        _ (log-message "zliu supervisor-local-dir ret is " ret)]
     (FileUtils/forceMkdir (File. ret))
     ret))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 77791f4..a951a75 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -376,7 +376,7 @@
                           (:component->stream->fields worker)
                           (:storm-id worker)
                           (supervisor-storm-resources-path
-                            (supervisor-stormdist-root (:conf worker) (:storm-id worker)))
+                            (ConfigUtils/supervisorStormDistRoot (:conf worker) (:storm-id worker)))
                           (worker-pids-root (:conf worker) (:worker-id worker))
                           (:port worker)
                           (:task-ids worker)

http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 838a98f..58d046a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -118,7 +118,11 @@
   (map-val :master-code-dir assignments-snapshot))
 
 (defn- read-downloaded-storm-ids [conf]
-  (map #(url-decode %) (read-dir-contents (supervisor-stormdist-root conf)))
+  (let [dir (ConfigUtils/supervisorStormDistRoot conf)
+        _ (log-message "zliu java supervisorLocalDir " (ConfigUtils/supervisorLocalDir conf))
+        cdir (supervisor-stormdist-root conf)
+        _ (log-message "zliu jdir is:" dir ",cdir is"  cdir ",clj dir equals to nil? " (nil? cdir) "jdir = cdir?" (= dir cdir))]
+  (map #(url-decode %) (read-dir-contents dir))) ; (supervisor-stormdist-root conf)));
   )
 
 (defn read-worker-heartbeat [conf id]
@@ -347,8 +351,8 @@
 
 (defn required-topo-files-exist?
   [conf storm-id]
-  (let [stormroot (supervisor-stormdist-root conf storm-id)
-        stormjarpath (supervisor-stormjar-path stormroot)
+  (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
+        stormjarpath (ConfigUtils/supervisorStormJarPath stormroot)
         stormcodepath (supervisor-stormcode-path stormroot)
         stormconfpath (supervisor-stormconf-path stormroot)]
     (and (every? exists-file? [stormroot stormconfpath stormcodepath])
@@ -501,13 +505,13 @@
 
 (defn rm-topo-files
   [conf storm-id localizer rm-blob-refs?]
-  (let [path (supervisor-stormdist-root conf storm-id)]
+  (let [path (ConfigUtils/supervisorStormDistRoot conf storm-id)]
     (try
       (if rm-blob-refs?
         (remove-blob-references localizer storm-id conf))
       (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
         (rmr-as-user conf storm-id path)
-        (rmr (supervisor-stormdist-root conf storm-id)))
+        (rmr (ConfigUtils/supervisorStormDistRoot conf storm-id)))
       (catch Exception e
         (log-message e (str "Exception removing: " storm-id))))))
 
@@ -632,7 +636,7 @@
             new-assignment @(:curr-assignment supervisor)
             assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
         (doseq [topology-id downloaded-storm-ids]
-          (let [storm-root (supervisor-stormdist-root conf topology-id)]
+          (let [storm-root (ConfigUtils/supervisorStormDistRoot conf topology-id)]
             (when (assigned-storm-ids topology-id)
               (log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root)
               (update-blobs-for-topology! conf topology-id (:localizer supervisor))))))
@@ -923,7 +927,7 @@
   :distributed [conf storm-id master-code-dir localizer]
   ;; Downloading to permanent location is atomic
   (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
-        stormroot (supervisor-stormdist-root conf storm-id)
+        stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
         blobstore (Utils/getClientBlobStoreForSupervisor conf)]
     (FileUtils/forceMkdir (File. tmproot))
     (if-not on-windows?
@@ -931,13 +935,13 @@
       (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
         (throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions"))))
     (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormJarKey storm-id)
-      (supervisor-stormjar-path tmproot) blobstore)
+      (ConfigUtils/supervisorStormJarPath tmproot) blobstore)
     (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormCodeKey storm-id)
       (supervisor-stormcode-path tmproot) blobstore)
     (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormConfKey storm-id)
       (supervisor-stormconf-path tmproot) blobstore)
     (.shutdown blobstore)
-    (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
+    (extract-dir-from-jar (ConfigUtils/supervisorStormJarPath tmproot) RESOURCES-SUBDIR tmproot)
     (download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer
       tmproot)
     (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
@@ -1008,7 +1012,7 @@
 (defn create-blobstore-links
   "Create symlinks in worker launch directory for all blobs"
   [conf storm-id worker-id]
-  (let [stormroot (supervisor-stormdist-root conf storm-id)
+  (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
         storm-conf (read-supervisor-storm-conf conf storm-id)
         workerroot (worker-root conf worker-id)
         blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
@@ -1044,16 +1048,22 @@
                                     storm-log-conf-dir
                                     (str storm-home file-path-separator storm-log-conf-dir))
                                   (str storm-home file-path-separator "log4j2"))
-          stormroot (supervisor-stormdist-root conf storm-id)
+          stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
+          _ (log-message "zliu stormroot: " stormroot)
           jlp (jlp stormroot conf)
-          stormjar (supervisor-stormjar-path stormroot)
+          stormjar (ConfigUtils/supervisorStormJarPath stormroot)
+          _ (log-message "zliu stormjar: " stormjar)
           storm-conf (read-supervisor-storm-conf conf storm-id)
           topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
                            [cp]
                            [])
+          _ (log-message "zliu worker-classpath: " (worker-classpath))
+          _ (log-message "zliu stormjar: " stormjar)
+          _ (log-message "zliu topo-classpath: " topo-classpath)
           classpath (-> (worker-classpath)
                         (add-to-classpath [stormjar])
                         (add-to-classpath topo-classpath))
+          _ (log-message "zliu classpath" classpath)
           top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
           mem-onheap (if (and mem-onheap (> mem-onheap 0)) ;; not nil and not zero
                        (int (Math/ceil mem-onheap)) ;; round up
@@ -1139,7 +1149,7 @@
 (defmethod download-storm-code
   :local [conf storm-id master-code-dir localizer]
   (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
-        stormroot (supervisor-stormdist-root conf storm-id)
+        stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
         blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
     (try
       (FileUtils/forceMkdir (File. tmproot))

http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 1ae9b22..a8114be 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -23,7 +23,7 @@
   (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
             EmitInfo BoltFailInfo BoltAckInfo])
   (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext])
-  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.utils Utils ConfigUtils])
   (:import [org.apache.storm.generated ShellComponent JavaObject])
   (:import [org.apache.storm.spout ShellSpout])
   (:import [java.util Collection List ArrayList])
@@ -42,7 +42,7 @@
       (:component->stream->fields worker)
       (:storm-id worker)
       (supervisor-storm-resources-path
-        (supervisor-stormdist-root conf (:storm-id worker)))
+        (ConfigUtils/supervisorStormDistRoot conf (:storm-id worker)))
       (worker-pids-root conf (:worker-id worker))
       (int %)
       (:port worker)

http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 3dd6d4e..5ada400 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -129,10 +129,8 @@ public class ConfigUtils {
     // }
     //
     // for clojure
-    // (let [something (SetMockedStormConfig. conf)]
-    //   (try
-    //     run test ...
-    //     (finally (.close something))))
+    // (with-open [mock (SetMockedStormConfig. conf)]
+    //     run test ...)
     public static class SetMockedStormConfig implements Closeable {
         public SetMockedStormConfig(Map conf) {
             mockedStormConfig = conf;
@@ -162,11 +160,14 @@ public class ConfigUtils {
     }
 
     public static String absoluteStormLocalDir(Map conf) {
+        LOG.info("zliu conf map is " + conf);
         String stormHome = System.getProperty("storm.home");
-        String localDir = String.valueOf(conf.get(Config.STORM_LOCAL_DIR));
-        if (localDir.equals("null")) {
+        LOG.info("zliu stormhome is " + stormHome);
+        String localDir = (String) conf.get(Config.STORM_LOCAL_DIR);
+        if (localDir == null) {
             return (stormHome + FILE_SEPARATOR + "storm-local");
         } else {
+            LOG.info("zliu java local dir is " + localDir + ", isAbsolute:" + (new File(localDir).isAbsolute()));
             if (new File(localDir).isAbsolute()) {
                 return localDir;
             } else {
@@ -225,7 +226,12 @@ public class ConfigUtils {
     }
 
     public static String stormDistPath(String stormRoot) {
-        return stormRoot + FILE_SEPARATOR + "stormdist";
+        String ret = "";
+        // String concatenation renders a null reference as the text "null", so fall back to "" when stormRoot is null.
+        if (stormRoot != null) {
+            ret = stormRoot;
+        }
+        return ret + FILE_SEPARATOR + "stormdist";
     }
 
     public static String stormTmpPath(String stormRoot) {
@@ -280,7 +286,7 @@ public class ConfigUtils {
     }
 
     public static String supervisorLocalDir(Map conf) throws IOException {
-        String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPARATOR + "supervisor";
+        String ret = absoluteStormLocalDir(conf) + FILE_SEPARATOR + "supervisor";
         FileUtils.forceMkdir(new File(ret));
         return ret;
     }
@@ -289,28 +295,74 @@ public class ConfigUtils {
         return ((supervisorLocalDir(conf) + FILE_SEPARATOR + "isupervisor"));
     }
 
+    //For testing only
+    // for java
+    // try (SetMockedSupervisorStormDistRoot mocked = new SetMockedSupervisorStormDistRoot(conf)) {
+    //    run test ...
+    // }
+    //
+    // for clojure
+    // (with-open [mock (SetMockedSupervisorStormDistRoot. conf)]
+    //     run test ...)
+    public static class SetMockedSupervisorStormDistRoot implements Closeable {
+        public SetMockedSupervisorStormDistRoot(Map conf) {
+            mockedSupervisorStormDistRoot = conf;
+        }
+        @Override
+        public void close() {
+            mockedSupervisorStormDistRoot = null;
+        }
+    }
+    private static Map mockedSupervisorStormDistRoot = null;
     public static String supervisorStormDistRoot(Map conf) throws IOException {
+        LOG.info("zliu supervisorStormDistRoot resl is: " + stormDistPath(supervisorLocalDir(conf)) + "mocked set is " + mockedSupervisorStormDistRoot);
+        if (mockedSupervisorStormDistRoot != null) {
+            return null;
+        }
         return stormDistPath(supervisorLocalDir(conf)); // TODO: no need to forceMake here?, clj does not.
     }
 
     public static String supervisorStormDistRoot(Map conf, String stormId) throws IOException {
+        if (mockedSupervisorStormDistRoot != null) {
+            return null;
+        }
         return supervisorStormDistRoot(conf) + FILE_SEPARATOR + stormId; // TODO: need to (url-encode storm-id)? Not.
     }
 
     public static String supervisorStormJarPath(String stormRoot) {
-        return (stormRoot + FILE_SEPARATOR + "stormjar.jar");
+        String ret = "";
+        // String concatenation renders a null reference as the text "null", so fall back to "" when stormRoot is null.
+        if (stormRoot != null) {
+            ret = stormRoot;
+        }
+        return (ret + FILE_SEPARATOR + "stormjar.jar");
     }
 
     public static String supervisorStormMetaFilePath(String stormRoot) {
-        return (stormRoot + FILE_SEPARATOR + "storm-code-distributor.meta");
+        String ret = "";
+        // String concatenation renders a null reference as the text "null", so fall back to "" when stormRoot is null.
+        if (stormRoot != null) {
+            ret = stormRoot;
+        }
+        return (ret + FILE_SEPARATOR + "storm-code-distributor.meta");
     }
 
     public static String supervisorStormCodePath(String stormRoot) {
-        return (stormRoot + FILE_SEPARATOR + "stormcode.ser");
+        String ret = "";
+        // String concatenation renders a null reference as the text "null", so fall back to "" when stormRoot is null.
+        if (stormRoot != null) {
+            ret = stormRoot;
+        }
+        return (ret + FILE_SEPARATOR + "stormcode.ser");
     }
 
     public static String supervisorStormConfPath(String stormRoot) {
-        return (stormRoot + FILE_SEPARATOR + "stormconf.ser");
+        String ret = "";
+        // String concatenation renders a null reference as the text "null", so fall back to "" when stormRoot is null.
+        if (stormRoot != null) {
+            ret = stormRoot;
+        }
+        return (ret + FILE_SEPARATOR + "stormconf.ser");
     }
 
     public static String supervisorTmpDir(Map conf) throws IOException {
@@ -320,7 +372,12 @@ public class ConfigUtils {
     }
 
     public static String supervisorStormResourcesPath(String stormRoot) {
-        return (stormRoot + FILE_SEPARATOR + RESOURCES_SUBDIR);
+        String ret = "";
+        // String concatenation renders a null reference as the text "null", so fall back to "" when stormRoot is null.
+        if (stormRoot != null) {
+            ret = stormRoot;
+        }
+        return (ret + FILE_SEPARATOR + RESOURCES_SUBDIR);
     }
 
     public static LocalState supervisorState(Map conf) throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index d9daf32..5be2059 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -26,7 +26,7 @@
   (:import [java.io File])
   (:import [java.nio.file Files])
   (:import [java.nio.file.attribute FileAttribute])
-  (:use [org.apache.storm config testing util timer])
+  (:use [org.apache.storm config testing util timer log])
   (:use [org.apache.storm.daemon common])
   (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
             [org.apache.storm [thrift :as thrift] [cluster :as cluster]])
@@ -314,59 +314,65 @@
               exp-args (exp-args-fn ["-Dfoo=bar" "-Xmx1024m"]
                                     ["-Dkau=aux" "-Xmx2048m"]
                                     mock-cp)
+              _ (log-message "zliu testing 1, exp-args is: " exp-args)
               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
                                       WORKER-CHILDOPTS string-opts}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+          (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+              (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
                                                    topo-string-opts}
                      add-to-classpath mock-cp
-                     supervisor-stormdist-root nil
                      launch-process nil
+                         supervisor-stormdist-root nil
                      set-worker-user! nil
                      supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
                      supervisor/write-log-metadata! nil
                      supervisor/create-blobstore-links nil]
-            (supervisor/launch-worker mock-supervisor
+                (supervisor/launch-worker mock-supervisor
                                       mock-storm-id
                                       mock-port
                                       mock-worker-id
                                       mock-mem-onheap)
-            (verify-first-call-args-for-indices launch-process
+                (verify-first-call-args-for-indices launch-process
                                                 [0]
-                                                exp-args))))
+                                                exp-args)))))
       (testing "testing *.worker.childopts as list of strings, with spaces in values"
         (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
               topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
               exp-args (exp-args-fn list-opts topo-list-opts mock-cp)
+              _ (log-message "zliu testing 2, exp-args is: " exp-args)
               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
                                       WORKER-CHILDOPTS list-opts}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+            (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+                (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
                                                    topo-list-opts}
                      add-to-classpath mock-cp
-                     supervisor-stormdist-root nil
                      launch-process nil
                      set-worker-user! nil
                      supervisor/jlp nil
+                           supervisor-stormdist-root nil
                      supervisor/write-log-metadata! nil
                      supervisor/create-blobstore-links nil
                      worker-artifacts-root "/tmp/workers-artifacts"]
-            (supervisor/launch-worker mock-supervisor
+                (supervisor/launch-worker mock-supervisor
                                       mock-storm-id
                                       mock-port
                                       mock-worker-id
                                       mock-mem-onheap)
-            (verify-first-call-args-for-indices launch-process
+                (verify-first-call-args-for-indices launch-process
                                                 [0]
-                                                exp-args))))
+                                                exp-args)))))
       (testing "testing topology.classpath is added to classpath"
         (let [topo-cp (str file-path-separator "any" file-path-separator "path")
               exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
+              _ (log-message "zliu testing 3, exp-args is: " exp-args ", str is " (pr-str exp-args))
               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
-                     supervisor-stormdist-root nil
+          (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+                (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
                      supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
                      set-worker-user! nil
+                           supervisor-stormdist-root nil
                      supervisor/write-log-metadata! nil
                      launch-process nil
                      current-classpath (str file-path-separator "base")
@@ -378,18 +384,20 @@
                                               mock-mem-onheap)
                     (verify-first-call-args-for-indices launch-process
                                                         [0]
-                                                        exp-args))))
+                                                        exp-args)))))
       (testing "testing topology.environment is added to environment for worker launch"
         (let [topo-env {"THISVAR" "somevalue" "THATVAR" "someothervalue"}
               full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
               exp-args (exp-args-fn [] [] mock-cp)
+              _ (log-message "zliu testing  4, exp-args is: " exp-args)
               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
-                     supervisor-stormdist-root nil
+          (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+            (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
                      supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
                      launch-process nil
                      set-worker-user! nil
+                       supervisor-stormdist-root nil
                      supervisor/write-log-metadata! nil
                      current-classpath (str file-path-separator "base")
                      supervisor/create-blobstore-links nil]
@@ -400,7 +408,7 @@
                                               mock-mem-onheap)
                     (verify-first-call-args-for-indices launch-process
                                                         [2]
-                                                        full-env)))))))
+                                                        full-env))))))))
 
 (deftest test-worker-launch-command-run-as-user
   (testing "*.worker.childopts configuration"
@@ -464,13 +472,14 @@
                                         STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
                                         SUPERVISOR-RUN-WORKER-AS-USER true
                                         WORKER-CHILDOPTS string-opts}}]
-            (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+            (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+              (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
                                                    topo-string-opts
                                                    TOPOLOGY-SUBMITTER-USER "me"}
                        add-to-classpath mock-cp
-                       supervisor-stormdist-root nil
                        launch-process nil
                        set-worker-user! nil
+                         supervisor-stormdist-root nil
                        supervisor/java-cmd "java"
                        supervisor/jlp nil
                        supervisor/write-log-metadata! nil]
@@ -481,7 +490,7 @@
                                                 mock-mem-onheap)
                       (verify-first-call-args-for-indices launch-process
                                                           [0]
-                                                          exp-launch))
+                                                          exp-launch)))
             (is (= (slurp worker-script) exp-script))))
         (finally (rmr storm-local)))
       (.mkdirs (io/file storm-local "workers" mock-worker-id))
@@ -495,12 +504,13 @@
                                         STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
                                         SUPERVISOR-RUN-WORKER-AS-USER true
                                         WORKER-CHILDOPTS list-opts}}]
-            (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+            (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+              (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
                                                    topo-list-opts
                                                    TOPOLOGY-SUBMITTER-USER "me"}
                        add-to-classpath mock-cp
-                       supervisor-stormdist-root nil
                        launch-process nil
+                         supervisor-stormdist-root nil
                        set-worker-user! nil
                        supervisor/java-cmd "java"
                        supervisor/jlp nil
@@ -512,7 +522,7 @@
                                                 mock-mem-onheap)
                       (verify-first-call-args-for-indices launch-process
                                                           [0]
-                                                          exp-launch))
+                                                          exp-launch)))
             (is (= (slurp worker-script) exp-script))))
         (finally (rmr storm-local))))))
 
@@ -731,4 +741,4 @@
      (validate-launched-once (:launched changed)
                              {"sup1" [3 4]}
                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
-     )))
+     )))
\ No newline at end of file