You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:48 UTC
[12/23] storm git commit: Updated call sites of
supervisor-stormjar-path
Updated call sites of supervisor-stormjar-path
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/635488db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/635488db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/635488db
Branch: refs/heads/master
Commit: 635488db5e0670f1e4348fab5e3fd7d3970c5e63
Parents: 3d2796d
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Jan 15 12:50:00 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/config.clj | 7 +-
.../src/clj/org/apache/storm/daemon/common.clj | 2 +-
.../clj/org/apache/storm/daemon/supervisor.clj | 36 ++++++---
.../src/clj/org/apache/storm/daemon/task.clj | 4 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 83 +++++++++++++++++---
.../clj/org/apache/storm/supervisor_test.clj | 58 ++++++++------
6 files changed, 135 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index d65c439..28b06f5 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -89,7 +89,9 @@
(defn absolute-storm-local-dir [conf]
(let [storm-home (System/getProperty "storm.home")
- path (conf STORM-LOCAL-DIR)]
+ _ (log-message "zliu clojure storm.home is " storm-home)
+ path (conf STORM-LOCAL-DIR)
+ _ (log-message "zliu clojure path is " path)]
(if path
(if (is-absolute-path? path) path (str storm-home file-path-separator path))
(str storm-home file-path-separator "storm-local"))))
@@ -168,7 +170,8 @@
(defn supervisor-local-dir
[conf]
- (let [ret (str (absolute-storm-local-dir conf) file-path-separator "supervisor")]
+ (let [ret (str (absolute-storm-local-dir conf) file-path-separator "supervisor")
+ _ (log-message "zliu supervisor-local-dir ret is " ret)]
(FileUtils/forceMkdir (File. ret))
ret))
http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 77791f4..a951a75 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -376,7 +376,7 @@
(:component->stream->fields worker)
(:storm-id worker)
(supervisor-storm-resources-path
- (supervisor-stormdist-root (:conf worker) (:storm-id worker)))
+ (ConfigUtils/supervisorStormDistRoot (:conf worker) (:storm-id worker)))
(worker-pids-root (:conf worker) (:worker-id worker))
(:port worker)
(:task-ids worker)
http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 838a98f..58d046a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -118,7 +118,11 @@
(map-val :master-code-dir assignments-snapshot))
(defn- read-downloaded-storm-ids [conf]
- (map #(url-decode %) (read-dir-contents (supervisor-stormdist-root conf)))
+ (let [dir (ConfigUtils/supervisorStormDistRoot conf)
+ _ (log-message "zliu java supervisorLocalDir " (ConfigUtils/supervisorLocalDir conf))
+ cdir (supervisor-stormdist-root conf)
+ _ (log-message "zliu jdir is:" dir ",cdir is" cdir ",clj dir equals to nil? " (nil? cdir) "jdir = cdir?" (= dir cdir))]
+ (map #(url-decode %) (read-dir-contents dir))) ; (supervisor-stormdist-root conf)));
)
(defn read-worker-heartbeat [conf id]
@@ -347,8 +351,8 @@
(defn required-topo-files-exist?
[conf storm-id]
- (let [stormroot (supervisor-stormdist-root conf storm-id)
- stormjarpath (supervisor-stormjar-path stormroot)
+ (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
+ stormjarpath (ConfigUtils/supervisorStormJarPath stormroot)
stormcodepath (supervisor-stormcode-path stormroot)
stormconfpath (supervisor-stormconf-path stormroot)]
(and (every? exists-file? [stormroot stormconfpath stormcodepath])
@@ -501,13 +505,13 @@
(defn rm-topo-files
[conf storm-id localizer rm-blob-refs?]
- (let [path (supervisor-stormdist-root conf storm-id)]
+ (let [path (ConfigUtils/supervisorStormDistRoot conf storm-id)]
(try
(if rm-blob-refs?
(remove-blob-references localizer storm-id conf))
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(rmr-as-user conf storm-id path)
- (rmr (supervisor-stormdist-root conf storm-id)))
+ (rmr (ConfigUtils/supervisorStormDistRoot conf storm-id)))
(catch Exception e
(log-message e (str "Exception removing: " storm-id))))))
@@ -632,7 +636,7 @@
new-assignment @(:curr-assignment supervisor)
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
(doseq [topology-id downloaded-storm-ids]
- (let [storm-root (supervisor-stormdist-root conf topology-id)]
+ (let [storm-root (ConfigUtils/supervisorStormDistRoot conf topology-id)]
(when (assigned-storm-ids topology-id)
(log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root)
(update-blobs-for-topology! conf topology-id (:localizer supervisor))))))
@@ -923,7 +927,7 @@
:distributed [conf storm-id master-code-dir localizer]
;; Downloading to permanent location is atomic
(let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
- stormroot (supervisor-stormdist-root conf storm-id)
+ stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
blobstore (Utils/getClientBlobStoreForSupervisor conf)]
(FileUtils/forceMkdir (File. tmproot))
(if-not on-windows?
@@ -931,13 +935,13 @@
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions"))))
(Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormJarKey storm-id)
- (supervisor-stormjar-path tmproot) blobstore)
+ (ConfigUtils/supervisorStormJarPath tmproot) blobstore)
(Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormCodeKey storm-id)
(supervisor-stormcode-path tmproot) blobstore)
(Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormConfKey storm-id)
(supervisor-stormconf-path tmproot) blobstore)
(.shutdown blobstore)
- (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
+ (extract-dir-from-jar (ConfigUtils/supervisorStormJarPath tmproot) RESOURCES-SUBDIR tmproot)
(download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer
tmproot)
(if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
@@ -1008,7 +1012,7 @@
(defn create-blobstore-links
"Create symlinks in worker launch directory for all blobs"
[conf storm-id worker-id]
- (let [stormroot (supervisor-stormdist-root conf storm-id)
+ (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
storm-conf (read-supervisor-storm-conf conf storm-id)
workerroot (worker-root conf worker-id)
blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
@@ -1044,16 +1048,22 @@
storm-log-conf-dir
(str storm-home file-path-separator storm-log-conf-dir))
(str storm-home file-path-separator "log4j2"))
- stormroot (supervisor-stormdist-root conf storm-id)
+ stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
+ _ (log-message "zliu stormroot: " stormroot)
jlp (jlp stormroot conf)
- stormjar (supervisor-stormjar-path stormroot)
+ stormjar (ConfigUtils/supervisorStormJarPath stormroot)
+ _ (log-message "zliu stormjar: " stormjar)
storm-conf (read-supervisor-storm-conf conf storm-id)
topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
[cp]
[])
+ _ (log-message "zliu worker-classpath: " (worker-classpath))
+ _ (log-message "zliu stormjar: " stormjar)
+ _ (log-message "zliu topo-classpath: " topo-classpath)
classpath (-> (worker-classpath)
(add-to-classpath [stormjar])
(add-to-classpath topo-classpath))
+ _ (log-message "zliu classpath" classpath)
top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
mem-onheap (if (and mem-onheap (> mem-onheap 0)) ;; not nil and not zero
(int (Math/ceil mem-onheap)) ;; round up
@@ -1139,7 +1149,7 @@
(defmethod download-storm-code
:local [conf storm-id master-code-dir localizer]
(let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
- stormroot (supervisor-stormdist-root conf storm-id)
+ stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
(try
(FileUtils/forceMkdir (File. tmproot))
http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 1ae9b22..a8114be 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -23,7 +23,7 @@
(:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo])
(:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext])
- (:import [org.apache.storm.utils Utils])
+ (:import [org.apache.storm.utils Utils ConfigUtils])
(:import [org.apache.storm.generated ShellComponent JavaObject])
(:import [org.apache.storm.spout ShellSpout])
(:import [java.util Collection List ArrayList])
@@ -42,7 +42,7 @@
(:component->stream->fields worker)
(:storm-id worker)
(supervisor-storm-resources-path
- (supervisor-stormdist-root conf (:storm-id worker)))
+ (ConfigUtils/supervisorStormDistRoot conf (:storm-id worker)))
(worker-pids-root conf (:worker-id worker))
(int %)
(:port worker)
http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 3dd6d4e..5ada400 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -129,10 +129,8 @@ public class ConfigUtils {
// }
//
// for clojure
- // (let [something (SetMockedStormConfig. conf)]
- // (try
- // run test ...
- // (finally (.close something))))
+ // (with-open [mock (SetMockedStormConfig. conf)]
+ // run test ...)
public static class SetMockedStormConfig implements Closeable {
public SetMockedStormConfig(Map conf) {
mockedStormConfig = conf;
@@ -162,11 +160,14 @@ public class ConfigUtils {
}
public static String absoluteStormLocalDir(Map conf) {
+ LOG.info("zliu conf map is " + conf);
String stormHome = System.getProperty("storm.home");
- String localDir = String.valueOf(conf.get(Config.STORM_LOCAL_DIR));
- if (localDir.equals("null")) {
+ LOG.info("zliu stormhome is " + stormHome);
+ String localDir = (String) conf.get(Config.STORM_LOCAL_DIR);
+ if (localDir == null) {
return (stormHome + FILE_SEPARATOR + "storm-local");
} else {
+ LOG.info("zliu java local dir is " + localDir + ", isAbsolute:" + (new File(localDir).isAbsolute()));
if (new File(localDir).isAbsolute()) {
return localDir;
} else {
@@ -225,7 +226,12 @@ public class ConfigUtils {
}
public static String stormDistPath(String stormRoot) {
- return stormRoot + FILE_SEPARATOR + "stormdist";
+ String ret = "";
+ // Fall back to "" when stormRoot is null: concatenating a null String would otherwise produce the literal text "null".
+ if (stormRoot != null) {
+ ret = stormRoot;
+ }
+ return ret + FILE_SEPARATOR + "stormdist";
}
public static String stormTmpPath(String stormRoot) {
@@ -280,7 +286,7 @@ public class ConfigUtils {
}
public static String supervisorLocalDir(Map conf) throws IOException {
- String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPARATOR + "supervisor";
+ String ret = absoluteStormLocalDir(conf) + FILE_SEPARATOR + "supervisor";
FileUtils.forceMkdir(new File(ret));
return ret;
}
@@ -289,28 +295,74 @@ public class ConfigUtils {
return ((supervisorLocalDir(conf) + FILE_SEPARATOR + "isupervisor"));
}
+ //For testing only
+ // for java
+ // try (SetMockedSupervisorStormDistRoot mocked = new SetMockedSupervisorStormDistRoot(conf)) {
+ // run test ...
+ // }
+ //
+ // for clojure
+ // (with-open [mock (SetMockedSupervisorStormDistRoot. conf)]
+ // run test ...)
+ public static class SetMockedSupervisorStormDistRoot implements Closeable {
+ public SetMockedSupervisorStormDistRoot(Map conf) {
+ mockedSupervisorStormDistRoot = conf;
+ }
+ @Override
+ public void close() {
+ mockedSupervisorStormDistRoot = null;
+ }
+ }
+ private static Map mockedSupervisorStormDistRoot = null;
public static String supervisorStormDistRoot(Map conf) throws IOException {
+ LOG.info("zliu supervisorStormDistRoot resl is: " + stormDistPath(supervisorLocalDir(conf)) + "mocked set is " + mockedSupervisorStormDistRoot);
+ if (mockedSupervisorStormDistRoot != null) {
+ return null;
+ }
return stormDistPath(supervisorLocalDir(conf)); // TODO: no need to forceMake here?, clj does not.
}
public static String supervisorStormDistRoot(Map conf, String stormId) throws IOException {
+ if (mockedSupervisorStormDistRoot != null) {
+ return null;
+ }
return supervisorStormDistRoot(conf) + FILE_SEPARATOR + stormId; // TODO: need to (url-encode storm-id)? Not.
}
public static String supervisorStormJarPath(String stormRoot) {
- return (stormRoot + FILE_SEPARATOR + "stormjar.jar");
+ String ret = "";
+ // Fall back to "" when stormRoot is null: concatenating a null String would otherwise produce the literal text "null".
+ if (stormRoot != null) {
+ ret = stormRoot;
+ }
+ return (ret + FILE_SEPARATOR + "stormjar.jar");
}
public static String supervisorStormMetaFilePath(String stormRoot) {
- return (stormRoot + FILE_SEPARATOR + "storm-code-distributor.meta");
+ String ret = "";
+ // Fall back to "" when stormRoot is null: concatenating a null String would otherwise produce the literal text "null".
+ if (stormRoot != null) {
+ ret = stormRoot;
+ }
+ return (ret + FILE_SEPARATOR + "storm-code-distributor.meta");
}
public static String supervisorStormCodePath(String stormRoot) {
- return (stormRoot + FILE_SEPARATOR + "stormcode.ser");
+ String ret = "";
+ // Fall back to "" when stormRoot is null: concatenating a null String would otherwise produce the literal text "null".
+ if (stormRoot != null) {
+ ret = stormRoot;
+ }
+ return (ret + FILE_SEPARATOR + "stormcode.ser");
}
public static String supervisorStormConfPath(String stormRoot) {
- return (stormRoot + FILE_SEPARATOR + "stormconf.ser");
+ String ret = "";
+ // Fall back to "" when stormRoot is null: concatenating a null String would otherwise produce the literal text "null".
+ if (stormRoot != null) {
+ ret = stormRoot;
+ }
+ return (ret + FILE_SEPARATOR + "stormconf.ser");
}
public static String supervisorTmpDir(Map conf) throws IOException {
@@ -320,7 +372,12 @@ public class ConfigUtils {
}
public static String supervisorStormResourcesPath(String stormRoot) {
- return (stormRoot + FILE_SEPARATOR + RESOURCES_SUBDIR);
+ String ret = "";
+ // Fall back to "" when stormRoot is null: concatenating a null String would otherwise produce the literal text "null".
+ if (stormRoot != null) {
+ ret = stormRoot;
+ }
+ return (ret + FILE_SEPARATOR + RESOURCES_SUBDIR);
}
public static LocalState supervisorState(Map conf) throws IOException {
http://git-wip-us.apache.org/repos/asf/storm/blob/635488db/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index d9daf32..5be2059 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -26,7 +26,7 @@
(:import [java.io File])
(:import [java.nio.file Files])
(:import [java.nio.file.attribute FileAttribute])
- (:use [org.apache.storm config testing util timer])
+ (:use [org.apache.storm config testing util timer log])
(:use [org.apache.storm.daemon common])
(:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
[org.apache.storm [thrift :as thrift] [cluster :as cluster]])
@@ -314,59 +314,65 @@
exp-args (exp-args-fn ["-Dfoo=bar" "-Xmx1024m"]
["-Dkau=aux" "-Xmx2048m"]
mock-cp)
+ _ (log-message "zliu testing 1, exp-args is: " exp-args)
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
WORKER-CHILDOPTS string-opts}}]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
topo-string-opts}
add-to-classpath mock-cp
- supervisor-stormdist-root nil
launch-process nil
+ supervisor-stormdist-root nil
set-worker-user! nil
supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
supervisor/write-log-metadata! nil
supervisor/create-blobstore-links nil]
- (supervisor/launch-worker mock-supervisor
+ (supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
mock-worker-id
mock-mem-onheap)
- (verify-first-call-args-for-indices launch-process
+ (verify-first-call-args-for-indices launch-process
[0]
- exp-args))))
+ exp-args)))))
(testing "testing *.worker.childopts as list of strings, with spaces in values"
(let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
exp-args (exp-args-fn list-opts topo-list-opts mock-cp)
+ _ (log-message "zliu testing 2, exp-args is: " exp-args)
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
WORKER-CHILDOPTS list-opts}}]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
topo-list-opts}
add-to-classpath mock-cp
- supervisor-stormdist-root nil
launch-process nil
set-worker-user! nil
supervisor/jlp nil
+ supervisor-stormdist-root nil
supervisor/write-log-metadata! nil
supervisor/create-blobstore-links nil
worker-artifacts-root "/tmp/workers-artifacts"]
- (supervisor/launch-worker mock-supervisor
+ (supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
mock-worker-id
mock-mem-onheap)
- (verify-first-call-args-for-indices launch-process
+ (verify-first-call-args-for-indices launch-process
[0]
- exp-args))))
+ exp-args)))))
(testing "testing topology.classpath is added to classpath"
(let [topo-cp (str file-path-separator "any" file-path-separator "path")
exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
+ _ (log-message "zliu testing 3, exp-args is: " exp-args ", str is " (pr-str exp-args))
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
- supervisor-stormdist-root nil
+ (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
set-worker-user! nil
+ supervisor-stormdist-root nil
supervisor/write-log-metadata! nil
launch-process nil
current-classpath (str file-path-separator "base")
@@ -378,18 +384,20 @@
mock-mem-onheap)
(verify-first-call-args-for-indices launch-process
[0]
- exp-args))))
+ exp-args)))))
(testing "testing topology.environment is added to environment for worker launch"
(let [topo-env {"THISVAR" "somevalue" "THATVAR" "someothervalue"}
full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
exp-args (exp-args-fn [] [] mock-cp)
+ _ (log-message "zliu testing 4, exp-args is: " exp-args)
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
- supervisor-stormdist-root nil
+ (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
launch-process nil
set-worker-user! nil
+ supervisor-stormdist-root nil
supervisor/write-log-metadata! nil
current-classpath (str file-path-separator "base")
supervisor/create-blobstore-links nil]
@@ -400,7 +408,7 @@
mock-mem-onheap)
(verify-first-call-args-for-indices launch-process
[2]
- full-env)))))))
+ full-env))))))))
(deftest test-worker-launch-command-run-as-user
(testing "*.worker.childopts configuration"
@@ -464,13 +472,14 @@
STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
SUPERVISOR-RUN-WORKER-AS-USER true
WORKER-CHILDOPTS string-opts}}]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
topo-string-opts
TOPOLOGY-SUBMITTER-USER "me"}
add-to-classpath mock-cp
- supervisor-stormdist-root nil
launch-process nil
set-worker-user! nil
+ supervisor-stormdist-root nil
supervisor/java-cmd "java"
supervisor/jlp nil
supervisor/write-log-metadata! nil]
@@ -481,7 +490,7 @@
mock-mem-onheap)
(verify-first-call-args-for-indices launch-process
[0]
- exp-launch))
+ exp-launch)))
(is (= (slurp worker-script) exp-script))))
(finally (rmr storm-local)))
(.mkdirs (io/file storm-local "workers" mock-worker-id))
@@ -495,12 +504,13 @@
STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
SUPERVISOR-RUN-WORKER-AS-USER true
WORKER-CHILDOPTS list-opts}}]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
topo-list-opts
TOPOLOGY-SUBMITTER-USER "me"}
add-to-classpath mock-cp
- supervisor-stormdist-root nil
launch-process nil
+ supervisor-stormdist-root nil
set-worker-user! nil
supervisor/java-cmd "java"
supervisor/jlp nil
@@ -512,7 +522,7 @@
mock-mem-onheap)
(verify-first-call-args-for-indices launch-process
[0]
- exp-launch))
+ exp-launch)))
(is (= (slurp worker-script) exp-script))))
(finally (rmr storm-local))))))
@@ -731,4 +741,4 @@
(validate-launched-once (:launched changed)
{"sup1" [3 4]}
(get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
+ )))
\ No newline at end of file