You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:39 UTC

[03/23] storm git commit: Updates workerRoot, worker user, path, state, set. To do a few left.

Updates workerRoot, worker user, path, state, set. To do a few left.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0dbdcf39
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0dbdcf39
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0dbdcf39

Branch: refs/heads/master
Commit: 0dbdcf39bf411a4ee5abac5e740c07e8813ea025
Parents: ba70bf1
Author: zhuol <zh...@yahoo-inc.com>
Authored: Sun Jan 17 16:37:35 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  2 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  2 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  | 55 ++++++++++++--------
 .../src/clj/org/apache/storm/daemon/task.clj    |  2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  6 +--
 storm-core/src/clj/org/apache/storm/testing.clj |  2 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java | 45 +++++++++++++---
 .../clj/org/apache/storm/supervisor_test.clj    | 24 ++++-----
 8 files changed, 89 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index d1cb2d9..6c184fd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -377,7 +377,7 @@
                           (:storm-id worker)
                           (ConfigUtils/supervisorStormResourcesPath
                             (ConfigUtils/supervisorStormDistRoot (:conf worker) (:storm-id worker)))
-                          (worker-pids-root (:conf worker) (:worker-id worker))
+                          (ConfigUtils/workerPidsRoot (:conf worker) (:worker-id worker))
                           (:port worker)
                           (:task-ids worker)
                           (:default-shared-resources worker)

