You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/04/22 18:39:17 UTC
[05/10] git commit: Support *.worker.childopts as list or as string
Support *.worker.childopts as list or as string
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c970225d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c970225d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c970225d
Branch: refs/heads/master
Commit: c970225dc43721e1a7827408df7542aed94e11cb
Parents: 6d1ad6a
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Wed Jan 15 02:02:41 2014 +0000
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Wed Jan 15 03:16:53 2014 +0000
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 45 +++++++++++---------
storm-core/src/jvm/backtype/storm/Config.java | 5 +--
.../jvm/backtype/storm/ConfigValidation.java | 17 ++++++++
.../test/clj/backtype/storm/config_test.clj | 20 +++++++++
4 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c970225d/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index a38ee5c..1630766 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -438,6 +438,11 @@
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
))
+(defn- substitute-worker-childopts
+  "Replaces every occurrence of %ID% in a childopts setting with port.
+
+  value is either a single string of space-separated options or a
+  collection of option strings. Returns a sequence (or String array) of
+  individual option strings with the substitution applied."
+  [value port]
+  (let [sub-fn (fn [s] (.replaceAll s "%ID%" (str port)))]
+    ;; `list?` is true only for PersistentList, so vectors, lazy seqs and
+    ;; java.util.List instances would wrongly fall through to the string
+    ;; branch and fail on .replaceAll. Accept any sequential collection.
+    (if (or (sequential? value) (instance? java.util.List value))
+      (map sub-fn value)
+      ;; A single string: substitute first, then split into separate args.
+      (-> value sub-fn (.split " ")))))
(defmethod launch-worker
:distributed [supervisor storm-id port worker-id]
@@ -448,27 +453,27 @@
storm-conf (read-supervisor-storm-conf conf storm-id)
classpath (add-to-classpath (current-classpath) [stormjar])
worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
- (.replaceAll s "%ID%" (str port)))
- topo-worker-childopts (when-let [s (conf TOPOLOGY-WORKER-CHILDOPTS)]
- (.replaceAll s "%ID%" (str port)))
+ (substitute-worker-childopts s port))
+ topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
+ (substitute-worker-childopts s port))
logfilename (str "worker-" port ".log")
- command ["java"
- "-server"
- worker-childopts
- topo-worker-childopts
- (str "-Djava.library.path=" (conf JAVA-LIBRARY-PATH))
- (str "-Dlogfile.name=" logfilename)
- (str "-Dstorm.home=" storm-home)
- (str "-Dlogback.configurationFile=" storm-home "/logback/cluster.xml")
- (str "-Dstorm.id=" storm-id)
- (str "-Dworker.id=" worker-id)
- (str "-Dworker.port=" port)
- "-cp" classpath
- "backtype.storm.daemon.worker"
- storm-id
- (:assignment-id supervisor)
- port
- worker-id]
+ command (concat
+ ["java" "-server"]
+ worker-childopts
+ topo-worker-childopts
+ [(str "-Djava.library.path=" (conf JAVA-LIBRARY-PATH))
+ (str "-Dlogfile.name=" logfilename)
+ (str "-Dstorm.home=" storm-home)
+ (str "-Dlogback.configurationFile=" storm-home "/logback/cluster.xml")
+ (str "-Dstorm.id=" storm-id)
+ (str "-Dworker.id=" worker-id)
+ (str "-Dworker.port=" port)
+ "-cp" classpath
+ "backtype.storm.daemon.worker"
+ storm-id
+ (:assignment-id supervisor)
+ port
+ worker-id])
command (->> command (map str) (filter (complement empty?)))
shell-cmd (->> command (map #(str \' % \')) (clojure.string/join " "))]
(log-message "Launching worker with command: " shell-cmd)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c970225d/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 281ae52..1fc0d78 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -459,8 +459,7 @@ public class Config extends HashMap<String, Object> {
* with an identifier for this worker.
*/
public static final String WORKER_CHILDOPTS = "worker.childopts";
- public static final Object WORKER_CHILDOPTS_SCHEMA = String.class;
-
+ public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
* How often this worker should heartbeat to the supervisor.
@@ -662,7 +661,7 @@ public class Config extends HashMap<String, Object> {
* Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS.
*/
public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts";
- public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = String.class;
+ public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
* This config is available for TransactionalSpouts, and contains the id ( a String) for
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c970225d/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index 15ef6ba..3accb82 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -135,4 +135,21 @@ public class ConfigValidation {
"Field " + name + " must be an Iterable containing only Strings or Maps of Strings");
}
};
+
+ /**
+ * Validates a String or a list of Strings
+ */
+ public static Object StringOrStringListValidator = new FieldValidator() {
+
+ private FieldValidator fv = FieldListValidatorFactory(String.class);
+
+ @Override
+ public void validateField(String name, Object o) throws IllegalArgumentException {
+ if (o == null || o instanceof String) {
+ // A null value or a String value is acceptable
+ return;
+ }
+ this.fv.validateField(name, o);
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c970225d/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/config_test.clj b/storm-core/test/clj/backtype/storm/config_test.clj
index 93c7df9..01f788b 100644
--- a/storm-core/test/clj/backtype/storm/config_test.clj
+++ b/storm-core/test/clj/backtype/storm/config_test.clj
@@ -83,3 +83,23 @@
(catch Exception e e))))
(is (thrown-cause? java.lang.IllegalArgumentException
(.validateField validator "test" 42)))))
+
+;; Both childopts settings must accept nil, a string, or a collection of
+;; strings, and must reject a non-string scalar such as a number.
+(deftest test-worker-childopts-is-string-or-string-list
+  (let [pass-cases [nil "some string" ["some" "string" "list"]]
+        ;; [testing-context-description config-key] for each setting under test
+        settings [["worker.childopts validates" WORKER-CHILDOPTS]
+                  ["topology.worker.childopts validates" TOPOLOGY-WORKER-CHILDOPTS]]]
+    (doseq [[description conf-key] settings]
+      (testing description
+        (let [validator (CONFIG-SCHEMA-MAP conf-key)]
+          ;; Every accepted shape validates without raising.
+          (doseq [value pass-cases]
+            (is (nil? (try
+                        (.validateField validator "test" value)
+                        (catch Exception e e)))))
+          ;; A bare number is neither a string nor a list of strings.
+          (is (thrown-cause? java.lang.IllegalArgumentException
+                (.validateField validator "test" 42))))))))