You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:40 UTC
[04/23] storm git commit: Updated to read-supervisor-storm-conf
Updated to read-supervisor-storm-conf
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ba70bf1c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ba70bf1c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ba70bf1c
Branch: refs/heads/master
Commit: ba70bf1cfe72aeabec104d8ea664f087b719a08c
Parents: 310128d
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Jan 15 14:28:04 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/supervisor.clj | 21 +++---
.../src/clj/org/apache/storm/daemon/worker.clj | 2 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 23 +++++++
.../clj/org/apache/storm/supervisor_test.clj | 72 ++++++++++----------
4 files changed, 72 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ba70bf1c/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 76af578..4296a86 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -472,7 +472,7 @@
(defn remove-blob-references
"Remove a reference to a blob when its no longer needed."
[localizer storm-id conf]
- (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
user (storm-conf TOPOLOGY-SUBMITTER-USER)
topo-name (storm-conf TOPOLOGY-NAME)]
@@ -495,7 +495,7 @@
"For each of the downloaded topologies, adds references to the blobs that the topologies are
using. This is used to reconstruct the cache on restart."
[localizer storm-id conf]
- (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
user (storm-conf TOPOLOGY-SUBMITTER-USER)
topo-name (storm-conf TOPOLOGY-NAME)
@@ -613,7 +613,7 @@
"Update each blob listed in the topology configuration if the latest version of the blob
has not been downloaded."
[conf storm-id localizer]
- (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
user (storm-conf TOPOLOGY-SUBMITTER-USER)
localresources (blobstore-map-to-localresources blobstore-map)]
@@ -724,7 +724,7 @@
action ^ProfileAction (:action pro-action)
stop? (> (System/currentTimeMillis) (:timestamp pro-action))
target-dir (worker-artifacts-root conf storm-id port)
- storm-conf (read-supervisor-storm-conf conf storm-id)
+ storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
user (storm-conf TOPOLOGY-SUBMITTER-USER)
environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] env {})
worker-pid (slurp (worker-artifacts-pid-path conf storm-id port))
@@ -950,7 +950,7 @@
(FileUtils/forceMkdir (File. stormroot))
(Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
(doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))
- (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot))
+ (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot))
(do
(log-message "Failed to download blob resources for storm-id " storm-id)
(rmr tmproot)))))
@@ -962,7 +962,10 @@
(when (not (.exists (.getParentFile file)))
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(do (FileUtils/forceMkdir (.getParentFile file))
- (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) (.getCanonicalPath (.getParentFile file))))
+ (setup-storm-code-dir
+ conf
+ (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
+ (.getCanonicalPath (.getParentFile file))))
(.mkdirs (.getParentFile file))))
(let [writer (java.io.FileWriter. file)
yaml (Yaml.)]
@@ -1013,7 +1016,7 @@
"Create symlinks in worker launch directory for all blobs"
[conf storm-id worker-id]
(let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id)
- storm-conf (read-supervisor-storm-conf conf storm-id)
+ storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
workerroot (worker-root conf worker-id)
blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
blob-file-names (get-blob-file-names blobstore-map)
@@ -1053,7 +1056,7 @@
jlp (jlp stormroot conf)
stormjar (ConfigUtils/supervisorStormJarPath stormroot)
_ (log-message "zliu stormjar: " stormjar)
- storm-conf (read-supervisor-storm-conf conf storm-id)
+ storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
[cp]
[])
@@ -1158,7 +1161,7 @@
(finally
(.shutdown blob-store)))
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
- (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+ (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot)
(let [classloader (.getContextClassLoader (Thread/currentThread))
resources-jar (resources-jar)
url (.getResource classloader RESOURCES-SUBDIR)
http://git-wip-us.apache.org/repos/asf/storm/blob/ba70bf1c/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index d4d66dc..92093ae 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -590,7 +590,7 @@
(def latest-log-config (atom {}))
(def original-log-levels (atom {}))
- (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ (let [storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id))
storm-conf (override-login-config-with-system-property storm-conf)
acls (Utils/getWorkerACL storm-conf)
cluster-state (cluster/mk-distributed-cluster-state conf :auth-conf storm-conf :acls acls :context (ClusterStateContext. DaemonType/WORKER))
http://git-wip-us.apache.org/repos/asf/storm/blob/ba70bf1c/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index b657ec4..a489e74 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -430,7 +430,30 @@ public class ConfigUtils {
return new LocalState((masterLocalDir(conf) + FILE_SEPARATOR + "history"));
}
+ //For testing only
+ // for java
+ // try (SetMockedSupervisorStormConf mocked = new SetMockedSupervisorStormConf(conf)) {
+ // run test ...
+ // }
+ //
+ // for clojure
+ // (with-open [mock (SetMockedSupervisorStormConf. conf)]
+ // run test ...)
+ public static class SetMockedSupervisorStormConf implements Closeable {
+ public SetMockedSupervisorStormConf(Map conf) {
+ mockedSupervisorStormConf = conf;
+ }
+
+ @Override
+ public void close() {
+ mockedSupervisorStormConf = null;
+ }
+ }
+ private static Map mockedSupervisorStormConf = null;
public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException {
+ if (mockedSupervisorStormConf != null) {
+ return mockedSupervisorStormConf;
+ }
String stormRoot = supervisorStormDistRoot(conf, stormId);
String confPath = supervisorStormConfPath(stormRoot);
return readSupervisorStormConfGivenPath(conf, confPath);
http://git-wip-us.apache.org/repos/asf/storm/blob/ba70bf1c/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 3b5877c..8a07006 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -316,13 +316,13 @@
mock-cp)
_ (log-message "zliu testing 1, exp-args is: " exp-args)
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
- WORKER-CHILDOPTS string-opts}}]
- (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-string-opts}
- add-to-classpath mock-cp
+ WORKER-CHILDOPTS string-opts}}
+ mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-string-opts}]
+ (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ (stubbing [add-to-classpath mock-cp
launch-process nil
- supervisor-stormdist-root nil
set-worker-user! nil
supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
@@ -342,15 +342,15 @@
exp-args (exp-args-fn list-opts topo-list-opts mock-cp)
_ (log-message "zliu testing 2, exp-args is: " exp-args)
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
- WORKER-CHILDOPTS list-opts}}]
- (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-list-opts}
- add-to-classpath mock-cp
+ WORKER-CHILDOPTS list-opts}}
+ mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-list-opts}]
+ (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ (stubbing [add-to-classpath mock-cp
launch-process nil
set-worker-user! nil
supervisor/jlp nil
- supervisor-stormdist-root nil
supervisor/write-log-metadata! nil
supervisor/create-blobstore-links nil
worker-artifacts-root "/tmp/workers-artifacts"]
@@ -366,13 +366,13 @@
(let [topo-cp (str file-path-separator "any" file-path-separator "path")
exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
_ (log-message "zliu testing 3, exp-args is: " exp-args ", str is " (pr-str exp-args))
- mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
- (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
- supervisor/jlp nil
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
+ mocked-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}]
+ (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ (stubbing [supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
set-worker-user! nil
- supervisor-stormdist-root nil
supervisor/write-log-metadata! nil
launch-process nil
current-classpath (str file-path-separator "base")
@@ -390,14 +390,14 @@
full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
exp-args (exp-args-fn [] [] mock-cp)
_ (log-message "zliu testing 4, exp-args is: " exp-args)
- mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
- (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
- supervisor/jlp nil
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}
+ mocked-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}]
+ (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ (stubbing [supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
launch-process nil
set-worker-user! nil
- supervisor-stormdist-root nil
supervisor/write-log-metadata! nil
current-classpath (str file-path-separator "base")
supervisor/create-blobstore-links nil]
@@ -471,15 +471,15 @@
STORM-LOCAL-DIR storm-local
STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
SUPERVISOR-RUN-WORKER-AS-USER true
- WORKER-CHILDOPTS string-opts}}]
- (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-string-opts
- TOPOLOGY-SUBMITTER-USER "me"}
- add-to-classpath mock-cp
+ WORKER-CHILDOPTS string-opts}}
+ mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-string-opts
+ TOPOLOGY-SUBMITTER-USER "me"}]
+ (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ (stubbing [add-to-classpath mock-cp
launch-process nil
set-worker-user! nil
- supervisor-stormdist-root nil
supervisor/java-cmd "java"
supervisor/jlp nil
supervisor/write-log-metadata! nil]
@@ -503,14 +503,14 @@
STORM-LOCAL-DIR storm-local
STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
SUPERVISOR-RUN-WORKER-AS-USER true
- WORKER-CHILDOPTS list-opts}}]
- (with-open [mock (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})]
- (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
- topo-list-opts
- TOPOLOGY-SUBMITTER-USER "me"}
- add-to-classpath mock-cp
+ WORKER-CHILDOPTS list-opts}}
+ mocked-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-list-opts
+ TOPOLOGY-SUBMITTER-USER "me"}]
+ (with-open [mock1 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormDistRoot. {})
+ mock2 (org.apache.storm.utils.ConfigUtils$SetMockedSupervisorStormConf. mocked-supervisor-storm-conf)]
+ (stubbing [add-to-classpath mock-cp
launch-process nil
- supervisor-stormdist-root nil
set-worker-user! nil
supervisor/java-cmd "java"
supervisor/jlp nil