http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 9d978ae..ba38701 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1040,7 +1040,7 @@
 
 (defn code-ids [blob-store]
   (let [to-id (reify KeyFilter
-                (filter [this key] (get-id-from-blob-key key)))]
+                (filter [this key] (ConfigUtils/getIdFromBlobKey key)))]
     (set (.filterAndListKeys blob-store to-id))))
 
 (defn cleanup-storm-ids [conf storm-cluster-state blob-store]

http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 4296a86..fb371c5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -126,7 +126,7 @@
   )
 
 (defn read-worker-heartbeat [conf id]
-  (let [local-state (worker-state conf id)]
+  (let [local-state (ConfigUtils/workerState conf id)]
     (try
       (ls-worker-heartbeat local-state)
       (catch Exception e
@@ -135,12 +135,13 @@
 
 
 (defn my-worker-ids [conf]
-  (read-dir-contents (worker-root conf)))
+  (filter #(not= "null" %) (read-dir-contents (ConfigUtils/workerRoot conf))))
 
 (defn read-worker-heartbeats
   "Returns map from worker id to heartbeat"
   [conf]
-  (let [ids (my-worker-ids conf)]
+  (let [ids (my-worker-ids conf)
+        _ (log-message "zliu my-worker-ids are" (prn-str ids))]
     (into {}
       (dofor [id ids]
         [id (read-worker-heartbeat conf id)]))
@@ -172,7 +173,9 @@
   (let [conf (:conf supervisor)
         ^LocalState local-state (:local-state supervisor)
         id->heartbeat (read-worker-heartbeats conf)
-        approved-ids (set (keys (ls-approved-workers local-state)))]
+        _ (log-message "zliu id->heartbeat is:" (prn-str id->heartbeat))
+        approved-ids (set (keys (ls-approved-workers local-state)))
+        _ (log-message "zliu approved-ids is:" (prn-str approved-ids))]
     (into
      {}
      (dofor [[id hb] id->heartbeat]
@@ -196,7 +199,7 @@
      )))
 
 (defn- wait-for-worker-launch [conf id start-time]
-  (let [state (worker-state conf id)]
+  (let [state (ConfigUtils/workerState conf id)]
     (loop []
       (let [hb (ls-worker-heartbeat state)]
         (when (and
@@ -258,16 +261,16 @@
 
 (defn try-cleanup-worker [conf id]
   (try
-    (if (.exists (File. (worker-root conf id)))
+    (if (.exists (File. (ConfigUtils/workerRoot conf id)))
       (do
         (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
-          (rmr-as-user conf id (worker-root conf id))
+          (rmr-as-user conf id (ConfigUtils/workerRoot conf id))
           (do
-            (rmr (worker-heartbeats-root conf id))
+            (rmr (ConfigUtils/workerHeartbeatsRoot conf id))
             ;; this avoids a race condition with worker or subprocess writing pid around same time
-            (rmr (worker-pids-root conf id))
-            (rmr (worker-root conf id))))
-        (remove-worker-user! conf id)
+            (rmr (ConfigUtils/workerPidsRoot conf id))
+            (rmr (ConfigUtils/workerRoot conf id))))
+        (ConfigUtils/removeWorkerUserWSE conf id)
         (remove-dead-worker id)
       ))
   (catch IOException e
@@ -281,11 +284,12 @@
 (defn shutdown-worker [supervisor id]
   (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
   (let [conf (:conf supervisor)
-        pids (read-dir-contents (worker-pids-root conf id))
+        pids (read-dir-contents (ConfigUtils/workerPidsRoot conf id))
+        _ (log-message "zliu pids are:" (pr-str pids) ", worker-pids-root is:" (ConfigUtils/workerPidsRoot conf id))
         thread-pid (@(:worker-thread-pids-atom supervisor) id)
         shutdown-sleep-secs (conf SUPERVISOR-WORKER-SHUTDOWN-SLEEP-SECS)
         as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
-        user (get-worker-user conf id)]
+        user (ConfigUtils/getWorkerUser conf id)]
     (when thread-pid
       (psim/kill-process thread-pid))
     (doseq [pid pids]
@@ -300,9 +304,9 @@
         (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
         (force-kill-process pid))
       (if as-user
-        (rmr-as-user conf id (worker-pid-path conf id pid))
+        (rmr-as-user conf id (ConfigUtils/workerPidPath conf id pid))
         (try
-          (rmpath (worker-pid-path conf id pid))
+          (rmpath (ConfigUtils/workerPidPath conf id pid))
           (catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
     (try-cleanup-worker conf id))
   (log-message "Shut down " (:supervisor-id supervisor) ":" id))
@@ -378,8 +382,9 @@
             (do
               (log-message "Launching worker with assignment "
                 (get-worker-assignment-helper-msg assignment supervisor port id))
-              (local-mkdirs (worker-pids-root conf id))
-              (local-mkdirs (worker-heartbeats-root conf id))
+              (local-mkdirs (ConfigUtils/workerPidsRoot conf id))
+              (log-message "zliu create worker heartbeatroot as " (ConfigUtils/workerHeartbeatsRoot conf id))
+              (local-mkdirs (ConfigUtils/workerHeartbeatsRoot conf id))
               (launch-worker supervisor
                 (:storm-id assignment)
                 port
@@ -396,6 +401,9 @@
         ^LocalState local-state (:local-state supervisor)
         storm-cluster-state (:storm-cluster-state supervisor)
         assigned-executors (defaulted (ls-local-assignments local-state) {})
+        _ (log-message "zliu storm-cluster-state is " (prn-str storm-cluster-state))
+        _ (log-message "zliu local-state is " (prn-str local-state))
+        _ (log-message "zliu assigned-executors is " (prn-str assigned-executors))
         now (current-time-secs)
         allocated (read-allocated-workers supervisor assigned-executors now)
         keepers (filter-val
@@ -420,6 +428,9 @@
     (log-debug "Syncing processes")
     (log-debug "Assigned executors: " assigned-executors)
     (log-debug "Allocated: " allocated)
+    (log-message "zliu Syncing processes")
+    (log-message "zliu allocated is " (prn-str allocated))
+    (log-message "zliu new-worker-ids is " (prn-str new-worker-ids))
     (doseq [[id [state heartbeat]] allocated]
       (when (not= :valid state)
         (log-message
@@ -1017,7 +1028,7 @@
   [conf storm-id worker-id]
   (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
         storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
-        workerroot (worker-root conf worker-id)
+        workerroot (ConfigUtils/workerRoot conf worker-id)
         blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
         blob-file-names (get-blob-file-names blobstore-map)
         resource-file-names (cons RESOURCES-SUBDIR blob-file-names)]
@@ -1030,7 +1041,7 @@
 (defn create-artifacts-link
   "Create a symlink from workder directory to its port artifacts directory"
   [conf storm-id port worker-id]
-  (let [worker-dir (worker-root conf worker-id)
+  (let [worker-dir (ConfigUtils/workerRoot conf worker-id)
         topo-dir (worker-artifacts-root conf storm-id)]
     (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
                  storm-id " to its port artifacts directory")
@@ -1127,13 +1138,13 @@
           command (->> command (map str) (filter (complement empty?)))]
       (log-message "Launching worker with command: " (shell-cmd command))
       (write-log-metadata! storm-conf user worker-id storm-id port conf)
-      (set-worker-user! conf worker-id user)
+      (ConfigUtils/setWorkerUserWSE conf worker-id user)
       (create-artifacts-link conf storm-id port worker-id)
       (let [log-prefix (str "Worker Process " worker-id)
             callback (fn [exit-code]
                        (log-message log-prefix " exited with code: " exit-code)
                        (add-dead-worker worker-id))
-            worker-dir (worker-root conf worker-id)]
+            worker-dir (ConfigUtils/workerRoot conf worker-id)]
         (remove-dead-worker worker-id)
         (create-blobstore-links conf storm-id worker-id)
         (if run-worker-as-user
@@ -1186,7 +1197,7 @@
                                    (:assignment-id supervisor)
                                    port
                                    worker-id)]
-      (set-worker-user! conf worker-id "")
+      (ConfigUtils/setWorkerUserWSE conf worker-id "")
       (psim/register-process pid worker)
       (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
       ))

http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 33b8af9..da5a537 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -43,7 +43,7 @@
       (:storm-id worker)
       (ConfigUtils/supervisorStormResourcesPath
         (ConfigUtils/supervisorStormDistRoot conf (:storm-id worker)))
-      (worker-pids-root conf (:worker-id worker))
+      (ConfigUtils/workerPidsRoot conf (:worker-id worker))
       (int %)
       (:port worker)
       (:task-ids worker)

http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 92093ae..590ea4c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -77,7 +77,7 @@
 
 (defn do-heartbeat [worker]
   (let [conf (:conf worker)
-        state (worker-state conf (:worker-id worker))]
+        state (ConfigUtils/workerState conf (:worker-id worker))]
     ;; do the local-file-system heartbeat.
     (ls-worker-heartbeat! state (current-time-secs) (:storm-id worker) (:executors worker) (:port worker))
     (.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
@@ -252,7 +252,7 @@
                                (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
                                (into {}))
 
-        topology (read-supervisor-topology conf storm-id)
+        topology (ConfigUtils/readSupervisorTopology conf storm-id)
         mq-context  (if mq-context
                       mq-context
                       (TransportFactory/makeContext storm-conf))]
@@ -581,7 +581,7 @@
   ;; process. supervisor will register it in this case
   (when (= :distributed (ConfigUtils/clusterMode conf))
     (let [pid (process-pid)]
-      (touch (worker-pid-path conf worker-id pid))
+      (touch (ConfigUtils/workerPidPath conf worker-id pid))
       (spit (worker-artifacts-pid-path conf storm-id port) pid)))
 
   (declare establish-log-setting-callback)

http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 08752c5..7d76eeb 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -345,7 +345,7 @@
     (let [supervisor-id (:supervisor-id supervisor)
           conf (:conf supervisor)
           existing (get @capture-atom [supervisor-id port] [])]
-      (set-worker-user! conf worker-id "")
+      (ConfigUtils/setWorkerUserWSE conf worker-id "")
       (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id)))))
 
 (defn find-worker-id

http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index a489e74..a739482 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -21,15 +21,11 @@ package org.apache.storm.utils;
 import org.apache.storm.Config;
 import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.utils.LocalState;
-import org.apache.storm.utils.Utils;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -469,13 +465,16 @@ public class ConfigUtils {
         return (absoluteStormLocalDir(conf) + FILE_SEPARATOR + "/workers-users");
     }
 
+    /* Never get used TODO : may delete it*/
     public static String workerUserFile(Map conf, String workerId) {
         return (workerUserRoot(conf) + FILE_SEPARATOR + workerId);
     }
 
-    public static String getWorkerUser(Map conf, String workerId) throws IOException {
+    public static String getWorkerUser(Map conf, String workerId) {
         LOG.info("GET worker-user for {}", workerId);
         File file = new File(workerUserFile(conf, workerId));
+        //LOG.info("zliu to read access worker user file: " + (file.getCanonicalPath()));
+        LOG.info("zliu this file is existed?" + file.exists());
 
         try (InputStream in = new FileInputStream(file);
              Reader reader = new InputStreamReader(in);
@@ -488,6 +487,9 @@ public class ConfigUtils {
             }
             String ret = sb.toString().trim();
             return ret;
+        } catch (IOException e) {
+            LOG.error("Failed to get worker user for " + workerId + ".");
+            return null;
         }
     }
 
@@ -504,9 +506,33 @@ public class ConfigUtils {
         return ret;
     }
 
+    //For testing only
+    // for java
+    // try (SetMockedWorkerUserWSE mocked = new SetMockedWorkerUserWSE(conf)) {
+    //    run test ...
+    // }
+    //
+    // for clojure
+    // (with-open [mock (SetMockedWorkerUserWSE. conf)]
+    //     run test ...)
+    public static class SetMockedWorkerUserWSE implements Closeable {
+        public SetMockedWorkerUserWSE(Map conf) {
+            mockedWorkerUserWSE = conf;
+        }
+
+        @Override
+        public void close() {
+            mockedWorkerUserWSE = null;
+        }
+    }
+    private static Map mockedWorkerUserWSE = null;
     public static void setWorkerUserWSE(Map conf, String workerId, String user) throws IOException {
+        if (mockedWorkerUserWSE != null) {
+            return;
+        }
         LOG.info("SET worker-user {} {}", workerId, user);
         File file = new File(workerUserFile(conf, workerId));
+        LOG.info("zliu  SET worker-user to create worker file:" + file.getCanonicalPath());
         file.getParentFile().mkdirs();
 
         try (FileWriter fw = new FileWriter(file);
@@ -562,6 +588,9 @@ public class ConfigUtils {
     }
 
     public static String workerRoot(Map conf) {
+        LOG.info("zliu workers root's current listFiles are:");
+        File r = new File((absoluteStormLocalDir(conf) + FILE_SEPARATOR + "workers")); //TODO delete me
+        if (r.exists()) for (File f : r.listFiles()) {LOG.info(f.getName());} //TODO delete me
         return (absoluteStormLocalDir(conf) + FILE_SEPARATOR + "workers");
     }
 
@@ -573,16 +602,16 @@ public class ConfigUtils {
         return (workerRoot(conf, id) + FILE_SEPARATOR + "pids");
     }
 
-    public static String workerPidsRoot(Map conf, String id, String pid) {
+    public static String workerPidPath(Map conf, String id, String pid) {
         return (workerPidsRoot(conf, id) + FILE_SEPARATOR + pid);
     }
 
-    public static String workerHeartbeatRoot(Map conf, String id) {
+    public static String workerHeartbeatsRoot(Map conf, String id) {
         return (workerRoot(conf, id) + FILE_SEPARATOR + "heartbeats");
     }
 
     public static LocalState workerState(Map conf, String id) throws IOException {
-        return new LocalState(workerHeartbeatRoot(conf, id));
+        return new LocalState(workerHeartbeatsRoot(conf, id));
     }
 
     public static void overrideLoginConfigWithSystemProperty(Map conf) { // note that we delete the return value

http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 8a07006..5c9a279 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -320,10 +320,10 @@
               mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
                                             topo-string-opts}]
           (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
-                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+                      mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
               (stubbing [add-to-classpath mock-cp
                      launch-process nil
-                     set-worker-user! nil
                      supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
                      supervisor/write-log-metadata! nil
@@ -346,10 +346,10 @@
               mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
                                             topo-list-opts}]
             (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
-                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+                        mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
                 (stubbing [add-to-classpath mock-cp
                      launch-process nil
-                     set-worker-user! nil
                      supervisor/jlp nil
                      supervisor/write-log-metadata! nil
                      supervisor/create-blobstore-links nil
@@ -369,10 +369,10 @@
               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
               mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}]
           (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
-                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+                      mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
                 (stubbing [supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
-                     set-worker-user! nil
                      supervisor/write-log-metadata! nil
                      launch-process nil
                      current-classpath (str file-path-separator "base")
@@ -393,11 +393,11 @@
               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
               mocked-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}]
           (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
-                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+                      mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+                      mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
             (stubbing [supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
                      launch-process nil
-                     set-worker-user! nil
                      supervisor/write-log-metadata! nil
                      current-classpath (str file-path-separator "base")
                      supervisor/create-blobstore-links nil]
@@ -476,10 +476,10 @@
                                               topo-string-opts
                                               TOPOLOGY-SUBMITTER-USER "me"}]
             (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
-                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+                        mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
               (stubbing [add-to-classpath mock-cp
                        launch-process nil
-                       set-worker-user! nil
                        supervisor/java-cmd "java"
                        supervisor/jlp nil
                        supervisor/write-log-metadata! nil]
@@ -508,10 +508,10 @@
                                                                       topo-list-opts
                                                                       TOPOLOGY-SUBMITTER-USER "me"}]
             (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
-                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+                        mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+                        mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
               (stubbing [add-to-classpath mock-cp
                        launch-process nil
-                       set-worker-user! nil
                        supervisor/java-cmd "java"
                        supervisor/jlp nil
                        supervisor/write-log-metadata! nil]