You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:39 UTC
[03/23] storm git commit: Updates workerRoot, worker user, path, state, set. To do a few left.
Updates workerRoot, worker user, path, state, set. To do a few left.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0dbdcf39
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0dbdcf39
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0dbdcf39
Branch: refs/heads/master
Commit: 0dbdcf39bf411a4ee5abac5e740c07e8813ea025
Parents: ba70bf1
Author: zhuol <zh...@yahoo-inc.com>
Authored: Sun Jan 17 16:37:35 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 2 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 2 +-
.../clj/org/apache/storm/daemon/supervisor.clj | 55 ++++++++++++--------
.../src/clj/org/apache/storm/daemon/task.clj | 2 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 6 +--
storm-core/src/clj/org/apache/storm/testing.clj | 2 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 45 +++++++++++++---
.../clj/org/apache/storm/supervisor_test.clj | 24 ++++-----
8 files changed, 89 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index d1cb2d9..6c184fd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -377,7 +377,7 @@
(:storm-id worker)
(ConfigUtils/supervisorStormResourcesPath
(ConfigUtils/supervisorStormDistRoot (:conf worker) (:storm-id worker)))
- (worker-pids-root (:conf worker) (:worker-id worker))
+ (ConfigUtils/workerPidsRoot (:conf worker) (:worker-id worker))
(:port worker)
(:task-ids worker)
(:default-shared-resources worker)
http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 9d978ae..ba38701 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1040,7 +1040,7 @@
(defn code-ids [blob-store]
(let [to-id (reify KeyFilter
- (filter [this key] (get-id-from-blob-key key)))]
+ (filter [this key] (ConfigUtils/getIdFromBlobKey key)))]
(set (.filterAndListKeys blob-store to-id))))
(defn cleanup-storm-ids [conf storm-cluster-state blob-store]
http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 4296a86..fb371c5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -126,7 +126,7 @@
)
(defn read-worker-heartbeat [conf id]
- (let [local-state (worker-state conf id)]
+ (let [local-state (ConfigUtils/workerState conf id)]
(try
(ls-worker-heartbeat local-state)
(catch Exception e
@@ -135,12 +135,13 @@
(defn my-worker-ids [conf]
- (read-dir-contents (worker-root conf)))
+ (filter #(not= "null" %) (read-dir-contents (ConfigUtils/workerRoot conf))))
(defn read-worker-heartbeats
"Returns map from worker id to heartbeat"
[conf]
- (let [ids (my-worker-ids conf)]
+ (let [ids (my-worker-ids conf)
+ _ (log-message "zliu my-worker-ids are" (prn-str ids))]
(into {}
(dofor [id ids]
[id (read-worker-heartbeat conf id)]))
@@ -172,7 +173,9 @@
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
id->heartbeat (read-worker-heartbeats conf)
- approved-ids (set (keys (ls-approved-workers local-state)))]
+ _ (log-message "zliu id->heartbeat is:" (prn-str id->heartbeat))
+ approved-ids (set (keys (ls-approved-workers local-state)))
+ _ (log-message "zliu approved-ids is:" (prn-str approved-ids))]
(into
{}
(dofor [[id hb] id->heartbeat]
@@ -196,7 +199,7 @@
)))
(defn- wait-for-worker-launch [conf id start-time]
- (let [state (worker-state conf id)]
+ (let [state (ConfigUtils/workerState conf id)]
(loop []
(let [hb (ls-worker-heartbeat state)]
(when (and
@@ -258,16 +261,16 @@
(defn try-cleanup-worker [conf id]
(try
- (if (.exists (File. (worker-root conf id)))
+ (if (.exists (File. (ConfigUtils/workerRoot conf id)))
(do
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
- (rmr-as-user conf id (worker-root conf id))
+ (rmr-as-user conf id (ConfigUtils/workerRoot conf id))
(do
- (rmr (worker-heartbeats-root conf id))
+ (rmr (ConfigUtils/workerHeartbeatsRoot conf id))
;; this avoids a race condition with worker or subprocess writing pid around same time
- (rmr (worker-pids-root conf id))
- (rmr (worker-root conf id))))
- (remove-worker-user! conf id)
+ (rmr (ConfigUtils/workerPidsRoot conf id))
+ (rmr (ConfigUtils/workerRoot conf id))))
+ (ConfigUtils/removeWorkerUserWSE conf id)
(remove-dead-worker id)
))
(catch IOException e
@@ -281,11 +284,12 @@
(defn shutdown-worker [supervisor id]
(log-message "Shutting down " (:supervisor-id supervisor) ":" id)
(let [conf (:conf supervisor)
- pids (read-dir-contents (worker-pids-root conf id))
+ pids (read-dir-contents (ConfigUtils/workerPidsRoot conf id))
+ _ (log-message "zliu pids are:" (pr-str pids) ", worker-pids-root is:" (ConfigUtils/workerPidsRoot conf id))
thread-pid (@(:worker-thread-pids-atom supervisor) id)
shutdown-sleep-secs (conf SUPERVISOR-WORKER-SHUTDOWN-SLEEP-SECS)
as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
- user (get-worker-user conf id)]
+ user (ConfigUtils/getWorkerUser conf id)]
(when thread-pid
(psim/kill-process thread-pid))
(doseq [pid pids]
@@ -300,9 +304,9 @@
(worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
(force-kill-process pid))
(if as-user
- (rmr-as-user conf id (worker-pid-path conf id pid))
+ (rmr-as-user conf id (ConfigUtils/workerPidPath conf id pid))
(try
- (rmpath (worker-pid-path conf id pid))
+ (rmpath (ConfigUtils/workerPidPath conf id pid))
(catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
(try-cleanup-worker conf id))
(log-message "Shut down " (:supervisor-id supervisor) ":" id))
@@ -378,8 +382,9 @@
(do
(log-message "Launching worker with assignment "
(get-worker-assignment-helper-msg assignment supervisor port id))
- (local-mkdirs (worker-pids-root conf id))
- (local-mkdirs (worker-heartbeats-root conf id))
+ (local-mkdirs (ConfigUtils/workerPidsRoot conf id))
+ (log-message "zliu create worker heartbeatroot as " (ConfigUtils/workerHeartbeatsRoot conf id))
+ (local-mkdirs (ConfigUtils/workerHeartbeatsRoot conf id))
(launch-worker supervisor
(:storm-id assignment)
port
@@ -396,6 +401,9 @@
^LocalState local-state (:local-state supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
assigned-executors (defaulted (ls-local-assignments local-state) {})
+ _ (log-message "zliu storm-cluster-state is " (prn-str storm-cluster-state))
+ _ (log-message "zliu local-state is " (prn-str local-state))
+ _ (log-message "zliu assigned-executors is " (prn-str assigned-executors))
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
keepers (filter-val
@@ -420,6 +428,9 @@
(log-debug "Syncing processes")
(log-debug "Assigned executors: " assigned-executors)
(log-debug "Allocated: " allocated)
+ (log-message "zliu Syncing processes")
+ (log-message "zliu allocated is " (prn-str allocated))
+ (log-message "zliu new-worker-ids is " (prn-str new-worker-ids))
(doseq [[id [state heartbeat]] allocated]
(when (not= :valid state)
(log-message
@@ -1017,7 +1028,7 @@
[conf storm-id worker-id]
(let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
- workerroot (worker-root conf worker-id)
+ workerroot (ConfigUtils/workerRoot conf worker-id)
blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
blob-file-names (get-blob-file-names blobstore-map)
resource-file-names (cons RESOURCES-SUBDIR blob-file-names)]
@@ -1030,7 +1041,7 @@
(defn create-artifacts-link
"Create a symlink from workder directory to its port artifacts directory"
[conf storm-id port worker-id]
- (let [worker-dir (worker-root conf worker-id)
+ (let [worker-dir (ConfigUtils/workerRoot conf worker-id)
topo-dir (worker-artifacts-root conf storm-id)]
(log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
storm-id " to its port artifacts directory")
@@ -1127,13 +1138,13 @@
command (->> command (map str) (filter (complement empty?)))]
(log-message "Launching worker with command: " (shell-cmd command))
(write-log-metadata! storm-conf user worker-id storm-id port conf)
- (set-worker-user! conf worker-id user)
+ (ConfigUtils/setWorkerUserWSE conf worker-id user)
(create-artifacts-link conf storm-id port worker-id)
(let [log-prefix (str "Worker Process " worker-id)
callback (fn [exit-code]
(log-message log-prefix " exited with code: " exit-code)
(add-dead-worker worker-id))
- worker-dir (worker-root conf worker-id)]
+ worker-dir (ConfigUtils/workerRoot conf worker-id)]
(remove-dead-worker worker-id)
(create-blobstore-links conf storm-id worker-id)
(if run-worker-as-user
@@ -1186,7 +1197,7 @@
(:assignment-id supervisor)
port
worker-id)]
- (set-worker-user! conf worker-id "")
+ (ConfigUtils/setWorkerUserWSE conf worker-id "")
(psim/register-process pid worker)
(swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
))
http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 33b8af9..da5a537 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -43,7 +43,7 @@
(:storm-id worker)
(ConfigUtils/supervisorStormResourcesPath
(ConfigUtils/supervisorStormDistRoot conf (:storm-id worker)))
- (worker-pids-root conf (:worker-id worker))
+ (ConfigUtils/workerPidsRoot conf (:worker-id worker))
(int %)
(:port worker)
(:task-ids worker)
http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 92093ae..590ea4c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -77,7 +77,7 @@
(defn do-heartbeat [worker]
(let [conf (:conf worker)
- state (worker-state conf (:worker-id worker))]
+ state (ConfigUtils/workerState conf (:worker-id worker))]
;; do the local-file-system heartbeat.
(ls-worker-heartbeat! state (current-time-secs) (:storm-id worker) (:executors worker) (:port worker))
(.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
@@ -252,7 +252,7 @@
(mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
(into {}))
- topology (read-supervisor-topology conf storm-id)
+ topology (ConfigUtils/readSupervisorTopology conf storm-id)
mq-context (if mq-context
mq-context
(TransportFactory/makeContext storm-conf))]
@@ -581,7 +581,7 @@
;; process. supervisor will register it in this case
(when (= :distributed (ConfigUtils/clusterMode conf))
(let [pid (process-pid)]
- (touch (worker-pid-path conf worker-id pid))
+ (touch (ConfigUtils/workerPidPath conf worker-id pid))
(spit (worker-artifacts-pid-path conf storm-id port) pid)))
(declare establish-log-setting-callback)
http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 08752c5..7d76eeb 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -345,7 +345,7 @@
(let [supervisor-id (:supervisor-id supervisor)
conf (:conf supervisor)
existing (get @capture-atom [supervisor-id port] [])]
- (set-worker-user! conf worker-id "")
+ (ConfigUtils/setWorkerUserWSE conf worker-id "")
(swap! capture-atom assoc [supervisor-id port] (conj existing storm-id)))))
(defn find-worker-id
http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index a489e74..a739482 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -21,15 +21,11 @@ package org.apache.storm.utils;
import org.apache.storm.Config;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.utils.LocalState;
-import org.apache.storm.utils.Utils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
@@ -469,13 +465,16 @@ public class ConfigUtils {
return (absoluteStormLocalDir(conf) + FILE_SEPARATOR + "/workers-users");
}
+ /* Never get used TODO : may delete it*/
public static String workerUserFile(Map conf, String workerId) {
return (workerUserRoot(conf) + FILE_SEPARATOR + workerId);
}
- public static String getWorkerUser(Map conf, String workerId) throws IOException {
+ public static String getWorkerUser(Map conf, String workerId) {
LOG.info("GET worker-user for {}", workerId);
File file = new File(workerUserFile(conf, workerId));
+ //LOG.info("zliu to read access worker user file: " + (file.getCanonicalPath()));
+ LOG.info("zliu this file is existed?" + file.exists());
try (InputStream in = new FileInputStream(file);
Reader reader = new InputStreamReader(in);
@@ -488,6 +487,9 @@ public class ConfigUtils {
}
String ret = sb.toString().trim();
return ret;
+ } catch (IOException e) {
+ LOG.error("Failed to get worker user for " + workerId + ".");
+ return null;
}
}
@@ -504,9 +506,33 @@ public class ConfigUtils {
return ret;
}
+ //For testing only
+ // for java
+ // try (SetMockedWorkerUserWSE mocked = new SetMockedWorkerUserWSE(conf)) {
+ // run test ...
+ // }
+ //
+ // for clojure
+ // (with-open [mock (SetMockedWorkerUserWSE. conf)]
+ // run test ...)
+ public static class SetMockedWorkerUserWSE implements Closeable {
+ public SetMockedWorkerUserWSE(Map conf) {
+ mockedWorkerUserWSE = conf;
+ }
+
+ @Override
+ public void close() {
+ mockedWorkerUserWSE = null;
+ }
+ }
+ private static Map mockedWorkerUserWSE = null;
public static void setWorkerUserWSE(Map conf, String workerId, String user) throws IOException {
+ if (mockedWorkerUserWSE != null) {
+ return;
+ }
LOG.info("SET worker-user {} {}", workerId, user);
File file = new File(workerUserFile(conf, workerId));
+ LOG.info("zliu SET worker-user to create worker file:" + file.getCanonicalPath());
file.getParentFile().mkdirs();
try (FileWriter fw = new FileWriter(file);
@@ -562,6 +588,9 @@ public class ConfigUtils {
}
public static String workerRoot(Map conf) {
+ LOG.info("zliu workers root's current listFiles are:");
+ File r = new File((absoluteStormLocalDir(conf) + FILE_SEPARATOR + "workers")); //TODO delete me
+ if (r.exists()) for (File f : r.listFiles()) {LOG.info(f.getName());} //TODO delete me
return (absoluteStormLocalDir(conf) + FILE_SEPARATOR + "workers");
}
@@ -573,16 +602,16 @@ public class ConfigUtils {
return (workerRoot(conf, id) + FILE_SEPARATOR + "pids");
}
- public static String workerPidsRoot(Map conf, String id, String pid) {
+ public static String workerPidPath(Map conf, String id, String pid) {
return (workerPidsRoot(conf, id) + FILE_SEPARATOR + pid);
}
- public static String workerHeartbeatRoot(Map conf, String id) {
+ public static String workerHeartbeatsRoot(Map conf, String id) {
return (workerRoot(conf, id) + FILE_SEPARATOR + "heartbeats");
}
public static LocalState workerState(Map conf, String id) throws IOException {
- return new LocalState(workerHeartbeatRoot(conf, id));
+ return new LocalState(workerHeartbeatsRoot(conf, id));
}
public static void overrideLoginConfigWithSystemProperty(Map conf) { // note that we delete the return value
http://git-wip-us.apache.org/repos/asf/storm/blob/0dbdcf39/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 8a07006..5c9a279 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -320,10 +320,10 @@
mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
topo-string-opts}]
(with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
- mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+ mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
(stubbing [add-to-classpath mock-cp
launch-process nil
- set-worker-user! nil
supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
supervisor/write-log-metadata! nil
@@ -346,10 +346,10 @@
mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
topo-list-opts}]
(with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
- mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+ mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
(stubbing [add-to-classpath mock-cp
launch-process nil
- set-worker-user! nil
supervisor/jlp nil
supervisor/write-log-metadata! nil
supervisor/create-blobstore-links nil
@@ -369,10 +369,10 @@
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}]
(with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
- mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+ mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
(stubbing [supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
- set-worker-user! nil
supervisor/write-log-metadata! nil
launch-process nil
current-classpath (str file-path-separator "base")
@@ -393,11 +393,11 @@
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
mocked-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}]
(with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
- mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+ mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
(stubbing [supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
launch-process nil
- set-worker-user! nil
supervisor/write-log-metadata! nil
current-classpath (str file-path-separator "base")
supervisor/create-blobstore-links nil]
@@ -476,10 +476,10 @@
topo-string-opts
TOPOLOGY-SUBMITTER-USER "me"}]
(with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
- mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+ mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
(stubbing [add-to-classpath mock-cp
launch-process nil
- set-worker-user! nil
supervisor/java-cmd "java"
supervisor/jlp nil
supervisor/write-log-metadata! nil]
@@ -508,10 +508,10 @@
topo-list-opts
TOPOLOGY-SUBMITTER-USER "me"}]
(with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
- mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)
+ mock3 (org.apache.storm.utils.ConfigUtils$SetMockedWorkerUserWSE. {})]
(stubbing [add-to-classpath mock-cp
launch-process nil
- set-worker-user! nil
supervisor/java-cmd "java"
supervisor/jlp nil
supervisor/write-log-metadata! nil]