You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/29 22:58:49 UTC
[50/50] git commit: Merge branch 'master' into security-upmerge
Merge branch 'master' into security-upmerge
Conflicts:
bin/storm
conf/defaults.yaml
storm-core/src/clj/backtype/storm/daemon/drpc.clj
storm-core/src/clj/backtype/storm/daemon/logviewer.clj
storm-core/src/clj/backtype/storm/daemon/nimbus.clj
storm-core/src/clj/backtype/storm/daemon/supervisor.clj
storm-core/src/clj/backtype/storm/daemon/worker.clj
storm-core/src/clj/backtype/storm/testing.clj
storm-core/src/clj/backtype/storm/ui/helpers.clj
storm-core/src/clj/backtype/storm/util.clj
storm-core/src/jvm/backtype/storm/Config.java
storm-core/src/jvm/backtype/storm/utils/Utils.java
storm-core/src/ui/public/component.html
storm-core/test/clj/backtype/storm/supervisor_test.clj
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/559c883d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/559c883d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/559c883d
Branch: refs/heads/security
Commit: 559c883d5331362808b7e1ada647cbac76a88ab3
Parents: ff8336b b2a8a77
Author: Robert (Bobby) Evans <bo...@apache.org>
Authored: Tue Jul 29 15:57:03 2014 -0500
Committer: Robert (Bobby) Evans <bo...@apache.org>
Committed: Tue Jul 29 15:57:03 2014 -0500
----------------------------------------------------------------------
BYLAWS.md | 96 ++++
CHANGELOG.md | 31 +
LICENSE | 30 +-
README.markdown | 14 +
STORM-UI-REST-API.md | 567 +++++++++++++++++++
bin/storm | 77 ++-
conf/defaults.yaml | 5 +-
conf/storm_env.ini | 2 +-
dev-tools/github/__init__.py | 109 ++++
dev-tools/jira-github-join.py | 80 +++
dev-tools/jira/__init__.py | 232 ++++++++
examples/storm-starter/README.markdown | 30 +-
.../storm-starter/multilang/resources/storm.py | 2 +-
.../src/jvm/storm/starter/RollingTopWords.java | 62 +-
.../src/jvm/storm/starter/util/StormRunner.java | 9 +
.../storm-kafka/src/jvm/storm/kafka/Broker.java | 9 +-
.../src/jvm/storm/kafka/KafkaConfig.java | 2 +-
.../src/jvm/storm/kafka/Partition.java | 9 +-
pom.xml | 6 +-
.../src/clj/backtype/storm/LocalCluster.clj | 7 +-
storm-core/src/clj/backtype/storm/cluster.clj | 31 +-
.../src/clj/backtype/storm/command/monitor.clj | 37 ++
.../src/clj/backtype/storm/daemon/common.clj | 2 +-
.../src/clj/backtype/storm/daemon/drpc.clj | 6 +-
.../src/clj/backtype/storm/daemon/executor.clj | 4 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 7 +-
.../clj/backtype/storm/daemon/supervisor.clj | 104 +++-
.../src/clj/backtype/storm/daemon/worker.clj | 72 +--
storm-core/src/clj/backtype/storm/disruptor.clj | 2 +-
storm-core/src/clj/backtype/storm/event.clj | 2 +-
storm-core/src/clj/backtype/storm/testing.clj | 49 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 2 +-
.../src/clj/backtype/storm/ui/helpers.clj | 5 -
storm-core/src/clj/backtype/storm/util.clj | 52 +-
storm-core/src/clj/backtype/storm/zookeeper.clj | 25 +
.../src/dev/resources/tester_bolt_metrics.py | 35 ++
.../src/dev/resources/tester_spout_metrics.py | 51 ++
storm-core/src/jvm/backtype/storm/Config.java | 171 +++---
.../jvm/backtype/storm/ConfigValidation.java | 70 +++
.../backtype/storm/messaging/netty/Client.java | 13 +-
.../metric/api/rpc/AssignableShellMetric.java | 30 +
.../metric/api/rpc/CombinedShellMetric.java | 31 +
.../storm/metric/api/rpc/CountShellMetric.java | 38 ++
.../storm/metric/api/rpc/IShellMetric.java | 31 +
.../metric/api/rpc/ReducedShellMetric.java | 32 ++
.../storm/multilang/JsonSerializer.java | 15 +
.../jvm/backtype/storm/multilang/ShellMsg.java | 46 ++
.../src/jvm/backtype/storm/spout/ISpout.java | 2 +-
.../jvm/backtype/storm/spout/ShellSpout.java | 72 ++-
.../src/jvm/backtype/storm/task/ShellBolt.java | 75 ++-
.../backtype/storm/task/TopologyContext.java | 28 +
.../storm/testing/PythonShellMetricsBolt.java | 32 ++
.../storm/testing/PythonShellMetricsSpout.java | 35 ++
.../src/jvm/backtype/storm/utils/Monitor.java | 249 ++++++++
.../jvm/backtype/storm/utils/ShellProcess.java | 46 +-
.../src/jvm/backtype/storm/utils/Utils.java | 23 +-
storm-core/src/multilang/py/storm.py | 30 +-
storm-core/src/multilang/rb/storm.rb | 24 +-
storm-core/src/ui/public/component.html | 3 +-
.../src/ui/public/js/jquery.tablesorter.min.js | 9 +-
storm-core/src/ui/public/js/moment.min.js | 6 +
storm-core/src/ui/public/js/script.js | 9 +
.../test/clj/backtype/storm/cluster_test.clj | 3 +-
.../test/clj/backtype/storm/config_test.clj | 41 +-
.../test/clj/backtype/storm/metrics_test.clj | 206 ++++---
.../test/clj/backtype/storm/supervisor_test.clj | 135 ++++-
66 files changed, 3003 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/bin/storm
----------------------------------------------------------------------
diff --cc bin/storm
index 550e01d,a4aadb1..1b5be36
--- a/bin/storm
+++ b/bin/storm
@@@ -449,8 -468,7 +480,8 @@@ COMMANDS = {"jar": jar, "kill": kill, "
"drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
"remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
"activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
- "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version,
- "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor}
++ "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor,
+ "upload-credentials": upload_credentials}
def parse_config(config_list):
global CONFIG_OPTS
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index d4283a4,2864adc..05948e1
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -188,6 -150,7 +189,8 @@@ topology.max.error.report.per.interval
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis: 500
+topology.testing.always.try.serialize: false
+ topology.classpath: null
+ topology.environment: null
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
index f916ec6,f3017ce..eb25a86
--- a/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
+++ b/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java
@@@ -19,6 -19,9 +19,10 @@@ package storm.starter.util
import backtype.storm.Config;
import backtype.storm.LocalCluster;
+ import backtype.storm.StormSubmitter;
+ import backtype.storm.generated.AlreadyAliveException;
++import backtype.storm.generated.AuthorizationException;
+ import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
public final class StormRunner {
@@@ -36,4 -39,9 +40,9 @@@
cluster.killTopology(topologyName);
cluster.shutdown();
}
+
+ public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf)
- throws AlreadyAliveException, InvalidTopologyException {
++ throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ StormSubmitter.submitTopology(topologyName, conf, topology);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/LocalCluster.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index 2c6f6f3,8ff5a2c..8ead710
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -25,15 -23,16 +25,17 @@@
(:require [backtype.storm.daemon [common :as common]]))
(defprotocol ClusterState
- (set-ephemeral-node [this path data])
+ (set-ephemeral-node [this path data acls])
(delete-node [this path])
- (create-sequential [this path data])
+ (create-sequential [this path data acls])
;; if node does not exist, create persistent with this data
- (set-data [this path data])
+ (set-data [this path data acls])
(get-data [this path watch?])
+ (get-version [this path watch?])
+ (get-data-with-version [this path watch?])
(get-children [this path watch?])
- (mkdirs [this path])
+ (mkdirs [this path acls])
+ (exists-node? [this path watch?])
(close [this])
(register [this callback])
(unregister [this id]))
@@@ -240,31 -231,31 +252,33 @@@
(into {}))))
;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
-(defn mk-storm-cluster-state
- [cluster-state-spec]
+(defnk mk-storm-cluster-state
+ [cluster-state-spec :acls nil]
(let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
[false cluster-state-spec]
- [true (mk-distributed-cluster-state cluster-state-spec)])
+ [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls)])
assignment-info-callback (atom {})
+ assignment-info-with-version-callback (atom {})
+ assignment-version-callback (atom {})
supervisors-callback (atom nil)
assignments-callback (atom nil)
storm-base-callback (atom {})
+ credentials-callback (atom {})
state-id (register
- cluster-state
- (fn [type path]
- (let [[subtree & args] (tokenize-path path)]
- (condp = subtree
+ cluster-state
+ (fn [type path]
+ (let [[subtree & args] (tokenize-path path)]
+ (condp = subtree
ASSIGNMENTS-ROOT (if (empty? args)
- (issue-callback! assignments-callback)
- (issue-map-callback! assignment-info-callback (first args)))
+ (issue-callback! assignments-callback)
+ (issue-map-callback! assignment-info-callback (first args)))
SUPERVISORS-ROOT (issue-callback! supervisors-callback)
STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
+ CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
;; this should never happen
- (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
+ (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
- (mkdirs cluster-state p))
+ (mkdirs cluster-state p acls))
(reify
StormClusterState
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/drpc.clj
index 1340e49,3527b7c..68128c3
--- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj
@@@ -199,48 -133,37 +199,48 @@@
(let [conf (read-storm-config)
worker-threads (int (conf DRPC-WORKER-THREADS))
queue-size (int (conf DRPC-QUEUE-SIZE))
- service-handler (service-handler)
-
- ;; Requests and returns need to be on separate thread pools, since
- ;; calls to "execute" don't unblock until other thrift methods are
- ;; called. So if 64 threads are calling execute, the server won't
- ;; accept the result invocations that will unblock those threads.
-
- handler-server
- (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
- (THsHaServer$Args.)
- (.workerThreads 64)
- (.executorService
- (ThreadPoolExecutor.
- worker-threads worker-threads 60 TimeUnit/SECONDS
- (ArrayBlockingQueue. queue-size)))
- (.protocolFactory (TBinaryProtocol$Factory.))
- (.processor (DistributedRPC$Processor. service-handler))))
-
- invoke-server
- (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT)))
- (THsHaServer$Args.)
- (.workerThreads 64)
- (.protocolFactory (TBinaryProtocol$Factory.))
- (.processor
- (DistributedRPCInvocations$Processor. service-handler))))]
+ drpc-http-port (int (conf DRPC-HTTP-PORT))
+ drpc-port (int (conf DRPC-PORT))
+ drpc-service-handler (service-handler conf)
+ ;; requests and returns need to be on separate thread pools, since calls to
+ ;; "execute" don't unblock until other thrift methods are called. So if
+ ;; 64 threads are calling execute, the server won't accept the result
+ ;; invocations that will unblock those threads
+ handler-server (when (> drpc-port 0)
+ (ThriftServer. conf
+ (DistributedRPC$Processor. drpc-service-handler)
+ ThriftConnectionType/DRPC))
+ invoke-server (ThriftServer. conf
+ (DistributedRPCInvocations$Processor. drpc-service-handler)
+ ThriftConnectionType/DRPC_INVOCATIONS)
+ http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
- (.addShutdownHook (Runtime/getRuntime) (Thread. (fn []
- (if handler-server (.stop handler-server))
- (.stop invoke-server))))
+ (add-shutdown-hook-with-force-kill-in-1-sec (fn []
- (.stop handler-server)
++ (if handler-server (.stop handler-server))
+ (.stop invoke-server)))
(log-message "Starting Distributed RPC servers...")
(future (.serve invoke-server))
- (.serve handler-server))))
+ (when (> drpc-http-port 0)
+ (let [app (webapp drpc-service-handler http-creds-handler)
+ filter-class (conf DRPC-HTTP-FILTER)
+ filter-params (conf DRPC-HTTP-FILTER-PARAMS)
+ filters-confs [{:filter-class filter-class
+ :filter-params filter-params}]
+ https-port (int (conf DRPC-HTTPS-PORT))
+ https-ks-path (conf DRPC-HTTPS-KEYSTORE-PATH)
+ https-ks-password (conf DRPC-HTTPS-KEYSTORE-PASSWORD)
+ https-ks-type (conf DRPC-HTTPS-KEYSTORE-TYPE)]
+
+ (run-jetty app
+ {:port drpc-http-port :join? false
+ :configurator (fn [server]
+ (config-ssl server
+ https-port
+ https-ks-path
+ https-ks-password
+ https-ks-type)
+ (config-filter server app filters-confs))})))
+ (when handler-server
+ (.serve handler-server)))))
(defn -main []
(launch-server!))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index e2952d0,6265479..44ccc6c
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -81,11 -73,9 +81,11 @@@
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
- (halt-process! 20 "Error when processing an event")
+ (exit-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
+ :id->sched-status (atom {})
+ :cred-renewers (AuthUtils/GetCredentialRenewers conf)
}))
(defn inbox [nimbus]
@@@ -1304,13 -1148,18 +1304,14 @@@
(defn launch-server! [conf nimbus]
(validate-distributed-mode! conf)
(let [service-handler (service-handler conf nimbus)
- ;;TODO need to honor NIMBUS-THRIFT-MAX-BUFFER-SIZE for different transports
- options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
- (THsHaServer$Args.)
- (.workerThreads 64)
- (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
- (.processor (Nimbus$Processor. service-handler))
- )
- server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
+ server (ThriftServer. conf (Nimbus$Processor. service-handler)
+ ThriftConnectionType/NIMBUS)]
- (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
+ (add-shutdown-hook-with-force-kill-in-1-sec (fn []
+ (.shutdown service-handler)
+ (.stop server)))
(log-message "Starting Nimbus server...")
- (.serve server)))
+ (.serve server)
+ service-handler))
;; distributed implementation
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 2fe7fb8,cfa8f85..d8ff6b5
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -14,8 -14,8 +14,9 @@@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.supervisor
+ (:import [java.io OutputStreamWriter BufferedWriter IOException])
- (:import [backtype.storm.scheduler ISupervisor])
+ (:import [backtype.storm.scheduler ISupervisor]
+ [java.net JarURLConnection])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]])
@@@ -235,22 -190,16 +253,26 @@@
(when thread-pid
(psim/kill-process thread-pid))
(doseq [pid pids]
- (kill-process-with-sig-term pid))
+ (if as-user
++ (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -15 " pid))
++ (kill-process-with-sig-term pid)))
+ (if-not (empty? pids) (sleep-secs 1)) ;; allow 1 second for execution of cleanup threads on worker.
+ (doseq [pid pids]
- (force-kill-process pid)
- (try
- (rmpath (worker-pid-path conf id pid))
- (catch Exception e))) ;; on windows, the supervisor may still holds the lock on the worker directory
- (try-cleanup-worker conf id))
++ (if as-user
+ (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
- (ensure-process-killed! pid))
++ (force-kill-process pid))
+ (if as-user
- (rmr-as-user conf id user (worker-pid-path conf id pid))
++ (rmr-as-user conf id user (worker-pid-path conf id pid))
+ (try
+ (rmpath (worker-pid-path conf id pid))
- (catch Exception e)) ;; on windows, the supervisor may still holds the lock on the worker directory
- ))
++ (catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
+ (try-cleanup-worker conf id user))
(log-message "Shut down " (:supervisor-id supervisor) ":" id))
+(def SUPERVISOR-ZK-ACLS
+ [(first ZooDefs$Ids/CREATOR_ALL_ACL)
+ (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
+
(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
{:conf conf
:shared-context shared-context
@@@ -582,14 -495,20 +612,23 @@@
jlp (jlp stormroot conf)
stormjar (supervisor-stormjar-path stormroot)
storm-conf (read-supervisor-storm-conf conf storm-id)
- classpath (add-to-classpath (current-classpath) [stormjar])
+ topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
+ [cp]
+ [])
+ classpath (-> (current-classpath)
+ (add-to-classpath [stormjar])
+ (add-to-classpath topo-classpath))
+ top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
+ gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ logfilename (logs-filename storm-id port)
-
- worker-childopts (substitute-childopts (conf WORKER-CHILDOPTS) worker-id storm-id port)
- topo-worker-childopts (substitute-childopts (storm-conf TOPOLOGY-WORKER-CHILDOPTS) worker-id storm-id port)
+ worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
- (substitute-worker-childopts s port))
++ (substitute-childopts s worker-id storm-id port))
+ topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
- (substitute-worker-childopts s port))
++ (substitute-childopts s worker-id storm-id port))
+ topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)]
+ (merge env {"LD_LIBRARY_PATH" jlp})
+ {"LD_LIBRARY_PATH" jlp})
- logfilename (str "worker-" port ".log")
command (concat
[(java-cmd) "-server"]
worker-childopts
@@@ -608,21 -526,13 +647,20 @@@
(:assignment-id supervisor)
port
worker-id])
- command (->> command (map str) (filter (complement empty?)))
- shell-cmd (->> command
- (map #(str \' (clojure.string/escape % {\' "\\'"}) \'))
- (clojure.string/join " "))]
- (log-message "Launching worker with command: " shell-cmd)
- (launch-process command :environment topology-worker-environment)
- ))
+ command (->> command (map str) (filter (complement empty?)))]
-
+ (log-message "Launching worker with command: " (shell-cmd command))
+ (write-log-metadata! storm-conf user worker-id storm-id port conf)
+ (set-worker-user! conf worker-id user)
+ (let [log-prefix (str "Worker Process " worker-id)
+ callback (fn [exit-code]
+ (log-message log-prefix " exited with code: " exit-code)
+ (add-dead-worker worker-id))]
+ (remove-dead-worker worker-id)
+ (if run-worker-as-user
+ (let [worker-dir (worker-root conf worker-id)]
- (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment {"LD_LIBRARY_PATH" jlp})] :log-prefix log-prefix :exit-code-callback callback))
- (launch-process command :environment {"LD_LIBRARY_PATH" jlp} :log-prefix log-prefix :exit-code-callback callback)
++ (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback))
++ (launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback)
+ ))))
;; local implementation
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index d09ea35,aeabdf6..f1acdec
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -190,17 -175,22 +191,19 @@@
)
:timer-name timer-name))
- (defn recursive-map-worker-data [conf mq-context storm-id assignment-id port
- storm-conf
- worker-id
- cluster-state
- storm-cluster-state
- executors
- transfer-queue
- executor-receive-queue-map
- receive-queue-map
- topology]
- (recursive-map
-(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
++(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
+ (let [assignment-versions (atom {})
- cluster-state (cluster/mk-distributed-cluster-state conf)
- storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
- storm-conf (read-supervisor-storm-conf conf storm-id)
+ executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
+ transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
+ :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
+ executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
+
+ receive-queue-map (->> executor-receive-queue-map
+ (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
+ (into {}))
+
+ topology (read-supervisor-topology conf storm-id)]
+ (recursive-map
:conf conf
:mq-context (if mq-context
mq-context
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/testing.clj
index 39f3759,54f40e0..f603086
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@@ -208,8 -208,8 +213,8 @@@
supervisors
; because a worker may already be dead
workers)]
- (while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
+ (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
- (Thread/sleep 10)
+ (Thread/sleep (rand-int 20))
;; (doseq [d daemons]
;; (if-not ((memfn waiting?) d)
;; (println d)))
@@@ -481,10 -483,7 +490,10 @@@
(submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
(let [storm-id (common/get-storm-id state storm-name)]
+ ;;Give the topology time to come up without using it to wait for the spouts to complete
+ (simulate-wait cluster-map)
+
- (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
+ (while-timeout timeout-ms (not (every? exhausted? (spout-objects spouts)))
(simulate-wait cluster-map))
(.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
@@@ -583,22 -582,25 +592,24 @@@
(defn tracked-wait
"Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
([tracked-topology]
- (tracked-wait tracked-topology 1))
+ (tracked-wait tracked-topology 1 TEST-TIMEOUT-MS))
([tracked-topology amt]
+ (tracked-wait tracked-topology amt TEST-TIMEOUT-MS))
+ ([tracked-topology amt timeout-ms]
- (let [target (+ amt @(:last-spout-emit tracked-topology))
- track-id (-> tracked-topology :cluster ::track-id)
- waiting? (fn []
- (or (not= target (global-amt track-id "spout-emitted"))
- (not= (global-amt track-id "transferred")
- (global-amt track-id "processed"))
- ))]
- (while-timeout TEST-TIMEOUT-MS (waiting?)
- ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
- ;; (println "Processed: " (global-amt track-id "processed"))
- ;; (println "Transferred: " (global-amt track-id "transferred"))
- (Thread/sleep 500))
- (reset! (:last-spout-emit tracked-topology) target))))
-
-(defnk test-tuple
+ (let [target (+ amt @(:last-spout-emit tracked-topology))
+ track-id (-> tracked-topology :cluster ::track-id)
+ waiting? (fn []
+ (or (not= target (global-amt track-id "spout-emitted"))
+ (not= (global-amt track-id "transferred")
+ (global-amt track-id "processed"))))]
- (while-timeout TEST-TIMEOUT-MS (waiting?)
++ (while-timeout timeout-ms (waiting?)
+ ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
+ ;; (println "Processed: " (global-amt track-id "processed"))
+ ;; (println "Transferred: " (global-amt track-id "transferred"))
+ (Thread/sleep (rand-int 200)))
+ (reset! (:last-spout-emit tracked-topology) target))))
+
+(defnk test-tuple
[values
:stream Utils/DEFAULT_STREAM_ID
:component "component"
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 3c1be09,13e4d41..ec9759c
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -728,10 -688,10 +728,10 @@@
reverse)]
{"componentErrors"
(for [^ErrorInfo e errors]
- {"time" (date-str (.get_error_time_secs e))
+ {"time" (* 1000 (long (.get_error_time_secs e)))
"errorHost" (.get_host e)
"errorPort" (.get_port e)
- "errorWorkerLogLink" (worker-log-link (.get_host e) (.get_port e))
+ "errorWorkerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)
"error" (.get_error e)})}))
(defn spout-stats
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/util.clj
index ed2e30b,3df25b7..0173081
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@@ -398,25 -411,30 +414,35 @@@
(catch ExecuteException e
(log-message "Error when trying to kill " pid ". Process is probably already dead."))))
+(defn read-and-log-stream
+ [prefix stream]
+ (try
+ (let [reader (BufferedReader. (InputStreamReader. stream))]
+ (loop []
+ (if-let [line (.readLine reader)]
+ (do
+ (log-warn (str prefix ":" line))
+ (recur)))))
+ (catch IOException e
+ (log-warn "Error while trying to log stream" e))))
+
- (defn sleep-secs [secs]
- (when (pos? secs)
- (Time/sleep (* (long secs) 1000))))
+ (defn force-kill-process
+ [pid]
+ (send-signal-to-process pid sig-kill))
- (defn sleep-until-secs [target-secs]
- (Time/sleepUntil (* (long target-secs) 1000)))
+ (defn kill-process-with-sig-term
+ [pid]
+ (send-signal-to-process pid sig-term))
+
+ (defn add-shutdown-hook-with-force-kill-in-1-sec
+ "adds the user supplied function as a shutdown hook for cleanup.
+ Also adds a function that sleeps for a second and then sends kill -9 to process to avoid any zombie process in case
+ cleanup function hangs."
+ [func]
+ (.addShutdownHook (Runtime/getRuntime) (Thread. #(func)))
+ (.addShutdownHook (Runtime/getRuntime) (Thread. #((sleep-secs 1)
+ (.halt (Runtime/getRuntime) 20)))))
-(defnk launch-process [command :environment {}]
- (let [builder (ProcessBuilder. command)
- process-env (.environment builder)]
- (doseq [[k v] environment]
- (.put process-env k v))
- (.start builder)))
-
(defprotocol SmartThread
(start [this])
(join [this])
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index ea54313,ac8b6b6..d6b45ea
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -17,10 -17,8 +17,9 @@@
*/
package backtype.storm;
- import backtype.storm.ConfigValidation;
import backtype.storm.serialization.IKryoDecorator;
import backtype.storm.serialization.IKryoFactory;
+import backtype.storm.utils.Utils;
import com.esotericsoftware.kryo.Serializer;
import java.util.ArrayList;
import java.util.HashMap;
@@@ -66,20 -57,20 +65,20 @@@ public class Config extends HashMap<Str
/**
* Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
*/
- public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
- public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
+ public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
+ public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = ConfigValidation.IntegerValidator;
/**
- * Netty based messaging: The min # of milliseconds that a peer will wait.
+ * Netty based messaging: The min # of milliseconds that a peer will wait.
*/
- public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
- public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
+ public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
+ public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
/**
- * Netty based messaging: The max # of milliseconds that a peer will wait.
+ * Netty based messaging: The max # of milliseconds that a peer will wait.
*/
- public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
- public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
+ public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
+ public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Netty based messaging: The # of worker threads for the server.
@@@ -103,9 -94,9 +102,8 @@@
* We check with this interval that whether the Netty channel is writable and try to write pending messages
*/
public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
- public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
-
-
+ public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
+
-
/**
* A list of hosts of ZooKeeper servers used to manage the cluster.
*/
@@@ -234,10 -200,10 +232,10 @@@
* The ceiling of the interval between retries of a Zookeeper operation.
*/
public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";
- public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = Number.class;
+ public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator;
/**
- * The Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
+ * The cluster Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
*/
public static final String STORM_ZOOKEEPER_AUTH_SCHEME="storm.zookeeper.auth.scheme";
public static final Object STORM_ZOOKEEPER_AUTH_SCHEME_SCHEMA = String.class;
@@@ -307,35 -231,15 +305,35 @@@
* connect to this port to upload jars and submit topologies.
*/
public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
- public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class;
+ public static final Object NIMBUS_THRIFT_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
+ * The number of threads that should be used by the nimbus thrift server.
+ */
+ public static final String NIMBUS_THRIFT_THREADS = "nimbus.thrift.threads";
+ public static final Object NIMBUS_THRIFT_THREADS_SCHEMA = Number.class;
+
+ /**
+ * A list of users that are cluster admins and can run any command. To use this set
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ */
+ public static final String NIMBUS_ADMINS = "nimbus.admins";
+ public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator;
+
+ /**
+ * A list of users that run the supervisors and should be authorized to interact with
+ * nimbus as a supervisor would. To use this set
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ */
+ public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
+ public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator;
+
+ /**
* The maximum buffer size thrift should use when reading messages.
*/
public static final String NIMBUS_THRIFT_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
- public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = Number.class;
+ public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
-
/**
* This parameter is used by the storm-deploy project to configure the
* jvm options for the nimbus daemon.
@@@ -395,10 -299,10 +393,10 @@@
* to launching new JVM's and configuring them.</p>
*/
public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
- public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
+ public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
- * Whether or not nimbus should reassign tasks if it detects that a task goes down.
+ * Whether or not nimbus should reassign tasks if it detects that a task goes down.
* Defaults to true, and it's not recommended to change this value.
*/
public static final String NIMBUS_REASSIGN = "nimbus.reassign";
@@@ -565,77 -369,27 +563,77 @@@
* This port is used by Storm DRPC for receiving DPRC requests from clients.
*/
public static final String DRPC_PORT = "drpc.port";
- public static final Object DRPC_PORT_SCHEMA = Number.class;
+ public static final Object DRPC_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
- * DRPC thrift server worker threads
+ * Class name for authorization plugin for DRPC client
+ */
+ public static final String DRPC_AUTHORIZER = "drpc.authorizer";
+ public static final Object DRPC_AUTHORIZER_SCHEMA = String.class;
+
+ /**
+ * The Access Control List for the DRPC Authorizer.
+ * @see DRPCSimpleAclAuthorizer
+ */
+ public static final String DRPC_AUTHORIZER_ACL = "drpc.authorizer.acl";
+ public static final Object DRPC_AUTHORIZER_ACL_SCHEMA = Map.class;
+
+ /**
+ * File name of the DRPC Authorizer ACL.
+ * @see DRPCSimpleAclAuthorizer
+ */
+ public static final String DRPC_AUTHORIZER_ACL_FILENAME = "drpc.authorizer.acl.filename";
+ public static final Object DRPC_AUTHORIZER_ACL_FILENAME_SCHEMA = String.class;
+
+ /**
+ * Whether the DRPCSimpleAclAuthorizer should deny requests for operations
+ * involving functions that have no explicit ACL entry. When set to false
+ * (the default) DRPC functions that have no entry in the ACL will be
+ * permitted, which is appropriate for a development environment. When set
+ * to true, explicit ACL entries are required for every DRPC function, and
+ * any request for functions will be denied.
+ * @see DRPCSimpleAclAuthorizer
+ */
+ public static final String DRPC_AUTHORIZER_ACL_STRICT = "drpc.authorizer.acl.strict";
+ public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class;
+
+ /**
+ * DRPC thrift server worker threads
*/
public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
- public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
+ public static final Object DRPC_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
/**
- * DRPC thrift server queue size
+ * The maximum buffer size thrift should use when reading messages for DRPC.
+ */
+ public static final String DRPC_MAX_BUFFER_SIZE = "drpc.max_buffer_size";
+ public static final Object DRPC_MAX_BUFFER_SIZE_SCHEMA = Number.class;
+
+ /**
+ * DRPC thrift server queue size
*/
public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
- public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
+ public static final Object DRPC_QUEUE_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
/**
- * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
+ * The DRPC invocations transport plug-in for Thrift client/server communication
+ */
+ public static final String DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN = "drpc.invocations.thrift.transport";
+ public static final Object DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
+
+ /**
+ * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
*/
public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
- public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
+ public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = ConfigValidation.IntegerValidator;
/**
+ * DRPC invocations thrift server worker threads
+ */
+ public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
+ public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
+
+ /**
* The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also
* timeout based on the socket timeout on the DRPC client, and separately based on the topology message
* timeout for the topology implementing the DRPC function.
@@@ -672,32 -414,8 +670,32 @@@
* how many workers run on each machine.
*/
public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
- public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
+ public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.IntegersValidator;
+ /**
+ * A number representing the maximum number of workers any single topology can acquire.
+ */
+ public static final String NIMBUS_SLOTS_PER_TOPOLOGY = "nimbus.slots.perTopology";
+ public static final Object NIMBUS_SLOTS_PER_TOPOLOGY_SCHEMA = Number.class;
+
+ /**
+ * A class implementing javax.servlet.Filter for DRPC HTTP requests
+ */
+ public static final String DRPC_HTTP_FILTER = "drpc.http.filter";
+ public static final Object DRPC_HTTP_FILTER_SCHEMA = String.class;
+
+ /**
+ * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
+ * service
+ */
+ public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params";
+ public static final Object DRPC_HTTP_FILTER_PARAMS_SCHEMA = Map.class;
+
+ /**
+ * A number representing the maximum number of executors any single topology can acquire.
+ */
+ public static final String NIMBUS_EXECUTORS_PER_TOPOLOGY = "nimbus.executors.perTopology";
+ public static final Object NIMBUS_EXECUTORS_PER_TOPOLOGY_SCHEMA = Number.class;
/**
* This parameter is used by the storm-deploy project to configure the
@@@ -711,8 -430,9 +709,8 @@@
* restart the worker process.
*/
public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
- public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = Number.class;
+ public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
-
/**
* How long a worker can go without heartbeating during the initial launch before
* the supervisor tries to restart the worker process. This value override
@@@ -720,8 -440,9 +718,8 @@@
* overhead to starting and configuring the JVM on launch.
*/
public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs";
- public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = Number.class;
+ public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
-
/**
* Whether or not the supervisor should launch workers assigned to it. Defaults
* to true -- and you should probably never change this value. This configuration
@@@ -742,25 -464,11 +740,25 @@@
* need to be restarted.
*/
public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
- public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
+ public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
+ * Should the supervisor try to run the worker as the launching user or not. Defaults to false.
+ */
+ public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
+ public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
+
+ /**
+ * Full path to the worker-launcher executable that will be used to launch workers when
+ * SUPERVISOR_RUN_WORKER_AS_USER is set to true.
+ */
+ public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
+ public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
+
+ /**
* The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
- * with an identifier for this worker.
+ * with an identifier for this worker. Also, "%WORKER-ID%", "%STORM-ID%" and "%WORKER-PORT%" are
+ * replaced with appropriate runtime values for this worker.
*/
public static final String WORKER_CHILDOPTS = "worker.childopts";
public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
@@@ -800,22 -500,9 +798,22 @@@
* come through.
*/
public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
- public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = Number.class;
+ public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
+ /**
+ * How often a task should sync credentials, worst case.
+ */
+ public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
+ public static final Object TASK_CREDENTIALS_POLL_SECS_SCHEMA = Number.class;
+
+
+ /**
+ * A list of users that are allowed to interact with the topology. To use this set
+ * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+ */
+ public static final String TOPOLOGY_USERS = "topology.users";
+ public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator;
/**
* True if Storm should timeout messages or not. Defaults to true. This is meant to be used
@@@ -936,15 -623,15 +934,15 @@@
/**
- * The maximum number of tuples that can be pending on a spout task at any given time.
- * This config applies to individual tasks, not to spouts or topologies as a whole.
- *
+ * The maximum number of tuples that can be pending on a spout task at any given time.
+ * This config applies to individual tasks, not to spouts or topologies as a whole.
+ *
* A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
- * Note that this config parameter has no effect for unreliable spouts that don't tag
+ * Note that this config parameter has no effect for unreliable spouts that don't tag
* their tuples with a message id.
*/
- public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
- public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
+ public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
+ public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
/**
* A class that implements a strategy for what to do when a spout needs to wait. Waiting is
@@@ -973,13 -660,13 +971,13 @@@
* The percentage of tuples to sample to produce stats for a task.
*/
public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
- public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
+ public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = ConfigValidation.DoubleValidator;
/**
- * The time period that builtin metrics data in bucketed into.
+ * The time period that builtin metrics data in bucketed into.
*/
public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
- public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
+ public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = ConfigValidation.IntegerValidator;
/**
* Whether or not to use Java serialization in a topology.
@@@ -994,12 -681,19 +992,25 @@@
public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
+ * Topology-specific GC options for the worker child process. This overrides WORKER_GC_CHILDOPTS.
+ */
+ public static final String TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts";
+ public static final Object TOPOLOGY_WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
+
+ /**
+ * Topology-specific classpath for the worker child process. This is combined with the usual classpath.
+ */
+ public static final String TOPOLOGY_CLASSPATH="topology.classpath";
+ public static final Object TOPOLOGY_CLASSPATH_SCHEMA = ConfigValidation.StringOrStringListValidator;
+
+ /**
+ * Topology-specific environment variables for the worker child process.
+ * This is added to the existing environment (that of the supervisor)
+ */
+ public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
+ public static final Object TOPOLOGY_ENVIRONMENT_SCHEMA = Map.class;
+
+ /**
* This config is available for TransactionalSpouts, and contains the id ( a String) for
* the transactional topology. This id is used to store the state of the transactional
* topology in Zookeeper.
@@@ -1061,10 -755,10 +1072,10 @@@
* via the TopologyContext.
*/
public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
- public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
+ public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
/**
- * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
+ * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
* an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
* reported to Zookeeper per task for every 10 second interval of time.
*/
@@@ -1183,25 -853,27 +1194,41 @@@
* to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
*/
public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
- public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = Map.class;
+ public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
+
+ /**
+ * A map from the user name to the number of machines that user is allowed to use. Set storm.scheduler
+ * to backtype.storm.scheduler.multitenant.MultitenantScheduler
+ */
+ public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools";
+ public static final Object MULTITENANT_SCHEDULER_USER_POOLS_SCHEMA = ConfigValidation.MapOfStringToNumberValidator;
+
+ /**
+ * The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
+ * to backtype.storm.scheduler.multitenant.MultitenantScheduler
+ */
+ public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
+ public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
+ public static void setClasspath(Map conf, String cp) {
+ conf.put(Config.TOPOLOGY_CLASSPATH, cp);
+ }
+
+ public void setClasspath(String cp) {
+ setClasspath(this, cp);
+ }
+
+ public static void setEnvironment(Map conf, Map env) {
+ conf.put(Config.TOPOLOGY_ENVIRONMENT, env);
+ }
+
+ public void setEnvironment(Map env) {
+ setEnvironment(this, env);
+ }
+
public static void setDebug(Map conf, boolean isOn) {
conf.put(Config.TOPOLOGY_DEBUG, isOn);
- }
+ }
public void setDebug(boolean isOn) {
setDebug(this, isOn);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/ConfigValidation.java
index e990921,156ccf8..14394a0
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@@ -177,9 -79,79 +177,79 @@@ public class ConfigValidation
/**
* Validates is a list of Maps.
*/
- public static Object MapsValidator = FieldListValidatorFactory(Map.class);
+ public static Object MapsValidator = listFv(Map.class, true);
/**
+ * Validates an Integer.
+ */
+ public static Object IntegerValidator = new FieldValidator() {
+ @Override
+ public void validateField(String name, Object o) throws IllegalArgumentException {
+ if (o == null) {
+ // A null value is acceptable.
+ return;
+ }
+ final long i;
+ if (o instanceof Number &&
+ (i = ((Number)o).longValue()) == ((Number)o).doubleValue()) {
+ if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) {
+ return;
+ }
+ }
+
+ throw new IllegalArgumentException("Field " + name + " must be an Integer within type range.");
+ }
+ };
+
+ /**
+ * Validates a list of Integers.
+ */
+ public static Object IntegersValidator = new FieldValidator() {
+ @Override
+ public void validateField(String name, Object field)
+ throws IllegalArgumentException {
+ if (field == null) {
+ // A null value is acceptable.
+ return;
+ }
+ if (field instanceof Iterable) {
+ for (Object o : (Iterable)field) {
+ final long i;
+ if (o instanceof Number &&
+ ((i = ((Number)o).longValue()) == ((Number)o).doubleValue()) &&
+ (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE)) {
+ // pass the test
+ } else {
+ throw new IllegalArgumentException(
+ "Each element of the list " + name + " must be an Integer within type range.");
+ }
+ }
+ return;
+ }
+ }
+ };
+
+ /**
+ * Validates a Double.
+ */
+ public static Object DoubleValidator = new FieldValidator() {
+ @Override
+ public void validateField(String name, Object o) throws IllegalArgumentException {
+ if (o == null) {
+ // A null value is acceptable.
+ return;
+ }
+
+ // we can provide a lenient way to convert int/long to double, at the cost of losing some precision
+ if (o instanceof Number) {
+ return;
+ }
+
+ throw new IllegalArgumentException("Field " + name + " must be an Double.");
+ }
+ };
+
+ /**
* Validates a power of 2.
*/
public static Object PowerOf2Validator = new FieldValidator() {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/Utils.java
index 87f1654,364b53f..fff91e6
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@@ -323,18 -314,19 +323,21 @@@ public class Utils
if (null == o) {
return defaultValue;
}
-
- if(o instanceof Long) {
- return ((Long) o ).intValue();
- } else if (o instanceof Integer) {
- return (Integer) o;
- } else if (o instanceof Short) {
- return ((Short) o).intValue();
+
+ if (o instanceof Integer ||
+ o instanceof Short ||
+ o instanceof Byte) {
+ return ((Number) o).intValue();
+ } else if (o instanceof Long) {
+ final long l = (Long) o;
+ if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
+ return (int) l;
+ }
+ } else if (o instanceof String) {
+ return Integer.parseInt((String) o);
- } else {
- throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
}
+
+ throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
public static boolean getBoolean(Object o, boolean defaultValue) {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/ui/public/component.html
----------------------------------------------------------------------
diff --cc storm-core/src/ui/public/component.html
index 092eb40,6353142..803fffb
--- a/storm-core/src/ui/public/component.html
+++ b/storm-core/src/ui/public/component.html
@@@ -26,10 -26,9 +26,11 @@@
<script src="/js/purl.js" type="text/javascript"></script>
<script src="/js/bootstrap-twipsy.js" type="text/javascript"></script>
<script src="/js/script.js" type="text/javascript"></script>
+ <script src="/js/moment.min.js" type="text/javascript"></script>
</head>
<body>
+<div id="ui-user">
+</div>
<h1><a href="/">Storm UI</a></h1>
<div id="component-summary">
</div>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/src/ui/public/js/script.js
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/cluster_test.clj
index f30c6a8,63efd30..7ed1028
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@@ -220,10 -206,11 +220,11 @@@
(with-inprocess-zookeeper zk-port
(with-simulated-time
(let [state (mk-storm-state zk-port)]
- (.report-error state "a" "1"(local-hostname) 6700 (RuntimeException.))
+ (.report-error state "a" "1" (local-hostname) 6700 (RuntimeException.))
(validate-errors! state "a" "1" ["RuntimeException"])
+ (advance-time-secs! 1)
(.report-error state "a" "1" (local-hostname) 6700 (IllegalArgumentException.))
- (validate-errors! state "a" "1" ["RuntimeException" "IllegalArgumentException"])
+ (validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"])
(doseq [i (range 10)]
(.report-error state "a" "2" (local-hostname) 6700 (RuntimeException.))
(advance-time-secs! 2))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/559c883d/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/supervisor_test.clj
index 3dcd275,584f0d9..6b40060
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@@ -311,100 -303,39 +313,137 @@@
mock-worker-id)
(verify-first-call-args-for-indices launch-process
[0]
- exp-args)))))))
+ exp-args))))
+ (testing "testing topology.classpath is added to classpath"
+ (let [topo-cp "/any/path"
+ exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
+ supervisor-stormdist-root nil
+ supervisor/jlp nil
++ set-worker-user! nil
++ supervisor/write-log-metadata! nil
+ launch-process nil
+ current-classpath "/base"]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id)
+ (verify-first-call-args-for-indices launch-process
+ [0]
+ exp-args))))
+ (testing "testing topology.environment is added to environment for worker launch"
+ (let [topo-env {"THISVAR" "somevalue" "THATVAR" "someothervalue"}
++ full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
+ exp-args (exp-args-fn [] [] mock-cp)
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
+ supervisor-stormdist-root nil
+ supervisor/jlp nil
+ launch-process nil
++ set-worker-user! nil
++ supervisor/write-log-metadata! nil
+ current-classpath "/base"]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id)
+ (verify-first-call-args-for-indices launch-process
+ [2]
- (merge topo-env {"LD_LIBRARY_PATH" nil}))))))))
++ full-env)))))))
+
+(defn rm-r [f]
+ (if (.isDirectory f)
+ (for [sub (.listFiles f)] (rm-r sub))
+ (.delete f)
+ ))
+
+(deftest test-worker-launch-command-run-as-user
+ (testing "*.worker.childopts configuration"
+ (let [mock-port "42"
+ mock-storm-id "fake-storm-id"
+ mock-worker-id "fake-worker-id"
+ mock-cp "mock-classpath'quote-on-purpose"
+ storm-local (str "/tmp/" (UUID/randomUUID))
+ worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
+ exp-launch ["/bin/worker-launcher"
+ "me"
+ "worker"
+ (str storm-local "/workers/" mock-worker-id)
+ worker-script]
+ exp-script-fn (fn [opts topo-opts]
+ (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java' '-server'"
+ " " (shell-cmd opts)
+ " " (shell-cmd topo-opts)
+ " '-Djava.library.path='"
+ " '-Dlogfile.name=" mock-storm-id "-worker-" mock-port ".log'"
+ " '-Dstorm.home='"
+ " '-Dlogback.configurationFile=/logback/cluster.xml'"
+ " '-Dstorm.id=" mock-storm-id "'"
+ " '-Dworker.id=" mock-worker-id "'"
+ " '-Dworker.port=" mock-port "'"
+ " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
+ " 'backtype.storm.daemon.worker'"
+ " '" mock-storm-id "'"
+ " '" mock-port "'"
+ " '" mock-worker-id "';"))]
+ (.mkdirs (io/file storm-local "workers" mock-worker-id))
+ (try
+ (testing "testing *.worker.childopts as strings with extra spaces"
+ (let [string-opts "-Dfoo=bar -Xmx1024m"
+ topo-string-opts "-Dkau=aux -Xmx2048m"
+ exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
+ ["-Dkau=aux" "-Xmx2048m"])
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+ STORM-LOCAL-DIR storm-local
+ SUPERVISOR-RUN-WORKER-AS-USER true
+ WORKER-CHILDOPTS string-opts}}]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-string-opts
+ TOPOLOGY-SUBMITTER-USER "me"}
+ add-to-classpath mock-cp
+ supervisor-stormdist-root nil
+ launch-process nil
+ set-worker-user! nil
+ supervisor/java-cmd "java"
+ supervisor/jlp nil
+ supervisor/write-log-metadata! nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id)
+ (verify-first-call-args-for-indices launch-process
+ [0]
+ exp-launch))
+ (is (= (slurp worker-script) exp-script))))
+ (testing "testing *.worker.childopts as list of strings, with spaces in values"
+ (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
+ topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
+ exp-script (exp-script-fn list-opts topo-list-opts)
+ mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+ STORM-LOCAL-DIR storm-local
+ SUPERVISOR-RUN-WORKER-AS-USER true
+ WORKER-CHILDOPTS list-opts}}]
+ (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+ topo-list-opts
+ TOPOLOGY-SUBMITTER-USER "me"}
+ add-to-classpath mock-cp
+ supervisor-stormdist-root nil
+ launch-process nil
+ set-worker-user! nil
+ supervisor/java-cmd "java"
+ supervisor/jlp nil
+ supervisor/write-log-metadata! nil]
+ (supervisor/launch-worker mock-supervisor
+ mock-storm-id
+ mock-port
+ mock-worker-id)
+ (verify-first-call-args-for-indices launch-process
+ [0]
+ exp-launch))
+ (is (= (slurp worker-script) exp-script))))
+(finally (rm-r (io/file storm-local)))
+))))
(deftest test-workers-go-bananas
;; test that multiple workers are started for a port, and test that
@@@ -422,142 -353,59 +461,198 @@@
;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code
)
+(deftest test-supervisor-data-acls
+  ;; Builds supervisor-data from a conf carrying ZooKeeper digest auth settings
+  ;; (all collaborators stubbed to nil) and checks that the argument at index 2
+  ;; of the single mk-storm-cluster-state call is supervisor/SUPERVISOR-ZK-ACLS.
+  (testing "supervisor-data uses correct ACLs"
+    (let [zk-auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME "digest"
+                        STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"}
+          stub-isupervisor (reify ISupervisor
+                             (getSupervisorId [this] nil)
+                             (getAssignmentId [this] nil))]
+      (stubbing [uptime-computer nil
+                 cluster/mk-storm-cluster-state nil
+                 supervisor-state nil
+                 local-hostname nil
+                 mk-timer nil]
+        (supervisor/supervisor-data zk-auth-conf nil stub-isupervisor)
+        (verify-call-times-for cluster/mk-storm-cluster-state 1)
+        (verify-first-call-args-for-indices cluster/mk-storm-cluster-state
+                                            [2]
+                                            supervisor/SUPERVISOR-ZK-ACLS)))))
+
+(deftest test-write-log-metadata
+  ;; write-log-metadata-to-yaml-file! is mocked, so this only checks the
+  ;; arguments write-log-metadata! hands to it. The expected LOGS-USERS value
+  ;; lists the names from both TOPOLOGY-USERS and LOGS-USERS of the storm conf.
+  (testing "supervisor writes correct data to logs metadata file"
+    (let [owner "alice"
+          worker-id "42"
+          storm-id "0123456789"
+          port 4242
+          conf {}
+          storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+                      TOPOLOGY-USERS ["charlie" "bob"]
+                      LOGS-USERS ["daryl"]}
+          expected-metadata {TOPOLOGY-SUBMITTER-USER owner
+                             "worker-id" worker-id
+                             LOGS-USERS ["bob" "charlie" "daryl"]}]
+      (mocking [supervisor/write-log-metadata-to-yaml-file!]
+        (supervisor/write-log-metadata! storm-conf owner worker-id
+                                        storm-id port conf)
+        (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+                                      storm-id port expected-metadata conf)))))
+
+(deftest test-worker-launcher-requires-user
+  ;; With launch-process mocked out, calling worker-launcher with an empty
+  ;; user string must fail with an IllegalArgumentException (as the thrown
+  ;; exception or one of its causes) whose message mentions the blank user.
+  (let [empty-conf {}
+        blank-user ""]
+    (testing "worker-launcher throws on blank user"
+      (mocking [launch-process]
+        (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
+                                    #"(?i).*user cannot be blank.*"
+                                    (supervisor/worker-launcher empty-conf nil blank-user)))))))
+
+(defn found?
+  "Checks whether sub-str occurs in input-str.
+  A string input is searched directly with contrib-str/substring?.
+  Any other input is treated as a collection and handed, together with a
+  per-element substring predicate, to the two-argument some? (NOTE: this is
+  not the one-argument clojure.core/some?; presumably a project helper --
+  confirm)."
+  [sub-str input-str]
+  (if-not (string? input-str)
+    (some? #(contrib-str/substring? sub-str %) input-str)
+    (contrib-str/substring? sub-str input-str)))
+
+(defn not-found?
+  "Logical negation of found?: true when sub-str does not occur in input-str.
+  Uses `not` on the result of found?. Wrapping that result in `complement`
+  would return a function, which is always truthy, so every
+  (is (not-found? ...)) assertion would pass regardless of the input."
+  [sub-str input-str]
+  (not (found? sub-str input-str)))
+
+(deftest test-substitute-childopts-happy-path
+  ;; The childopts template contains all four placeholders (%ID%, %STORM-ID%,
+  ;; %WORKER-ID%, %WORKER-PORT%). After substitution none of them may remain
+  ;; and the worker id, storm id and port must all appear in the result; the
+  ;; "worker-9999" check expects %ID% to be filled with the port.
+  (testing "worker-launcher replaces ids in childopts"
+    (let [worker-id "w-01"
+          storm-id "s-01"
+          port 9999
+          childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+          ;; Bound locally instead of with `def`, so the test does not intern
+          ;; and overwrite a namespace-level var.
+          childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port)]
+      (is (not-found? "%WORKER-ID%" childopts-with-ids))
+      (is (found? "w-01" childopts-with-ids))
+      (is (not-found? "%STORM-ID%" childopts-with-ids))
+      (is (found? "s-01" childopts-with-ids))
+      (is (not-found? "%WORKER-PORT%" childopts-with-ids))
+      (is (found? "-9999." childopts-with-ids))
+      (is (not-found? "%ID%" childopts-with-ids))
+      (is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids)))))
+
+(deftest test-substitute-childopts-storm-id-alone
+  ;; The childopts template contains only the %STORM-ID% placeholder, so the
+  ;; result must contain the storm id and neither the worker id nor the port.
+  (testing "worker-launcher replaces ids in childopts"
+    (let [worker-id "w-01"
+          storm-id "s-01"
+          port 9999
+          childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%STORM-ID%.log"
+          ;; Bound locally instead of with `def`, so the test does not intern
+          ;; and overwrite a namespace-level var.
+          childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port)]
+      (is (not-found? "%WORKER-ID%" childopts-with-ids))
+      (is (not-found? "w-01" childopts-with-ids))
+      (is (not-found? "%STORM-ID%" childopts-with-ids))
+      (is (found? "s-01" childopts-with-ids))
+      (is (not-found? "%WORKER-PORT%" childopts-with-ids))
+      (is (not-found? "-9999." childopts-with-ids))
+      (is (not-found? "%ID%" childopts-with-ids))
+      (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)))))
+
+(deftest test-substitute-childopts-no-keys
+  ;; The childopts string contains no placeholders at all, so none of the
+  ;; ids or the port may show up in the substituted result.
+  (testing "worker-launcher has no ids to replace in childopts"
+    (let [worker-id "w-01"
+          storm-id "s-01"
+          port 9999
+          childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+          ;; Bound locally instead of with `def`, so the test does not intern
+          ;; and overwrite a namespace-level var.
+          childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port)]
+      (is (not-found? "%WORKER-ID%" childopts-with-ids))
+      (is (not-found? "w-01" childopts-with-ids))
+      (is (not-found? "%STORM-ID%" childopts-with-ids))
+      (is (not-found? "s-01" childopts-with-ids))
+      (is (not-found? "%WORKER-PORT%" childopts-with-ids))
+      (is (not-found? "-9999." childopts-with-ids))
+      (is (not-found? "%ID%" childopts-with-ids))
+      (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)))))
+
+(deftest test-substitute-childopts-nil-childopts
+  ;; childopts is nil: substitute-childopts must cope with it, and nothing
+  ;; (placeholder, id or port) may be found in whatever it returns.
+  (testing "worker-launcher has nil childopts"
+    (let [worker-id "w-01"
+          storm-id "s-01"
+          port 9999
+          childopts nil
+          ;; Bound locally instead of with `def`, so the test does not intern
+          ;; and overwrite a namespace-level var.
+          childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port)]
+      (is (not-found? "%WORKER-ID%" childopts-with-ids))
+      (is (not-found? "w-01" childopts-with-ids))
+      (is (not-found? "%STORM-ID%" childopts-with-ids))
+      (is (not-found? "s-01" childopts-with-ids))
+      (is (not-found? "%WORKER-PORT%" childopts-with-ids))
+      (is (not-found? "-9999." childopts-with-ids))
+      (is (not-found? "%ID%" childopts-with-ids))
+      (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)))))
+
+(deftest test-substitute-childopts-nil-ids
+  ;; worker-id is nil while the template still contains %WORKER-ID%: the
+  ;; placeholder must still be removed, and the storm id and port must be
+  ;; substituted as usual.
+  (testing "worker-launcher has nil ids"
+    (let [worker-id nil
+          storm-id "s-01"
+          port 9999
+          childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+          ;; Bound locally instead of with `def`, so the test does not intern
+          ;; and overwrite a namespace-level var.
+          childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id port)]
+      (is (not-found? "%WORKER-ID%" childopts-with-ids))
+      (is (not-found? "w-01" childopts-with-ids))
+      (is (not-found? "%STORM-ID%" childopts-with-ids))
+      (is (found? "s-01" childopts-with-ids))
+      (is (not-found? "%WORKER-PORT%" childopts-with-ids))
+      (is (found? "-9999." childopts-with-ids))
+      (is (not-found? "%ID%" childopts-with-ids))
+      (is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids)))))
+
+ (deftest test-retry-read-assignments ; two topologies get mocked assignments onto the same sup1 ports, then topology2 is rebalanced
+ (with-simulated-time-local-cluster [cluster
+ :supervisors 0 ; no supervisors up front; sup1 is added explicitly below
+ :ports-per-supervisor 2
+ :daemon-conf {NIMBUS-REASSIGN false
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0}]
+ (letlocals
+ (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+ (bind topology1 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind topology2 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (bind changed (capture-changed-workers ; first capture: submit both mocked assignments, then advance time by 10
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ "topology1"
+ {TOPOLOGY-WORKERS 2}
+ topology1
+ {1 "1"
+ 2 "1"}
+ {[1] ["sup1" 1]
+ [2] ["sup1" 2]
+ })
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ "topology2"
+ {TOPOLOGY-WORKERS 2}
+ topology2
+ {1 "1"
+ 2 "1"}
+ {[1] ["sup1" 1] ; same sup1 ports (1 and 2) as the topology1 assignment above
+ [2] ["sup1" 2]
+ })
+ (advance-cluster-time cluster 10)
+ ))
+ (is (empty? (:launched changed))) ; nothing may have been launched during the first capture
+ (bind options (RebalanceOptions.))
+ (.set_wait_secs options 0) ; rebalance with a wait of 0 seconds
+ (bind changed (capture-changed-workers ; second capture: rebalance topology2, heartbeat ports 1-4 between two time advances
+ (.rebalance (:nimbus cluster) "topology2" options)
+ (advance-cluster-time cluster 10)
+ (heartbeat-workers cluster "sup1" [1 2 3 4])
+ (advance-cluster-time cluster 10)
+ ))
+ (validate-launched-once (:launched changed) ; presumably asserts one launch per listed port for the given storm id -- confirm against the helper
+ {"sup1" [1 2]}
+ (get-storm-id (:storm-cluster-state cluster) "topology1"))
+ (validate-launched-once (:launched changed) ; topology2 is expected on ports 3 and 4 after the rebalance
+ {"sup1" [3 4]}
+ (get-storm-id (:storm-cluster-state cluster) "topology2"))
+ )))