You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/04/22 18:39:17 UTC

[05/10] git commit: Support *.worker.childopts as list or as string

Support *.worker.childopts as list or as string


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c970225d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c970225d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c970225d

Branch: refs/heads/master
Commit: c970225dc43721e1a7827408df7542aed94e11cb
Parents: 6d1ad6a
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Wed Jan 15 02:02:41 2014 +0000
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Wed Jan 15 03:16:53 2014 +0000

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    | 45 +++++++++++---------
 storm-core/src/jvm/backtype/storm/Config.java   |  5 +--
 .../jvm/backtype/storm/ConfigValidation.java    | 17 ++++++++
 .../test/clj/backtype/storm/config_test.clj     | 20 +++++++++
 4 files changed, 64 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c970225d/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index a38ee5c..1630766 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -438,6 +438,11 @@
       (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
       ))
 
+(defn- substitute-worker-childopts [value port]
+  (let [sub-fn (fn [s] (.replaceAll s "%ID%" (str port)))]
+    (if (list? value)
+      (map sub-fn value)
+      (-> value sub-fn (.split " ")))))
 
 (defmethod launch-worker
     :distributed [supervisor storm-id port worker-id]
@@ -448,27 +453,27 @@
           storm-conf (read-supervisor-storm-conf conf storm-id)
           classpath (add-to-classpath (current-classpath) [stormjar])
           worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
-                             (.replaceAll s "%ID%" (str port)))
-          topo-worker-childopts (when-let [s (conf TOPOLOGY-WORKER-CHILDOPTS)]
-                                  (.replaceAll s "%ID%" (str port)))
+                             (substitute-worker-childopts s port))
+          topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
+                                  (substitute-worker-childopts s port))
           logfilename (str "worker-" port ".log")
-          command ["java"
-                   "-server"
-                   worker-childopts
-                   topo-worker-childopts
-                   (str "-Djava.library.path=" (conf JAVA-LIBRARY-PATH))
-                   (str "-Dlogfile.name=" logfilename)
-                   (str "-Dstorm.home=" storm-home)
-                   (str "-Dlogback.configurationFile=" storm-home "/logback/cluster.xml")
-                   (str "-Dstorm.id=" storm-id)
-                   (str "-Dworker.id=" worker-id)
-                   (str "-Dworker.port=" port)
-                   "-cp" classpath
-                   "backtype.storm.daemon.worker"
-                   storm-id
-                   (:assignment-id supervisor)
-                   port
-                   worker-id]
+          command (concat
+                    ["java" "-server"]
+                    worker-childopts
+                    topo-worker-childopts
+                    [(str "-Djava.library.path=" (conf JAVA-LIBRARY-PATH))
+                     (str "-Dlogfile.name=" logfilename)
+                     (str "-Dstorm.home=" storm-home)
+                     (str "-Dlogback.configurationFile=" storm-home "/logback/cluster.xml")
+                     (str "-Dstorm.id=" storm-id)
+                     (str "-Dworker.id=" worker-id)
+                     (str "-Dworker.port=" port)
+                     "-cp" classpath
+                     "backtype.storm.daemon.worker"
+                     storm-id
+                     (:assignment-id supervisor)
+                     port
+                     worker-id])
           command (->> command (map str) (filter (complement empty?)))
           shell-cmd (->> command (map #(str \' % \')) (clojure.string/join " "))]
       (log-message "Launching worker with command: " shell-cmd)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c970225d/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 281ae52..1fc0d78 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -459,8 +459,7 @@ public class Config extends HashMap<String, Object> {
      * with an identifier for this worker.
      */
     public static final String WORKER_CHILDOPTS = "worker.childopts";
-    public static final Object WORKER_CHILDOPTS_SCHEMA = String.class;
-
+    public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
      * How often this worker should heartbeat to the supervisor.
@@ -662,7 +661,7 @@ public class Config extends HashMap<String, Object> {
      * Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS.
      */
     public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts";
-    public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = String.class;
+    public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
      * This config is available for TransactionalSpouts, and contains the id ( a String) for

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c970225d/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index 15ef6ba..3accb82 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -135,4 +135,21 @@ public class ConfigValidation {
                     "Field " + name + " must be an Iterable containing only Strings or Maps of Strings");
         }
     };
+
+    /**
+     * Validates a String or a list of Strings
+     */
+    public static Object StringOrStringListValidator = new FieldValidator() {
+
+        private FieldValidator fv = FieldListValidatorFactory(String.class);
+
+        @Override
+        public void validateField(String name, Object o) throws IllegalArgumentException {
+            if (o == null || o instanceof String) {
+                // A null value or a String value is acceptable
+                return;
+            }
+            this.fv.validateField(name, o);
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c970225d/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/config_test.clj b/storm-core/test/clj/backtype/storm/config_test.clj
index 93c7df9..01f788b 100644
--- a/storm-core/test/clj/backtype/storm/config_test.clj
+++ b/storm-core/test/clj/backtype/storm/config_test.clj
@@ -83,3 +83,23 @@
                 (catch Exception e e))))
     (is (thrown-cause? java.lang.IllegalArgumentException
       (.validateField validator "test" 42)))))
+
+(deftest test-worker-childopts-is-string-or-string-list
+  (let [pass-cases [nil "some string" ["some" "string" "list"]]]
+    (testing "worker.childopts validates"
+      (let [validator (CONFIG-SCHEMA-MAP WORKER-CHILDOPTS)]
+        (doseq [value pass-cases]
+          (is (nil? (try
+                      (.validateField validator "test" value)
+                      (catch Exception e e)))))
+        (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" 42)))))
+
+    (testing "topology.worker.childopts validates"
+      (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-WORKER-CHILDOPTS)]
+        (doseq [value pass-cases]
+          (is (nil? (try
+                      (.validateField validator "test" value)
+                      (catch Exception e e)))))
+        (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" 42)))))))