You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:13 UTC
[17/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
new file mode 100644
index 0000000..e4b44b0
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -0,0 +1,1219 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.supervisor
+ (:import [java.io File IOException FileOutputStream])
+ (:import [org.apache.storm.scheduler ISupervisor]
+ [org.apache.storm.utils LocalState Time Utils]
+ [org.apache.storm.daemon Shutdownable]
+ [org.apache.storm Constants]
+ [org.apache.storm.cluster ClusterStateContext DaemonType]
+ [java.net JarURLConnection]
+ [java.net URI]
+ [org.apache.commons.io FileUtils])
+ (:use [org.apache.storm config util log timer local-state])
+ (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
+ (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
+ (:import [java.nio.file Files StandardCopyOption])
+ (:import [org.apache.storm Config])
+ (:import [org.apache.storm.generated WorkerResources ProfileAction])
+ (:import [org.apache.storm.localizer LocalResource])
+ (:use [org.apache.storm.daemon common])
+ (:require [org.apache.storm.command [healthcheck :as healthcheck]])
+ (:require [org.apache.storm.daemon [worker :as worker]]
+ [org.apache.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
+ [clojure.set :as set])
+ (:import [org.apache.thrift.transport TTransportException])
+ (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
+ (:import [org.yaml.snakeyaml Yaml]
+ [org.yaml.snakeyaml.constructor SafeConstructor])
+ (:require [metrics.gauges :refer [defgauge]])
+ (:require [metrics.meters :refer [defmeter mark!]])
+ (:gen-class
+ :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]]))
+
+;; Meter counting workers launched by this supervisor (exposed via the metrics registry).
+(defmeter supervisor:num-workers-launched)
+
+;; Both multimethods dispatch on cluster mode (:local vs :distributed) read from the conf map.
+(defmulti download-storm-code cluster-mode)
+;; Dispatches on the supervisor's own conf; the remaining args do not affect dispatch.
+(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
+
+;; Storm build version, reported in supervisor heartbeats.
+(def STORM-VERSION (VersionInfo/getVersion))
+
+;; Control surface the running daemon exposes in addition to Shutdownable.
+(defprotocol SupervisorDaemon
+  (get-id [this])
+  (get-conf [this])
+  (shutdown-all-workers [this])
+  )
+
+(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
+  ;; Reads every topology's assignment from cluster state, reusing the cached
+  ;; entry in assignment-versions when its recorded version still matches, and
+  ;; collects any pending profiler requests.  `callback` is registered on each
+  ;; read so later ZK changes re-trigger the synchronize loop.
+  (let [storm-ids (.assignments storm-cluster-state callback)]
+    (let [new-assignments
+          (->>
+            (dofor [sid storm-ids]
+              (let [recorded-version (:version (get assignment-versions sid))]
+                (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
+                  (if (= assignment-version recorded-version)
+                    ;; cache hit: keep the local copy, skip the full ZK read
+                    {sid (get assignment-versions sid)}
+                    {sid (.assignment-info-with-version storm-cluster-state sid callback)})
+                  {sid nil})))
+            (apply merge)
+            (filter-val not-nil?))
+          new-profiler-actions
+          (->>
+            (dofor [sid (distinct storm-ids)]
+              (if-let [topo-profile-actions (.get-topology-profile-requests storm-cluster-state sid false)]
+                {sid topo-profile-actions}))
+            (apply merge))]
+      ;; :assignments strips the version wrapper; :versions keeps it so the
+      ;; caller can refresh its cache.
+      {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
+       :profiler-actions new-profiler-actions
+       :versions new-assignments})))
+
+(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
+  ;; Filters one topology's assignment down to the slots owned by this
+  ;; supervisor (node id = assignment-id), then groups executors by port.
+  ;; Returns port -> local assignment struct.
+  (let [assignment (get assignments-snapshot storm-id)
+        my-slots-resources (into {}
+                                 (filter (fn [[[node _] _]] (= node assignment-id))
+                                         (:worker->resources assignment)))
+        my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
+                             (:executor->node+port assignment))
+        port-executors (apply merge-with
+                              concat
+                              (for [[executor [_ port]] my-executors]
+                                {port [executor]}
+                                ))]
+    (into {} (for [[port executors] port-executors]
+               ;; need to cast to int b/c it might be a long (due to how yaml parses things)
+               ;; doall is to avoid serialization/deserialization problems with lazy seqs
+               [(Integer. port) (mk-local-assignment storm-id (doall executors) (get my-slots-resources [assignment-id port]))]
+               ))))
+
+(defn- read-assignments
+  "Returns map from port to struct containing :storm-id, :executors and :resources"
+  ([assignments-snapshot assignment-id]
+   ;; A port may belong to at most one topology; merging with a throwing fn
+   ;; enforces that invariant.
+   (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
+        (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
+  ([assignments-snapshot assignment-id existing-assignment retries]
+   ;; Retrying variant: on a conflict, keep the previous assignment and bump
+   ;; the shared retry counter; rethrow only after 3 consecutive failures.
+   (try (let [assignments (read-assignments assignments-snapshot assignment-id)]
+          (reset! retries 0)
+          assignments)
+        (catch RuntimeException e
+          (if (> @retries 2) (throw e) (swap! retries inc))
+          (log-warn (.getMessage e) ": retrying " @retries " of 3")
+          existing-assignment))))
+
+(defn- read-storm-code-locations
+  "Maps each topology id in the snapshot to its :master-code-dir (the code
+  location on nimbus to download from)."
+  [assignments-snapshot]
+  (map-val :master-code-dir assignments-snapshot))
+
+(defn- read-downloaded-storm-ids [conf]
+  ;; Directory names under the local stormdist root are URL-encoded topology ids.
+  (map #(url-decode %) (read-dir-contents (supervisor-stormdist-root conf)))
+  )
+
+(defn read-worker-heartbeat
+  "Reads worker `id`'s heartbeat from its LocalState.  Returns nil when the
+  state is unreadable (e.g. corrupt or mid-write), logging a warning instead
+  of propagating the error."
+  [conf id]
+  (let [local-state (worker-state conf id)]
+    (try
+      (ls-worker-heartbeat local-state)
+      (catch Exception e
+        (log-warn e "Failed to read local heartbeat for workerId : " id ",Ignoring exception.")
+        nil))))
+
+
+(defn my-worker-ids
+  "Ids of all workers that have a directory under the local worker root."
+  [conf]
+  (read-dir-contents (worker-root conf)))
+
+(defn read-worker-heartbeats
+  "Returns map from worker id to heartbeat"
+  [conf]
+  ;; into realizes the sequence, so heartbeats are read eagerly here.
+  (into {}
+        (for [worker-id (my-worker-ids conf)]
+          [worker-id (read-worker-heartbeat conf worker-id)])))
+
+
+(defn matches-an-assignment? [worker-heartbeat assigned-executors]
+  ;; A heartbeat matches when an assignment exists for its port, names the same
+  ;; topology, and covers exactly the same executors -- ignoring the system
+  ;; executor, which every worker runs but assignments never list.
+  (let [assignment (get assigned-executors (:port worker-heartbeat))]
+    (and assignment
+         (= (:storm-id assignment) (:storm-id worker-heartbeat))
+         (= (set (:executors assignment))
+            (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)))))
+
+;; Process-local registry of worker ids believed dead.  Populated by callers
+;; outside this chunk (presumably worker exit-code callbacks -- confirm) and
+;; consulted by read-allocated-workers so dead workers are restarted before
+;; their heartbeat times out.
+(let [dead-workers (atom #{})]
+  (defn get-dead-workers []
+    @dead-workers)
+  (defn add-dead-worker [worker]
+    (swap! dead-workers conj worker))
+  (defn remove-dead-worker [worker]
+    (swap! dead-workers disj worker)))
+
+(defn is-worker-hb-timed-out? [now hb conf]
+  ;; Stale once the heartbeat's age exceeds the configured worker timeout.
+  (let [age-secs (- now (:time-secs hb))]
+    (> age-secs (conf SUPERVISOR-WORKER-TIMEOUT-SECS))))
+
+(defn read-allocated-workers
+  "Returns map from worker id to a [state heartbeat] pair, where state is one of
+  :valid, :not-started, :disallowed or :timed-out.  A nil heartbeat means the
+  worker is dead (timed out or never wrote a heartbeat)."
+  [supervisor assigned-executors now]
+  (let [conf (:conf supervisor)
+        ^LocalState local-state (:local-state supervisor)
+        id->heartbeat (read-worker-heartbeats conf)
+        approved-ids (set (keys (ls-approved-workers local-state)))]
+    (into
+      {}
+      (dofor [[id hb] id->heartbeat]
+        ;; Classification order matters: no heartbeat -> :not-started; not in
+        ;; the approved list or mismatched with its assignment -> :disallowed;
+        ;; known-dead process or stale heartbeat -> :timed-out; else :valid.
+        (let [state (cond
+                      (not hb)
+                      :not-started
+                      (or (not (contains? approved-ids id))
+                          (not (matches-an-assignment? hb assigned-executors)))
+                      :disallowed
+                      (or
+                        (when (get (get-dead-workers) id)
+                          (log-message "Worker Process " id " has died!")
+                          true)
+                        (is-worker-hb-timed-out? now hb conf))
+                      :timed-out
+                      true
+                      :valid)]
+          (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
+          [id [state hb]]
+          ))
+      )))
+
+(defn- wait-for-worker-launch [conf id start-time]
+  ;; Polls the worker's LocalState every 500ms until it writes its first
+  ;; heartbeat or SUPERVISOR-WORKER-START-TIMEOUT-SECS elapse from start-time.
+  (let [state (worker-state conf id)]
+    (loop []
+      (let [hb (ls-worker-heartbeat state)]
+        (when (and
+                (not hb)
+                (<
+                  (- (current-time-secs) start-time)
+                  (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
+                  ))
+          (log-message id " still hasn't started")
+          (Time/sleep 500)
+          (recur)
+          )))
+    ;; Only logs the failure; the sync loop cleans up and relaunches later.
+    (when-not (ls-worker-heartbeat state)
+      (log-message "Worker " id " failed to start")
+      )))
+
+(defn- wait-for-workers-launch [conf ids]
+  ;; All workers share a single launch deadline measured from now.
+  (let [launch-time (current-time-secs)]
+    (doseq [worker-id ids]
+      (wait-for-worker-launch conf worker-id launch-time))))
+
+(defn generate-supervisor-id
+  "Random UUID used as this supervisor's identity."
+  []
+  (uuid))
+
+;; Starts the worker-launcher setuid binary as `user` with the given args and
+;; returns the Process.  Uses SUPERVISOR-WORKER-LAUNCHER when configured,
+;; otherwise $storm.home/bin/worker-launcher.  Throws if user is blank, since
+;; the launcher would otherwise run with an undefined identity.
+(defnk worker-launcher [conf user args :environment {} :log-prefix nil :exit-code-callback nil :directory nil]
+  (let [_ (when (clojure.string/blank? user)
+            (throw (java.lang.IllegalArgumentException.
+                     "User cannot be blank when calling worker-launcher.")))
+        wl-initial (conf SUPERVISOR-WORKER-LAUNCHER)
+        storm-home (System/getProperty "storm.home")
+        wl (if wl-initial wl-initial (str storm-home "/bin/worker-launcher"))
+        command (concat [wl user] args)]
+    (log-message "Running as user:" user " command:" (pr-str command))
+    (launch-process command :environment environment :log-prefix log-prefix :exit-code-callback exit-code-callback :directory directory)
+    ))
+
+;; Runs the worker-launcher synchronously: optionally streams its stdout into
+;; the log, waits for exit (an interrupt is logged, not rethrown), and returns
+;; the process exit code.
+(defnk worker-launcher-and-wait [conf user args :environment {} :log-prefix nil]
+  (let [process (worker-launcher conf user args :environment environment)]
+    (if log-prefix
+      (read-and-log-stream log-prefix (.getInputStream process)))
+    (try
+      (.waitFor process)
+      (catch InterruptedException e
+        (log-message log-prefix " interrupted.")))
+    (.exitValue process)))
+
+(defn- rmr-as-user
+  "Launches a process owned by the given user that deletes the given path
+  recursively. Throws RuntimeException if the directory is not removed."
+  [conf id path]
+  ;; `id` is only used to label the launcher's log output.
+  (let [user (Utils/getFileOwner path)]
+    (worker-launcher-and-wait conf
+                              user
+                              ["rmr" path]
+                              :log-prefix (str "rmr " id))
+    (if (exists-file? path)
+      (throw (RuntimeException. (str path " was not deleted"))))))
+
+(defn try-cleanup-worker
+  "Best-effort removal of a worker's on-disk state (heartbeats, pids, root dir)
+  and its user mapping.  I/O and runtime failures are logged and left for a
+  later sync pass to retry."
+  [conf id]
+  (try
+    (if (.exists (File. (worker-root conf id)))
+      (do
+        (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+          (rmr-as-user conf id (worker-root conf id))
+          (do
+            (rmr (worker-heartbeats-root conf id))
+            ;; this avoids a race condition with worker or subprocess writing pid around same time
+            (rmr (worker-pids-root conf id))
+            (rmr (worker-root conf id))))
+        (remove-worker-user! conf id)
+        (remove-dead-worker id)
+        ))
+    ;; FileNotFoundException is a subclass of IOException, so it must be caught
+    ;; first; previously the IOException clause shadowed it and the dedicated
+    ;; handler below was unreachable.
+    (catch java.io.FileNotFoundException e (log-message (.getMessage e)))
+    (catch IOException e
+      (log-warn-error e "Failed to cleanup worker " id ". Will retry later"))
+    (catch RuntimeException e
+      (log-warn-error e "Failed to cleanup worker " id ". Will retry later")
+      )
+    ))
+
+(defn shutdown-worker [supervisor id]
+  ;; Graceful-then-forceful shutdown of one worker: SIGTERM every recorded pid,
+  ;; sleep the configured grace period so worker cleanup hooks can run, then
+  ;; SIGKILL, remove pid files, and finally remove the worker's directories.
+  ;; In local mode the in-process worker is killed via the process simulator.
+  (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
+  (let [conf (:conf supervisor)
+        pids (read-dir-contents (worker-pids-root conf id))
+        thread-pid (@(:worker-thread-pids-atom supervisor) id)
+        shutdown-sleep-secs (conf SUPERVISOR-WORKER-SHUTDOWN-SLEEP-SECS)
+        as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        user (get-worker-user conf id)]
+    (when thread-pid
+      (psim/kill-process thread-pid))
+    (doseq [pid pids]
+      (if as-user
+        ;; signals must go through the setuid launcher when workers run as another user
+        (worker-launcher-and-wait conf user ["signal" pid "15"] :log-prefix (str "kill -15 " pid))
+        (kill-process-with-sig-term pid)))
+    (when-not (empty? pids)
+      (log-message "Sleep " shutdown-sleep-secs " seconds for execution of cleanup threads on worker.")
+      (sleep-secs shutdown-sleep-secs))
+    (doseq [pid pids]
+      (if as-user
+        (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
+        (force-kill-process pid))
+      (if as-user
+        (rmr-as-user conf id (worker-pid-path conf id pid))
+        (try
+          (rmpath (worker-pid-path conf id pid))
+          (catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
+    (try-cleanup-worker conf id))
+  (log-message "Shut down " (:supervisor-id supervisor) ":" id))
+
+;; ZK ACLs applied when ZK auth is configured: the creator keeps full
+;; permissions, everyone else may only read and create children.
+(def SUPERVISOR-ZK-ACLS
+  [(first ZooDefs$Ids/CREATOR_ALL_ACL)
+   (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
+
+(defn supervisor-data
+  "Builds the shared state map that all supervisor threads and timers operate on."
+  [conf shared-context ^ISupervisor isupervisor]
+  {:conf conf
+   :shared-context shared-context
+   :isupervisor isupervisor
+   :active (atom true)
+   :uptime (uptime-computer)
+   :version STORM-VERSION
+   :worker-thread-pids-atom (atom {})
+   :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
+                                                                     (Utils/isZkAuthenticationConfiguredStormServer
+                                                                       conf)
+                                                                     SUPERVISOR-ZK-ACLS)
+                                                        :context (ClusterStateContext. DaemonType/SUPERVISOR))
+   :local-state (supervisor-state conf)
+   :supervisor-id (.getSupervisorId isupervisor)
+   :assignment-id (.getAssignmentId isupervisor)
+   :my-hostname (hostname conf)
+   :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
+   ;; Any error escaping a timer thread is fatal to the daemon.
+   :heartbeat-timer (mk-timer :kill-fn (fn [t]
+                                         (log-error t "Error when processing event")
+                                         (exit-process! 20 "Error when processing an event")
+                                         ))
+   :event-timer (mk-timer :kill-fn (fn [t]
+                                     (log-error t "Error when processing event")
+                                     (exit-process! 20 "Error when processing an event")
+                                     ))
+   ;; BUGFIX: this kill-fn was written with `defn`, which interned a global
+   ;; `blob-update-timer` var as a side effect of building this map; an
+   ;; anonymous fn matches the other timers above.
+   :blob-update-timer (mk-timer :kill-fn (fn [t]
+                                           (log-error t "Error when processing event")
+                                           (exit-process! 20 "Error when processing an event"))
+                                :timer-name "blob-update-timer")
+   :localizer (Utils/createLocalizer conf (supervisor-local-dir conf))
+   :assignment-versions (atom {})
+   :sync-retry (atom 0)
+   :download-lock (Object.)
+   :stormid->profiler-actions (atom {})
+   })
+
+(defn required-topo-files-exist?
+  "True when every on-disk artifact needed to launch storm-id is present: the
+  stormdist root, the serialized conf and topology code, and -- in distributed
+  mode only -- the topology jar."
+  [conf storm-id]
+  (let [root (supervisor-stormdist-root conf storm-id)
+        jar-path (supervisor-stormjar-path root)
+        code-path (supervisor-stormcode-path root)
+        conf-path (supervisor-stormconf-path root)]
+    (and (exists-file? root)
+         (exists-file? conf-path)
+         (exists-file? code-path)
+         (or (local-mode? conf)
+             (exists-file? jar-path)))))
+
+(defn get-worker-assignment-helper-msg
+  "Human-readable description of an assignment, used in launch log messages."
+  [assignment supervisor port id]
+  (let [sup-id (:supervisor-id supervisor)]
+    (str (pr-str assignment) " for this supervisor " sup-id " on port "
+         port " with id " id)))
+
+(defn get-valid-new-worker-ids
+  "For each port->assignment needing (re)launch, creates the worker's local
+  dirs, launches it, and returns a map of new worker id -> port for those that
+  could actually start.  Assignments whose topology files are missing locally
+  are skipped (their code may still be downloading)."
+  [conf supervisor reassign-executors new-worker-ids]
+  (into {}
+        (remove nil?
+                (dofor [[port assignment] reassign-executors]
+                  (let [id (new-worker-ids port)
+                        storm-id (:storm-id assignment)
+                        ^WorkerResources resources (:resources assignment)
+                        mem-onheap (.get_mem_on_heap resources)]
+                    ;; This condition checks for required files exist before launching the worker
+                    (if (required-topo-files-exist? conf storm-id)
+                      (do
+                        (log-message "Launching worker with assignment "
+                                     (get-worker-assignment-helper-msg assignment supervisor port id))
+                        (local-mkdirs (worker-pids-root conf id))
+                        (local-mkdirs (worker-heartbeats-root conf id))
+                        (launch-worker supervisor
+                                       (:storm-id assignment)
+                                       port
+                                       id
+                                       mem-onheap)
+                        [id port])
+                      (do
+                        (log-message "Missing topology storm code, so can't launch worker with assignment "
+                                     (get-worker-assignment-helper-msg assignment supervisor port id))
+                        nil)))))))
+
+(defn sync-processes
+  "Reconciles running worker processes with the stored local assignment: shuts
+  down workers whose state is not :valid, then launches fresh workers (with
+  newly generated ids) for every unsatisfied port and waits for them to start.
+  (Removed an unused `storm-cluster-state` binding.)"
+  [supervisor]
+  (let [conf (:conf supervisor)
+        ^LocalState local-state (:local-state supervisor)
+        assigned-executors (defaulted (ls-local-assignments local-state) {})
+        now (current-time-secs)
+        allocated (read-allocated-workers supervisor assigned-executors now)
+        keepers (filter-val
+                  (fn [[state _]] (= state :valid))
+                  allocated)
+        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
+        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
+        new-worker-ids (into
+                         {}
+                         (for [port (keys reassign-executors)]
+                           [port (uuid)]))]
+    ;; 1. to kill are those in allocated that are dead or disallowed
+    ;; 2. kill the ones that should be dead
+    ;;     - read pids, kill -9 and individually remove file
+    ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
+    ;; 3. of the rest, figure out what assignments aren't yet satisfied
+    ;; 4. generate new worker ids, write new "approved workers" to LS
+    ;; 5. create local dir for worker id
+    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
+    ;; 6. wait for workers launch
+
+    (log-debug "Syncing processes")
+    (log-debug "Assigned executors: " assigned-executors)
+    (log-debug "Allocated: " allocated)
+    (doseq [[id [state heartbeat]] allocated]
+      (when (not= :valid state)
+        (log-message
+          "Shutting down and clearing state for id " id
+          ". Current supervisor time: " now
+          ". State: " state
+          ", Heartbeat: " (pr-str heartbeat))
+        (shutdown-worker supervisor id)))
+    (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
+      ;; keep approvals only for surviving workers, plus the new launches
+      (ls-approved-workers! local-state
+                            (merge
+                              (select-keys (ls-approved-workers local-state)
+                                           (keys keepers))
+                              valid-new-worker-ids))
+      (wait-for-workers-launch conf (keys valid-new-worker-ids)))))
+
+(defn assigned-storm-ids-from-port-assignments [assignment]
+  ;; Distinct topology ids mentioned anywhere in a port->local-assignment map.
+  (set (map :storm-id (vals assignment))))
+
+(defn shutdown-disallowed-workers [supervisor]
+  ;; Shuts down only workers classified :disallowed (unapproved, or
+  ;; heartbeating an assignment that no longer matches).  Invoked from the
+  ;; sync loop on Windows only (see mk-synchronize-supervisor).
+  (let [conf (:conf supervisor)
+        ^LocalState local-state (:local-state supervisor)
+        assigned-executors (defaulted (ls-local-assignments local-state) {})
+        now (current-time-secs)
+        allocated (read-allocated-workers supervisor assigned-executors now)
+        disallowed (keys (filter-val
+                           (fn [[state _]] (= state :disallowed))
+                           allocated))]
+    (log-debug "Allocated workers " allocated)
+    (log-debug "Disallowed workers " disallowed)
+    (doseq [id disallowed]
+      (shutdown-worker supervisor id))
+    ))
+
+(defn get-blob-localname
+  "Given the blob information either gets the localname field if it exists,
+  else returns the default value passed in."
+  [blob-info defaultValue]
+  (if-let [localname (get blob-info "localname")]
+    localname
+    defaultValue))
+
+(defn should-uncompress-blob?
+  "Given the blob information returns the value of the uncompress field, handling it either being
+  a string or a boolean value, or if it's not specified then returns false"
+  [blob-info]
+  ;; NOTE(review): relies on the reflective Boolean constructor accepting either
+  ;; a String or a boolean and yielding false for nil -- which is why
+  ;; Boolean/parseBoolean (String-only) is not used here; confirm nil handling.
+  (Boolean. (get blob-info "uncompress")))
+
+(defn remove-blob-references
+  "Remove a reference to a blob when it's no longer needed."
+  [localizer storm-id conf]
+  ;; Drops this topology's reference on every blob in its blobstore map so the
+  ;; localizer cache can eventually evict the blob.
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)]
+    (if blobstore-map
+      (doseq [[k, v] blobstore-map]
+        (.removeBlobReference localizer
+                              k
+                              user
+                              topo-name
+                              (should-uncompress-blob? v))))))
+
+(defn blobstore-map-to-localresources
+  "Returns a list of LocalResources based on the blobstore-map passed in.
+  A nil map yields the empty list."
+  [blobstore-map]
+  (if blobstore-map
+    (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? v)))
+    ()))
+
+(defn add-blob-references
+  "For each of the downloaded topologies, adds references to the blobs that the topologies are
+  using. This is used to reconstruct the cache on restart."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    ;; no-op for topologies that declare no blobs
+    (if blobstore-map
+      (.addReferences localizer localresources user topo-name))))
+
+(defn rm-topo-files
+  "Removes a topology's stormdist directory, optionally dropping its blob
+  references first.  Failures are logged and swallowed; cleanup is retried on
+  later sync passes."
+  [conf storm-id localizer rm-blob-refs?]
+  (let [path (supervisor-stormdist-root conf storm-id)]
+    (try
+      (if rm-blob-refs?
+        (remove-blob-references localizer storm-id conf))
+      (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        (rmr-as-user conf storm-id path)
+        (rmr (supervisor-stormdist-root conf storm-id)))
+      (catch Exception e
+        ;; NOTE(review): the exception object is passed to log-message;
+        ;; log-error would be the conventional choice -- confirm intent.
+        (log-message e (str "Exception removing: " storm-id))))))
+
+(defn verify-downloaded-files
+  "Check for the files exists to avoid supervisor crashing
+  Also makes sure there is no necessity for locking"
+  [conf localizer assigned-storm-ids all-downloaded-storm-ids]
+  ;; Yields the assigned topology ids whose local files are incomplete (the
+  ;; caller wraps the result in a set).  Each incomplete directory is deleted
+  ;; here -- without touching blob refs -- so the code is re-downloaded.
+  (remove nil?
+          (into #{}
+                (for [storm-id all-downloaded-storm-ids
+                      :when (contains? assigned-storm-ids storm-id)]
+                  (when-not (required-topo-files-exist? conf storm-id)
+                    (log-debug "Files not present in topology directory")
+                    (rm-topo-files conf storm-id localizer false)
+                    storm-id)))))
+
+(defn mk-synchronize-supervisor
+  "Returns the synchronize-supervisor event handler: refreshes assignments from
+  cluster state, downloads newly assigned topology code, records the new local
+  assignment, removes code for topologies no longer referenced, and finally
+  queues sync-processes on the processes event manager."
+  [supervisor sync-processes event-manager processes-event-manager]
+  (fn this []
+    (let [conf (:conf supervisor)
+          storm-cluster-state (:storm-cluster-state supervisor)
+          ^ISupervisor isupervisor (:isupervisor supervisor)
+          ^LocalState local-state (:local-state supervisor)
+          ;; re-enqueue this handler whenever cluster state changes
+          sync-callback (fn [& ignored] (.add event-manager this))
+          assignment-versions @(:assignment-versions supervisor)
+          ;; NOTE: the :assignments destructure shadows the assignments-snapshot
+          ;; fn for the rest of this let body.
+          {assignments-snapshot :assignments
+           storm-id->profiler-actions :profiler-actions
+           versions :versions}
+          (assignments-snapshot storm-cluster-state sync-callback assignment-versions)
+          storm-code-map (read-storm-code-locations assignments-snapshot)
+          all-downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+          existing-assignment (ls-local-assignments local-state)
+          all-assignment (read-assignments assignments-snapshot
+                                           (:assignment-id supervisor)
+                                           existing-assignment
+                                           (:sync-retry supervisor))
+          ;; the ISupervisor implementation may veto individual ports
+          new-assignment (->> all-assignment
+                              (filter-key #(.confirmAssigned isupervisor %)))
+          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
+          localizer (:localizer supervisor)
+          checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids))
+          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)]
+
+      (log-debug "Synchronizing supervisor")
+      (log-debug "Storm code map: " storm-code-map)
+      (log-debug "All assignment: " all-assignment)
+      (log-debug "New assignment: " new-assignment)
+      (log-debug "Assigned Storm Ids " assigned-storm-ids)
+      (log-debug "All Downloaded Ids " all-downloaded-storm-ids)
+      (log-debug "Checked Downloaded Ids " checked-downloaded-storm-ids)
+      (log-debug "Downloaded Ids " downloaded-storm-ids)
+      (log-debug "Storm Ids Profiler Actions " storm-id->profiler-actions)
+      ;; download code first
+      ;; This might take awhile
+      ;;   - should this be done separately from usual monitoring?
+      ;; should we only download when topology is assigned to this supervisor?
+      (doseq [[storm-id master-code-dir] storm-code-map]
+        (when (and (not (downloaded-storm-ids storm-id))
+                   (assigned-storm-ids storm-id))
+          (log-message "Downloading code for storm id " storm-id)
+          ;; transient nimbus failures are logged; the next sync retries
+          (try-cause
+            (download-storm-code conf storm-id master-code-dir localizer)
+
+            (catch NimbusLeaderNotFoundException e
+              (log-warn-error e "Nimbus leader was not available."))
+            (catch TTransportException e
+              (log-warn-error e "There was a connection problem with nimbus.")))
+          (log-message "Finished downloading code for storm id " storm-id)))
+
+      (log-debug "Writing new assignment "
+                 (pr-str new-assignment))
+      ;; notify the ISupervisor of ports that lost their assignment
+      (doseq [p (set/difference (set (keys existing-assignment))
+                                (set (keys new-assignment)))]
+        (.killedWorker isupervisor (int p)))
+      (.assigned isupervisor (keys new-assignment))
+      (ls-local-assignments! local-state
+                             new-assignment)
+      (reset! (:assignment-versions supervisor) versions)
+      (reset! (:stormid->profiler-actions supervisor) storm-id->profiler-actions)
+
+      (reset! (:curr-assignment supervisor) new-assignment)
+      ;; remove any downloaded code that's no longer assigned or active
+      ;; important that this happens after setting the local assignment so that
+      ;; synchronize-supervisor doesn't try to launch workers for which the
+      ;; resources don't exist
+      (if on-windows? (shutdown-disallowed-workers supervisor))
+      (doseq [storm-id all-downloaded-storm-ids]
+        (when-not (storm-code-map storm-id)
+          (log-message "Removing code for storm id "
+                       storm-id)
+          (rm-topo-files conf storm-id localizer true)))
+      (.add processes-event-manager sync-processes))))
+
+(defn mk-supervisor-capacities
+  "Resource capacities (memory MB, CPU) advertised in SupervisorInfo, keyed by
+  the canonical Config names."
+  [conf]
+  {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB))
+   Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
+
+(defn update-blobs-for-topology!
+  "Update each blob listed in the topology configuration if the latest version of the blob
+  has not been downloaded."
+  [conf storm-id localizer]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    (try
+      (.updateBlobs localizer localresources user)
+      ;; auth / missing-key problems are logged and skipped; the periodic
+      ;; blob-update timer retries on its next pass
+      (catch AuthorizationException authExp
+        (log-error authExp))
+      (catch KeyNotFoundException knf
+        (log-error knf)))))
+
+(defn update-blobs-for-all-topologies-fn
+  "Returns a function that downloads all blobs listed in the topology configuration for all topologies assigned
+  to this supervisor, and creates version files with a suffix. The returned function is intended to be run periodically
+  by a timer, created elsewhere."
+  [supervisor]
+  (fn []
+    (try-cause
+      (let [conf (:conf supervisor)
+            downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+            new-assignment @(:curr-assignment supervisor)
+            assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+        (doseq [topology-id downloaded-storm-ids]
+          (let [storm-root (supervisor-stormdist-root conf topology-id)]
+            ;; only refresh blobs for topologies still assigned here
+            (when (assigned-storm-ids topology-id)
+              (log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root)
+              (update-blobs-for-topology! conf topology-id (:localizer supervisor))))))
+      ;; transient nimbus/network failures: log and let the timer retry
+      (catch TTransportException e
+        (log-error
+          e
+          "Network error while updating blobs, will retry again later"))
+      (catch NimbusLeaderNotFoundException e
+        (log-error
+          e
+          "Nimbus unavailable to update blobs, will retry again later")))))
+
+(defn jvm-cmd [cmd]
+  ;; Prefer the binary under $JAVA_HOME/bin when JAVA_HOME is set; otherwise
+  ;; rely on the command being resolvable from PATH.
+  (if-let [java-home (System/getenv "JAVA_HOME")]
+    (str java-home file-path-separator "bin" file-path-separator cmd)
+    cmd))
+
+(defn java-cmd
+  "Path to the `java` binary, resolved against JAVA_HOME when set."
+  []
+  (jvm-cmd "java"))
+
+;; Builders for the argv passed to the worker profiler helper script
+;; (WORKER-PROFILER-COMMAND): [script pid subcommand & args].
+
+;; Heap dump of the worker JVM into target-dir.
+(defn jmap-dump-cmd [profile-cmd pid target-dir]
+  [profile-cmd pid "jmap" target-dir])
+
+;; Thread dump of the worker JVM into target-dir.
+(defn jstack-dump-cmd [profile-cmd pid target-dir]
+  [profile-cmd pid "jstack" target-dir])
+
+;; Begin a profiler recording on the worker JVM.
+(defn jprofile-start [profile-cmd pid]
+  [profile-cmd pid "start"])
+
+;; Stop the recording and write results into target-dir.
+(defn jprofile-stop [profile-cmd pid target-dir]
+  [profile-cmd pid "stop" target-dir])
+
+;; Dump the in-progress recording into the worker's artifacts directory.
+(defn jprofile-dump [profile-cmd pid workers-artifacts-directory]
+  [profile-cmd pid "dump" workers-artifacts-directory])
+
+;; Kill the worker JVM (the supervisor will relaunch it).
+(defn jprofile-jvm-restart [profile-cmd pid]
+  [profile-cmd pid "kill"])
+
+(defn- delete-topology-profiler-action
+  "Removes a completed profiler request from cluster state so it is not re-run."
+  [storm-cluster-state storm-id profile-action]
+  (log-message "Deleting profiler action.." profile-action)
+  (.delete-topology-profile-requests storm-cluster-state storm-id profile-action))
+
+;; When workers run as their submitting user, stale launcher artifacts are
+;; removed and the action runs through the setuid worker-launcher; otherwise
+;; the command is spawned directly.  (The previous `if-let` bound a
+;; `run-worker-as-user` local that was never used; a plain `if` suffices.)
+(defnk launch-profiler-action-for-worker
+  "Launch profiler action for a worker"
+  [conf user target-dir command :environment {} :exit-code-on-profile-action nil :log-prefix nil]
+  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+    (let [container-file (container-file-path target-dir)
+          script-file (script-file-path target-dir)]
+      (log-message "Running as user:" user " command:" (shell-cmd command))
+      ;; NOTE(review): rmr-as-user's `id` argument is given the path here; it
+      ;; is only used as a log prefix, so this is harmless -- confirm intent.
+      (if (exists-file? container-file) (rmr-as-user conf container-file container-file))
+      (if (exists-file? script-file) (rmr-as-user conf script-file script-file))
+      (worker-launcher
+        conf
+        user
+        ["profiler" target-dir (write-script target-dir command :environment environment)]
+        :log-prefix log-prefix
+        :exit-code-callback exit-code-on-profile-action
+        :directory (File. target-dir)))
+    (launch-process
+      command
+      :environment environment
+      :log-prefix log-prefix
+      :exit-code-callback exit-code-on-profile-action
+      :directory (File. target-dir))))
+
+(defn mk-run-profiler-actions-for-all-topologies
+  "Returns a function that downloads all profile-actions listed for all topologies assigned
+  to this supervisor, executes those actions as user and deletes them from zookeeper."
+  [supervisor]
+  (fn []
+    (try
+      (let [conf (:conf supervisor)
+            stormid->profiler-actions @(:stormid->profiler-actions supervisor)
+            storm-cluster-state (:storm-cluster-state supervisor)
+            hostname (:my-hostname supervisor)
+            profile-cmd (conf WORKER-PROFILER-COMMAND)
+            new-assignment @(:curr-assignment supervisor)
+            assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+        (doseq [[storm-id profiler-actions] stormid->profiler-actions]
+          (when (not (empty? profiler-actions))
+            (doseq [pro-action profiler-actions]
+              ;; only actions addressed to this host are executed here
+              (if (= hostname (:host pro-action))
+                (let [port (:port pro-action)
+                      action ^ProfileAction (:action pro-action)
+                      ;; past-deadline requests count as "stop": run once more,
+                      ;; then delete from ZK on a zero exit code
+                      stop? (> (System/currentTimeMillis) (:timestamp pro-action))
+                      target-dir (worker-artifacts-root conf storm-id port)
+                      storm-conf (read-supervisor-storm-conf conf storm-id)
+                      user (storm-conf TOPOLOGY-SUBMITTER-USER)
+                      environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] env {})
+                      worker-pid (slurp (worker-artifacts-pid-path conf storm-id port))
+                      log-prefix (str "ProfilerAction process " storm-id ":" port " PROFILER_ACTION: " action " ")
+                      ;; Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
+                      ;; The profiler plugin script validates if JVM is recording before starting another recording.
+                      command (cond
+                                (= action ProfileAction/JMAP_DUMP) (jmap-dump-cmd profile-cmd worker-pid target-dir)
+                                (= action ProfileAction/JSTACK_DUMP) (jstack-dump-cmd profile-cmd worker-pid target-dir)
+                                (= action ProfileAction/JPROFILE_DUMP) (jprofile-dump profile-cmd worker-pid target-dir)
+                                (= action ProfileAction/JVM_RESTART) (jprofile-jvm-restart profile-cmd worker-pid)
+                                (and (not stop?)
+                                     (= action ProfileAction/JPROFILE_STOP))
+                                (jprofile-start profile-cmd worker-pid) ;; Ensure the profiler is still running
+                                (and stop? (= action ProfileAction/JPROFILE_STOP)) (jprofile-stop profile-cmd worker-pid target-dir))
+                      action-on-exit (fn [exit-code]
+                                       (log-message log-prefix " profile-action exited for code: " exit-code)
+                                       (if (and (= exit-code 0) stop?)
+                                         (delete-topology-profiler-action storm-cluster-state storm-id pro-action)))
+                      ;; stringify and drop empty argv entries before launching
+                      command (->> command (map str) (filter (complement empty?)))]
+
+                  (try
+                    (launch-profiler-action-for-worker conf
+                                                       user
+                                                       target-dir
+                                                       command
+                                                       :environment environment
+                                                       :exit-code-on-profile-action action-on-exit
+                                                       :log-prefix log-prefix)
+                    (catch IOException ioe
+                      (log-error ioe
+                                 (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later.")))
+                    (catch RuntimeException rte
+                      (log-error rte
+                                 (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later."))))))))))
+      (catch Exception e
+        (log-error e "Error running profiler actions, will retry again later")))))
+
+;; in local state, supervisor stores who its current assignments are
+;; another thread launches events to restart any dead processes if necessary
+;; Builds and starts the supervisor daemon: prepares the pluggable ISupervisor,
+;; wipes the tmp download dir, registers the recurring timer jobs (heartbeat,
+;; assignment sync, process sync, blob updates, health checks, profiler
+;; actions) and returns a reified daemon implementing Shutdownable,
+;; SupervisorDaemon and DaemonCommon.
+(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
+ (log-message "Starting Supervisor with conf " conf)
+ (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
+ ;; the tmp dir only holds in-flight topology downloads, so it is safe to clear on start
+ (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
+ (let [supervisor (supervisor-data conf shared-context isupervisor)
+ [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
+ sync-processes (partial sync-processes supervisor)
+ synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
+ synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor)
+ downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+ run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor)
+ ;; publishes this supervisor's liveness and capacity info to cluster state
+ heartbeat-fn (fn [] (.supervisor-heartbeat!
+ (:storm-cluster-state supervisor)
+ (:supervisor-id supervisor)
+ (->SupervisorInfo (current-time-secs)
+ (:my-hostname supervisor)
+ (:assignment-id supervisor)
+ (keys @(:curr-assignment supervisor))
+ ;; used ports
+ (.getMetadata isupervisor)
+ (conf SUPERVISOR-SCHEDULER-META)
+ ((:uptime supervisor))
+ (:version supervisor)
+ (mk-supervisor-capacities conf))))]
+ ;; heartbeat once synchronously before scheduling so nimbus sees us promptly
+ (heartbeat-fn)
+
+ ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
+ (schedule-recurring (:heartbeat-timer supervisor)
+ 0
+ (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
+ heartbeat-fn)
+ ;; re-register blob references for topologies already on disk from a
+ ;; previous run so the localizer's cleaner does not reclaim them
+ (doseq [storm-id downloaded-storm-ids]
+ (add-blob-references (:localizer supervisor) storm-id
+ conf))
+ ;; do this after adding the references so we don't try to clean things being used
+ (.startCleaner (:localizer supervisor))
+
+ (when (conf SUPERVISOR-ENABLE)
+ ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
+ ;; to date even if callbacks don't all work exactly right
+ (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
+ (schedule-recurring (:event-timer supervisor)
+ 0
+ (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
+ (fn [] (.add processes-event-manager sync-processes)))
+
+ ;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
+ (schedule-recurring (:blob-update-timer supervisor)
+ 30
+ 30
+ (fn [] (.add event-manager synchronize-blobs-fn)))
+
+ ;; periodic health check every 5 minutes: a non-zero code kills all local
+ ;; workers and intentionally crashes the supervisor process
+ (schedule-recurring (:event-timer supervisor)
+ (* 60 5)
+ (* 60 5)
+ (fn [] (let [health-code (healthcheck/health-check conf)
+ ids (my-worker-ids conf)]
+ (if (not (= health-code 0))
+ (do
+ (doseq [id ids]
+ (shutdown-worker supervisor id))
+ (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+
+ ;; Launch a thread that runs profiler commands. Starts with 30 seconds delay, every 30 seconds
+ (schedule-recurring (:event-timer supervisor)
+ 30
+ 30
+ (fn [] (.add event-manager run-profiler-actions-fn))))
+ (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
+ (reify
+ Shutdownable
+ (shutdown [this]
+ (log-message "Shutting down supervisor " (:supervisor-id supervisor))
+ (reset! (:active supervisor) false)
+ (cancel-timer (:heartbeat-timer supervisor))
+ (cancel-timer (:event-timer supervisor))
+ (cancel-timer (:blob-update-timer supervisor))
+ (.shutdown event-manager)
+ (.shutdown processes-event-manager)
+ (.shutdown (:localizer supervisor))
+ (.disconnect (:storm-cluster-state supervisor)))
+ SupervisorDaemon
+ (get-conf [this]
+ conf)
+ (get-id [this]
+ (:supervisor-id supervisor))
+ (shutdown-all-workers [this]
+ (let [ids (my-worker-ids conf)]
+ (doseq [id ids]
+ (shutdown-worker supervisor id)
+ )))
+ DaemonCommon
+ (waiting? [this]
+ ;; NOTE(review): the blob-update-timer is not consulted here, unlike the
+ ;; other two timers — confirm whether that omission is intentional
+ (or (not @(:active supervisor))
+ (and
+ (timer-waiting? (:heartbeat-timer supervisor))
+ (timer-waiting? (:event-timer supervisor))
+ (every? (memfn waiting?) managers)))
+ ))))
+
+(defn kill-supervisor
+  "Shut down the given supervisor daemon (delegates to its Shutdownable impl)."
+  [supervisor]
+  (.shutdown supervisor))
+
+(defn setup-storm-code-dir
+  "When workers run as the submitting user, invoke the worker-launcher helper
+   so the topology code dir gets the right ownership/permissions for that
+   user. No-op unless SUPERVISOR-RUN-WORKER-AS-USER is set."
+  [conf storm-conf dir]
+  ;; `when` instead of one-armed `if`: this is a side-effecting conditional
+  (when (conf SUPERVISOR-RUN-WORKER-AS-USER)
+    (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir))))
+
+(defn setup-blob-permission
+  "When workers run as the submitting user, invoke the worker-launcher helper
+   to fix ownership/permissions on a localized blob path. No-op unless
+   SUPERVISOR-RUN-WORKER-AS-USER is set."
+  [conf storm-conf path]
+  ;; `when` instead of one-armed `if`: this is a side-effecting conditional
+  (when (conf SUPERVISOR-RUN-WORKER-AS-USER)
+    (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path))))
+
+(defn download-blobs-for-topology!
+ "Download all blobs listed in the topology configuration for a given topology."
+ [conf stormconf-path localizer tmproot]
+ (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ topo-name (storm-conf TOPOLOGY-NAME)
+ ;; per-user cache dir managed by the localizer
+ user-dir (.getLocalUserFileCacheDir localizer user)
+ localresources (blobstore-map-to-localresources blobstore-map)]
+ (when localresources
+ (when-not (.exists user-dir)
+ (FileUtils/forceMkdir user-dir))
+ (try
+ ;; fetch all blobs into the user cache, then symlink each one into
+ ;; tmproot under the name the topology configured for it
+ (let [localized-resources (.getBlobs localizer localresources user topo-name user-dir)]
+ (setup-blob-permission conf storm-conf (.toString user-dir))
+ (doseq [local-rsrc localized-resources]
+ (let [rsrc-file-path (File. (.getFilePath local-rsrc))
+ key-name (.getName rsrc-file-path)
+ blob-symlink-target-name (.getName (File. (.getCurrentSymlinkPath local-rsrc)))
+ symlink-name (get-blob-localname (get blobstore-map key-name) key-name)]
+ (create-symlink! tmproot (.getParent rsrc-file-path) symlink-name
+ blob-symlink-target-name))))
+ ;; errors are logged and swallowed: a later check
+ ;; (download-blobs-for-topology-succeed?) detects missing files
+ (catch AuthorizationException authExp
+ (log-error authExp))
+ (catch KeyNotFoundException knf
+ (log-error knf))))))
+
+(defn get-blob-file-names
+  "Return the local file name configured for each blob in blobstore-map
+   (blob key -> blob settings), or nil when the map is nil/absent."
+  [blobstore-map]
+  ;; `when` instead of one-armed `if`; dropped the stray comma in the binding
+  (when blobstore-map
+    (for [[k data] blobstore-map]
+      (get-blob-localname data k))))
+
+(defn download-blobs-for-topology-succeed?
+  "Check that every blob named in the topology conf at stormconf-path has been
+   downloaded into target-dir. Returns true when there are no blobs at all:
+   (every? ...) is vacuously true for an empty or nil file list, so the
+   original explicit emptiness check was redundant and has been removed."
+  [stormconf-path target-dir]
+  (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        file-names (get-blob-file-names blobstore-map)]
+    (every? #(Utils/checkFileExists target-dir %) file-names)))
+
+;; distributed implementation
+;; distributed implementation: pull jar/code/conf from the blobstore into a
+;; unique tmp dir, localize any configured blobs there, and only then
+;; atomically move the whole tree to its permanent stormdist location.
+(defmethod download-storm-code
+ :distributed [conf storm-id master-code-dir localizer]
+ ;; Downloading to permanent location is atomic
+ (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+ stormroot (supervisor-stormdist-root conf storm-id)
+ blobstore (Utils/getClientBlobStoreForSupervisor conf)]
+ (FileUtils/forceMkdir (File. tmproot))
+ ;; restrict permissions on the download dir; run-as-user is unsupported on Windows
+ (if-not on-windows?
+ (Utils/restrictPermissions tmproot)
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ (throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions"))))
+ (Utils/downloadResourcesAsSupervisor (master-stormjar-key storm-id)
+ (supervisor-stormjar-path tmproot) blobstore)
+ (Utils/downloadResourcesAsSupervisor (master-stormcode-key storm-id)
+ (supervisor-stormcode-path tmproot) blobstore)
+ (Utils/downloadResourcesAsSupervisor (master-stormconf-key storm-id)
+ (supervisor-stormconf-path tmproot) blobstore)
+ (.shutdown blobstore)
+ (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
+ (download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer
+ tmproot)
+ ;; commit with an ATOMIC_MOVE only if every configured blob is present;
+ ;; otherwise discard the tmp dir and let a later sync retry
+ (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
+ (do
+ (log-message "Successfully downloaded blob resources for storm-id " storm-id)
+ (FileUtils/forceMkdir (File. stormroot))
+ (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
+ (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))
+ (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot))
+ (do
+ (log-message "Failed to download blob resources for storm-id " storm-id)
+ (rmr tmproot)))))
+
+(defn write-log-metadata-to-yaml-file!
+  "Serialize the worker log-metadata map `data` to the YAML metadata file for
+   storm-id/port, creating the parent directory first."
+  [storm-id port data conf]
+  (let [file (get-log-metadata-file conf storm-id port)]
+    ;;run worker as user needs the directory to have special permissions
+    ;; or it is insecure
+    (when (not (.exists (.getParentFile file)))
+      (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        (do (FileUtils/forceMkdir (.getParentFile file))
+            (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) (.getCanonicalPath (.getParentFile file))))
+        (.mkdirs (.getParentFile file))))
+    ;; with-open replaces the manual try/finally close of the writer
+    (with-open [writer (java.io.FileWriter. file)]
+      (.dump (Yaml.) data writer))))
+
+(defn write-log-metadata! [storm-conf user worker-id storm-id port conf]
+  ;; Combine the cluster-wide and topology-specific user/group lists into
+  ;; sorted, de-duplicated, nil-free lists and persist them together with the
+  ;; owning user and worker id.
+  (let [merged-list (fn [base-key topo-key]
+                      (->> (concat (storm-conf base-key) (storm-conf topo-key))
+                           (remove nil?)
+                           distinct
+                           sort))
+        data {TOPOLOGY-SUBMITTER-USER user
+              "worker-id" worker-id
+              LOGS-GROUPS (merged-list LOGS-GROUPS TOPOLOGY-GROUPS)
+              LOGS-USERS (merged-list LOGS-USERS TOPOLOGY-USERS)}]
+    (write-log-metadata-to-yaml-file! storm-id port data conf)))
+
+(defn jlp
+  "Build the java.library.path value for a worker: the OS/arch-specific
+   resource dir, then the generic resource dir, then the configured
+   JAVA-LIBRARY-PATH, joined with the platform path separator."
+  [stormroot conf]
+  (let [resources (str stormroot File/separator RESOURCES-SUBDIR)
+        os-name (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
+        arch-dir (str resources File/separator os-name "-" (System/getProperty "os.arch"))]
+    (clojure.string/join File/pathSeparator [arch-dir resources (conf JAVA-LIBRARY-PATH)])))
+
+(defn substitute-childopts
+  "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap"
+  [value worker-id topology-id port mem-onheap]
+  (let [substitutions {"%ID%" (str port)
+                       "%WORKER-ID%" (str worker-id)
+                       "%TOPOLOGY-ID%" (str topology-id)
+                       "%WORKER-PORT%" (str port)
+                       "%HEAP-MEM%" (str mem-onheap)}
+        ;; apply every placeholder replacement to one string
+        substitute (fn [s] (reduce-kv clojure.string/replace s substitutions))]
+    (cond
+      ;; no opts configured
+      (nil? value) nil
+      ;; already a list of opts: substitute each element
+      (sequential? value) (mapv substitute value)
+      ;; single string of opts: substitute then split on whitespace
+      :else (clojure.string/split (substitute value) #"\s+"))))
+
+
+(defn create-blobstore-links
+  "Create symlinks in worker launch directory for all blobs"
+  [conf storm-id worker-id]
+  (let [storm-root (supervisor-stormdist-root conf storm-id)
+        topo-conf (read-supervisor-storm-conf conf storm-id)
+        worker-dir (worker-root conf worker-id)
+        blob-names (get-blob-file-names (topo-conf TOPOLOGY-BLOBSTORE-MAP))
+        all-names (cons RESOURCES-SUBDIR blob-names)]
+    (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
+                 storm-id " for files(" (count all-names) "): " (pr-str all-names))
+    ;; one link for the shared resources dir, then one per configured blob
+    (create-symlink! worker-dir storm-root RESOURCES-SUBDIR)
+    (doseq [blob-name blob-names]
+      (create-symlink! worker-dir storm-root blob-name blob-name))))
+
+(defn create-artifacts-link
+  "Create a symlink from the worker directory to its port artifacts directory"
+  [conf storm-id port worker-id]
+  (let [worker-dir (worker-root conf worker-id)
+        topo-dir (worker-artifacts-root conf storm-id)]
+    (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
+                 storm-id " to its port artifacts directory")
+    ;; the worker dir may not exist yet; `when` replaces the one-armed `if`
+    (when (.exists (File. worker-dir))
+      (create-symlink! worker-dir topo-dir "artifacts" port))))
+
+;; distributed implementation: assemble the full worker JVM command line
+;; (a LogWriter wrapper process that itself launches the worker JVM), write
+;; log metadata, create artifact/blob symlinks, then exec the process —
+;; either directly or via the worker-launcher when running as the topology user.
+(defmethod launch-worker
+ :distributed [supervisor storm-id port worker-id mem-onheap]
+ (let [conf (:conf supervisor)
+ run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ storm-home (System/getProperty "storm.home")
+ storm-options (System/getProperty "storm.options")
+ storm-conf-file (System/getProperty "storm.conf.file")
+ storm-log-dir LOG-DIR
+ storm-log-conf-dir (conf STORM-LOG4J2-CONF-DIR)
+ ;; configured log4j2 dir may be absolute or relative to storm.home;
+ ;; default is <storm.home>/log4j2
+ storm-log4j2-conf-dir (if storm-log-conf-dir
+ (if (is-absolute-path? storm-log-conf-dir)
+ storm-log-conf-dir
+ (str storm-home file-path-separator storm-log-conf-dir))
+ (str storm-home file-path-separator "log4j2"))
+ stormroot (supervisor-stormdist-root conf storm-id)
+ jlp (jlp stormroot conf)
+ stormjar (supervisor-stormjar-path stormroot)
+ storm-conf (read-supervisor-storm-conf conf storm-id)
+ topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
+ [cp]
+ [])
+ classpath (-> (worker-classpath)
+ (add-to-classpath [stormjar])
+ (add-to-classpath topo-classpath))
+ top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
+ ;; scheduler-assigned heap wins over the configured default
+ mem-onheap (if (and mem-onheap (> mem-onheap 0)) ;; not nil and not zero
+ (int (Math/ceil mem-onheap)) ;; round up
+ (storm-conf WORKER-HEAP-MEMORY-MB)) ;; otherwise use default value
+ gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port mem-onheap)
+ topo-worker-logwriter-childopts (storm-conf TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ logfilename "worker.log"
+ workers-artifacts (worker-artifacts-root conf)
+ logging-sensitivity (storm-conf TOPOLOGY-LOGGING-SENSITIVITY "S3")
+ worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
+ (substitute-childopts s worker-id storm-id port mem-onheap))
+ topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
+ (substitute-childopts s worker-id storm-id port mem-onheap))
+ worker--profiler-childopts (if (conf WORKER-PROFILER-ENABLED)
+ (substitute-childopts (conf WORKER-PROFILER-CHILDOPTS) worker-id storm-id port mem-onheap)
+ "")
+ topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)]
+ (merge env {"LD_LIBRARY_PATH" jlp})
+ {"LD_LIBRARY_PATH" jlp})
+ ;; first vector is the LogWriter wrapper JVM; the second java invocation
+ ;; (from "-server" onward) is the actual worker JVM it spawns
+ command (concat
+ [(java-cmd) "-cp" classpath
+ topo-worker-logwriter-childopts
+ (str "-Dlogfile.name=" logfilename)
+ (str "-Dstorm.home=" storm-home)
+ (str "-Dworkers.artifacts=" workers-artifacts)
+ (str "-Dstorm.id=" storm-id)
+ (str "-Dworker.id=" worker-id)
+ (str "-Dworker.port=" port)
+ (str "-Dstorm.log.dir=" storm-log-dir)
+ (str "-Dlog4j.configurationFile=" storm-log4j2-conf-dir file-path-separator "worker.xml")
+ (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector")
+ "org.apache.storm.LogWriter"]
+ [(java-cmd) "-server"]
+ worker-childopts
+ topo-worker-childopts
+ gc-opts
+ worker--profiler-childopts
+ [(str "-Djava.library.path=" jlp)
+ (str "-Dlogfile.name=" logfilename)
+ (str "-Dstorm.home=" storm-home)
+ (str "-Dworkers.artifacts=" workers-artifacts)
+ (str "-Dstorm.conf.file=" storm-conf-file)
+ (str "-Dstorm.options=" storm-options)
+ (str "-Dstorm.log.dir=" storm-log-dir)
+ (str "-Dlogging.sensitivity=" logging-sensitivity)
+ (str "-Dlog4j.configurationFile=" storm-log4j2-conf-dir file-path-separator "worker.xml")
+ (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector")
+ (str "-Dstorm.id=" storm-id)
+ (str "-Dworker.id=" worker-id)
+ (str "-Dworker.port=" port)
+ "-cp" classpath
+ "org.apache.storm.daemon.worker"
+ storm-id
+ (:assignment-id supervisor)
+ port
+ worker-id])
+ ;; stringify every element and drop empties left by unset childopts
+ command (->> command (map str) (filter (complement empty?)))]
+ (log-message "Launching worker with command: " (shell-cmd command))
+ (write-log-metadata! storm-conf user worker-id storm-id port conf)
+ (set-worker-user! conf worker-id user)
+ (create-artifacts-link conf storm-id port worker-id)
+ (let [log-prefix (str "Worker Process " worker-id)
+ ;; on exit, mark the worker dead so the process-sync loop cleans it up
+ callback (fn [exit-code]
+ (log-message log-prefix " exited with code: " exit-code)
+ (add-dead-worker worker-id))
+ worker-dir (worker-root conf worker-id)]
+ (remove-dead-worker worker-id)
+ (create-blobstore-links conf storm-id worker-id)
+ (if run-worker-as-user
+ (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir))
+ (launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)))
+ )))
+
+;; local implementation
+
+(defn resources-jar
+  "Find the first jar on the current classpath that contains the shared
+   resources subdirectory, or nil if none does."
+  []
+  (first
+    (for [entry (.split (current-classpath) File/pathSeparator)
+          :when (and (.endsWith entry ".jar")
+                     (zip-contains-dir? entry RESOURCES-SUBDIR))]
+      entry)))
+
+;; local-mode implementation: read code/conf blobs from the nimbus blobstore
+;; into a tmp dir, move it into place, then make the resources dir available
+;; either from a classpath jar or from the classloader's resource directory.
+(defmethod download-storm-code
+ :local [conf storm-id master-code-dir localizer]
+ (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+ stormroot (supervisor-stormdist-root conf storm-id)
+ blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
+ (try
+ (FileUtils/forceMkdir (File. tmproot))
+ ;; NOTE(review): the FileOutputStreams are not closed here — presumably
+ ;; readBlobTo closes the stream it is given; confirm, else this leaks
+ (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
+ (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
+ (finally
+ (.shutdown blob-store)))
+ (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+ (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+ (let [classloader (.getContextClassLoader (Thread/currentThread))
+ resources-jar (resources-jar)
+ url (.getResource classloader RESOURCES-SUBDIR)
+ target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+ ;; prefer a resources dir found inside a classpath jar; fall back to a
+ ;; plain directory on the classpath; otherwise no resources are copied
+ (cond
+ resources-jar
+ (do
+ (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
+ (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+ url
+ (do
+ (log-message "Copying resources at " (str url) " to " target-dir)
+ (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir)))))))
+
+;; local-mode implementation: run the worker in-process as a simulated
+;; process instead of forking a JVM. mem-onheap is accepted to satisfy the
+;; multimethod signature but is not used in local mode.
+(defmethod launch-worker
+ :local [supervisor storm-id port worker-id mem-onheap]
+ (let [conf (:conf supervisor)
+ ;; pseudo process id for the process simulator registry
+ pid (uuid)
+ worker (worker/mk-worker conf
+ (:shared-context supervisor)
+ storm-id
+ (:assignment-id supervisor)
+ port
+ worker-id)]
+ (set-worker-user! conf worker-id "")
+ (psim/register-process pid worker)
+ ;; remember the pseudo-pid so the supervisor can later shut the worker down
+ (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
+ ))
+
+;; Launch a supervisor daemon with the given ISupervisor implementation:
+;; validates distributed mode, installs a force-kill shutdown hook, and
+;; registers a slots-used gauge before starting metrics reporters.
+(defn -launch
+ [supervisor]
+ (log-message "Starting supervisor for storm version '" STORM-VERSION "'")
+ (let [conf (read-storm-config)]
+ (validate-distributed-mode! conf)
+ (let [supervisor (mk-supervisor conf nil supervisor)]
+ (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))
+ (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
+ (start-metrics-reporters)))
+
+(defn standalone-supervisor
+  "Construct the default ISupervisor implementation: stores the daemon conf,
+   persists a stable supervisor id in local state, and advertises the
+   configured slot ports as metadata."
+  []
+  (let [conf-atom (atom nil)
+        id-atom (atom nil)]
+    (reify ISupervisor
+      (prepare [this conf local-dir]
+        (reset! conf-atom conf)
+        ;; reuse the id persisted in local state, or mint and persist a new one
+        (let [state (LocalState. local-dir)
+              curr-id (or (ls-supervisor-id state)
+                          (generate-supervisor-id))]
+          (ls-supervisor-id! state curr-id)
+          (reset! id-atom curr-id)))
+      (confirmAssigned [this port]
+        true)
+      (getMetadata [this]
+        (doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS))))
+      (getSupervisorId [this]
+        @id-atom)
+      (getAssignmentId [this]
+        @id-atom)
+      ;; no-op callbacks for this implementation
+      (killedWorker [this port])
+      (assigned [this ports]))))
+
+(defn -main
+  "Entry point: install the default uncaught-exception handler, then launch a
+   supervisor daemon backed by the standalone ISupervisor implementation."
+  []
+  (setup-default-uncaught-exception-handler)
+  (-> (standalone-supervisor) -launch))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
new file mode 100644
index 0000000..1ae9b22
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -0,0 +1,189 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.task
+ (:use [org.apache.storm.daemon common])
+ (:use [org.apache.storm config util log])
+ (:import [org.apache.storm.hooks ITaskHook])
+ (:import [org.apache.storm.tuple Tuple TupleImpl])
+ (:import [org.apache.storm.grouping LoadMapping])
+ (:import [org.apache.storm.generated SpoutSpec Bolt StateSpoutSpec StormTopology])
+ (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
+ EmitInfo BoltFailInfo BoltAckInfo])
+ (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext])
+ (:import [org.apache.storm.utils Utils])
+ (:import [org.apache.storm.generated ShellComponent JavaObject])
+ (:import [org.apache.storm.spout ShellSpout])
+ (:import [java.util Collection List ArrayList])
+ (:require [org.apache.storm
+ [thrift :as thrift]
+ [stats :as stats]])
+ (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]))
+
+;; Return a fn of task-id -> TopologyContext for the given topology.
+;; NOTE: the arguments below are positional and must match the
+;; TopologyContext constructor's parameter order exactly.
+(defn mk-topology-context-builder [worker executor-data topology]
+ (let [conf (:conf worker)]
+ #(TopologyContext.
+ topology
+ (:storm-conf worker)
+ (:task->component worker)
+ (:component->sorted-tasks worker)
+ (:component->stream->fields worker)
+ (:storm-id worker)
+ (supervisor-storm-resources-path
+ (supervisor-stormdist-root conf (:storm-id worker)))
+ (worker-pids-root conf (:worker-id worker))
+ (int %)
+ (:port worker)
+ (:task-ids worker)
+ (:default-shared-resources worker)
+ (:user-shared-resources worker)
+ (:shared-executor-data executor-data)
+ (:interval->task->metric-registry executor-data)
+ (:open-or-prepare-was-called? executor-data))))
+
+(defn system-topology-context
+  "TopologyContext for task `tid` built against the system topology (the
+   topology including system streams and components)."
+  [worker executor-data tid]
+  (let [build-context (mk-topology-context-builder worker
+                                                   executor-data
+                                                   (:system-topology worker))]
+    (build-context tid)))
+
+(defn user-topology-context
+  "TopologyContext for task `tid` built against the user-submitted topology
+   (without system streams/components)."
+  [worker executor-data tid]
+  (let [build-context (mk-topology-context-builder worker
+                                                   executor-data
+                                                   (:topology worker))]
+    (build-context tid)))
+
+;; Resolve the executable object for a component: deserialize the spout/bolt/
+;; state-spout object, then (order matters — each rebinding of obj feeds the
+;; next check) wrap ShellComponents in ShellSpout/ShellBolt and instantiate
+;; JavaObject specs via thrift.
+(defn- get-task-object [^StormTopology topology component-id]
+ (let [spouts (.get_spouts topology)
+ bolts (.get_bolts topology)
+ state-spouts (.get_state_spouts topology)
+ obj (Utils/getSetComponentObject
+ (cond
+ (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id))
+ (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id))
+ (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))
+ true (throw-runtime "Could not find " component-id " in " topology)))
+ obj (if (instance? ShellComponent obj)
+ (if (contains? spouts component-id)
+ (ShellSpout. obj)
+ (ShellBolt. obj))
+ obj )
+ obj (if (instance? JavaObject obj)
+ (thrift/instantiate-java-object obj)
+ obj )]
+ obj
+ ))
+
+;; Return the ITaskHook instances registered on this TopologyContext.
+(defn get-context-hooks [^TopologyContext context]
+ (.getHooks context))
+
+;; True when no task hooks are registered (cheap guard used by apply-hooks).
+(defn hooks-empty? [^Collection hooks]
+ (.isEmpty hooks))
+
+;; Invoke `method-sym` on every registered task hook with the info object
+;; produced by `info-form`. As a macro so that `info-form` is only evaluated
+;; when at least one hook is registered (hot path optimization); the gensym
+;; carries an ITaskHook type hint to avoid reflection.
+(defmacro apply-hooks [topology-context method-sym info-form]
+ (let [hook-sym (with-meta (gensym "hook") {:tag 'org.apache.storm.hooks.ITaskHook})]
+ `(let [hooks# (get-context-hooks ~topology-context)]
+ (when-not (hooks-empty? hooks#)
+ (let [info# ~info-form]
+ (fast-list-iter [~hook-sym hooks#]
+ (~method-sym ~hook-sym info#)
+ ))))))
+
+
+;; TODO: this is all expensive... should be precomputed
+(defn send-unanchored
+  "Send `values` on `stream` from this task without anchoring (no ack
+   tracking): build the tuple once and transfer it to every target task."
+  [task-data stream values]
+  (let [^TopologyContext system-context (:system-context task-data)
+        tasks-fn (:tasks-fn task-data)
+        transfer-fn (get-in task-data [:executor-data :transfer-fn])
+        tuple (TupleImpl. system-context
+                          values
+                          (.getThisTaskId system-context)
+                          stream)]
+    (fast-list-iter [target-task (tasks-fn stream values)]
+      (transfer-fn target-task tuple))))
+
+;; Build the task's emit-routing fn. The returned fn has two arities:
+;; [out-task-id stream values] for direct emits (returns [out-task-id] or nil)
+;; and [stream values] for grouped emits (returns the list of target tasks).
+;; Both arities fire emit hooks and sample emit/transfer stats.
+(defn mk-tasks-fn [task-data]
+ (let [task-id (:task-id task-data)
+ executor-data (:executor-data task-data)
+ ^LoadMapping load-mapping (:load-mapping (:worker executor-data))
+ component-id (:component-id executor-data)
+ ^WorkerTopologyContext worker-context (:worker-context executor-data)
+ storm-conf (:storm-conf executor-data)
+ emit-sampler (mk-stats-sampler storm-conf)
+ stream->component->grouper (:stream->component->grouper executor-data)
+ user-context (:user-context task-data)
+ executor-stats (:stats executor-data)
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+
+ ;; direct emit: target task chosen by the caller; only valid if the target
+ ;; component subscribed with a :direct grouping
+ (fn ([^Integer out-task-id ^String stream ^List values]
+ (when debug?
+ (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
+ (let [target-component (.getComponentId worker-context out-task-id)
+ component->grouping (get stream->component->grouper stream)
+ grouping (get component->grouping target-component)
+ ;; drop the target if it isn't subscribed to this stream at all
+ out-task-id (if grouping out-task-id)]
+ (when (and (not-nil? grouping) (not= :direct grouping))
+ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
+ (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
+ (when (emit-sampler)
+ (stats/emitted-tuple! executor-stats stream)
+ (if out-task-id
+ (stats/transferred-tuples! executor-stats stream 1)))
+ (if out-task-id [out-task-id])
+ ))
+ ;; grouped emit: ask each subscribed component's grouper for target tasks
+ ([^String stream ^List values]
+ (when debug?
+ (log-message "Emitting: " component-id " " stream " " values))
+ (let [out-tasks (ArrayList.)]
+ (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
+ (when (= :direct grouper)
+ ;; TODO: this is wrong, need to check how the stream was declared
+ (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
+ ;; a grouper may return a single task or a collection of tasks
+ (let [comp-tasks (grouper task-id values load-mapping)]
+ (if (or (sequential? comp-tasks) (instance? Collection comp-tasks))
+ (.addAll out-tasks comp-tasks)
+ (.add out-tasks comp-tasks)
+ )))
+ (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
+ (when (emit-sampler)
+ (stats/emitted-tuple! executor-stats stream)
+ (stats/transferred-tuples! executor-stats stream (count out-tasks)))
+ out-tasks)))
+ ))
+
+;; Assemble the per-task state map. `recursive-map` lets later entries refer
+;; to the map under construction via `<>` — :tasks-fn and :object both read
+;; earlier entries (:system-context etc.) of the same map.
+(defn mk-task-data [executor-data task-id]
+ (recursive-map
+ :executor-data executor-data
+ :task-id task-id
+ :system-context (system-topology-context (:worker executor-data) executor-data task-id)
+ :user-context (user-topology-context (:worker executor-data) executor-data task-id)
+ :builtin-metrics (builtin-metrics/make-data (:type executor-data) (:stats executor-data))
+ :tasks-fn (mk-tasks-fn <>)
+ :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
+
+
+;; Create a task: build its data map, register configured auto task hooks on
+;; the user context, and announce startup on the system stream.
+(defn mk-task [executor-data task-id]
+ (let [task-data (mk-task-data executor-data task-id)
+ storm-conf (:storm-conf executor-data)]
+ (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
+ (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
+ ;; when this is called, the threads for the executor haven't been started yet,
+ ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
+ (send-unanchored task-data SYSTEM-STREAM-ID ["startup"])
+ task-data
+ ))