Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:13 UTC

[17/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
new file mode 100644
index 0000000..e4b44b0
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -0,0 +1,1219 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.supervisor
+  (:import [java.io File IOException FileOutputStream])
+  (:import [org.apache.storm.scheduler ISupervisor]
+           [org.apache.storm.utils LocalState Time Utils]
+           [org.apache.storm.daemon Shutdownable]
+           [org.apache.storm Constants]
+           [org.apache.storm.cluster ClusterStateContext DaemonType]
+           [java.net JarURLConnection]
+           [java.net URI]
+           [org.apache.commons.io FileUtils])
+  (:use [org.apache.storm config util log timer local-state])
+  (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
+  (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
+  (:import [java.nio.file Files StandardCopyOption])
+  (:import [org.apache.storm Config])
+  (:import [org.apache.storm.generated WorkerResources ProfileAction])
+  (:import [org.apache.storm.localizer LocalResource])
+  (:use [org.apache.storm.daemon common])
+  (:require [org.apache.storm.command [healthcheck :as healthcheck]])
+  (:require [org.apache.storm.daemon [worker :as worker]]
+            [org.apache.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
+            [clojure.set :as set])
+  (:import [org.apache.thrift.transport TTransportException])
+  (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
+  (:import [org.yaml.snakeyaml Yaml]
+           [org.yaml.snakeyaml.constructor SafeConstructor])
+  (:require [metrics.gauges :refer [defgauge]])
+  (:require [metrics.meters :refer [defmeter mark!]])
+  (:gen-class
+    :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]]))
+
+(defmeter supervisor:num-workers-launched)
+
+(defmulti download-storm-code cluster-mode)
+(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
+
+(def STORM-VERSION (VersionInfo/getVersion))
+
+(defprotocol SupervisorDaemon
+  (get-id [this])
+  (get-conf [this])
+  (shutdown-all-workers [this])
+  )
+
+(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
+  (let [storm-ids (.assignments storm-cluster-state callback)]
+    (let [new-assignments
+          (->>
+           (dofor [sid storm-ids]
+                  (let [recorded-version (:version (get assignment-versions sid))]
+                    (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
+                      (if (= assignment-version recorded-version)
+                        {sid (get assignment-versions sid)}
+                        {sid (.assignment-info-with-version storm-cluster-state sid callback)})
+                      {sid nil})))
+           (apply merge)
+           (filter-val not-nil?))
+          new-profiler-actions
+          (->>
+            (dofor [sid (distinct storm-ids)]
+                   (if-let [topo-profile-actions (.get-topology-profile-requests storm-cluster-state sid false)]
+                      {sid topo-profile-actions}))
+           (apply merge))]
+         
+      {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
+       :profiler-actions new-profiler-actions
+       :versions new-assignments})))
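+
+;; Hedged sketch of the map assignments-snapshot returns; the topology id,
+;; node name and version number below are hypothetical placeholders.
+(comment
+  {:assignments      {"topo-1-1452549000" {:master-code-dir "/storm/nimbus/stormdist/topo-1-1452549000"
+                                           :executor->node+port {[1 1] ["node-a" 6700]}}}
+   :profiler-actions {"topo-1-1452549000" nil}
+   :versions         {"topo-1-1452549000" {:version 7}}})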
+
+(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
+  (let [assignment (get assignments-snapshot storm-id)
+        my-slots-resources (into {}
+                                 (filter (fn [[[node _] _]] (= node assignment-id))
+                                         (:worker->resources assignment)))
+        my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
+                             (:executor->node+port assignment))
+        port-executors (apply merge-with
+                              concat
+                              (for [[executor [_ port]] my-executors]
+                                {port [executor]}
+                                ))]
+    (into {} (for [[port executors] port-executors]
+               ;; need to cast to int b/c it might be a long (due to how yaml parses things)
+               ;; doall is to avoid serialization/deserialization problems with lazy seqs
+               [(Integer. port) (mk-local-assignment storm-id (doall executors) (get my-slots-resources [assignment-id port]))]
+               ))))
+
+(defn- read-assignments
+  "Returns map from port to struct containing :storm-id, :executors and :resources"
+  ([assignments-snapshot assignment-id]
+     (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
+          (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
+  ([assignments-snapshot assignment-id existing-assignment retries]
+     (try (let [assignments (read-assignments assignments-snapshot assignment-id)]
+            (reset! retries 0)
+            assignments)
+          (catch RuntimeException e
+            (if (> @retries 2) (throw e) (swap! retries inc))
+            (log-warn (.getMessage e) ": retrying " @retries " of 3")
+            existing-assignment))))
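+
+;; Hedged sketch of the per-port map read-assignments returns (topology id,
+;; ports and executor ranges are hypothetical):
+(comment
+  {6700 {:storm-id "topo-1-1452549000" :executors [[1 1] [2 2]] :resources nil}
+   6701 {:storm-id "topo-1-1452549000" :executors [[3 3]] :resources nil}})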
+
+(defn- read-storm-code-locations
+  [assignments-snapshot]
+  (map-val :master-code-dir assignments-snapshot))
+
+(defn- read-downloaded-storm-ids [conf]
+  (map #(url-decode %) (read-dir-contents (supervisor-stormdist-root conf)))
+  )
+
+(defn read-worker-heartbeat [conf id]
+  (let [local-state (worker-state conf id)]
+    (try
+      (ls-worker-heartbeat local-state)
+      (catch Exception e
+        (log-warn e "Failed to read local heartbeat for workerId: " id ", ignoring exception.")
+        nil))))
+
+
+(defn my-worker-ids [conf]
+  (read-dir-contents (worker-root conf)))
+
+(defn read-worker-heartbeats
+  "Returns map from worker id to heartbeat"
+  [conf]
+  (let [ids (my-worker-ids conf)]
+    (into {}
+      (dofor [id ids]
+        [id (read-worker-heartbeat conf id)]))
+    ))
+
+
+(defn matches-an-assignment? [worker-heartbeat assigned-executors]
+  (let [local-assignment (assigned-executors (:port worker-heartbeat))]
+    (and local-assignment
+         (= (:storm-id worker-heartbeat) (:storm-id local-assignment))
+         (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
+            (set (:executors local-assignment))))))
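+
+;; Illustrative call (hypothetical heartbeat and assignment); it returns true
+;; because the executor sets match once SYSTEM_EXECUTOR_ID is dropped:
+(comment
+  (matches-an-assignment?
+    {:storm-id "topo-1-1452549000" :port 6700 :time-secs 1452549100
+     :executors [[1 1] [2 2] Constants/SYSTEM_EXECUTOR_ID]}
+    {6700 {:storm-id "topo-1-1452549000" :executors [[1 1] [2 2]]}}))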
+
+(let [dead-workers (atom #{})]
+  (defn get-dead-workers []
+    @dead-workers)
+  (defn add-dead-worker [worker]
+    (swap! dead-workers conj worker))
+  (defn remove-dead-worker [worker]
+    (swap! dead-workers disj worker)))
+
+(defn is-worker-hb-timed-out? [now hb conf]
+  (> (- now (:time-secs hb))
+     (conf SUPERVISOR-WORKER-TIMEOUT-SECS)))
+
+(defn read-allocated-workers
+  "Returns map from worker id to worker heartbeat. If the heartbeat is nil, then the worker is dead (timed out or never wrote a heartbeat)"
+  [supervisor assigned-executors now]
+  (let [conf (:conf supervisor)
+        ^LocalState local-state (:local-state supervisor)
+        id->heartbeat (read-worker-heartbeats conf)
+        approved-ids (set (keys (ls-approved-workers local-state)))]
+    (into
+     {}
+     (dofor [[id hb] id->heartbeat]
+            (let [state (cond
+                         (not hb)
+                           :not-started
+                         (or (not (contains? approved-ids id))
+                             (not (matches-an-assignment? hb assigned-executors)))
+                           :disallowed
+                         (or
+                          (when (get (get-dead-workers) id)
+                            (log-message "Worker Process " id " has died!")
+                            true)
+                          (is-worker-hb-timed-out? now hb conf))
+                           :timed-out
+                         true
+                           :valid)]
+              (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
+              [id [state hb]]
+              ))
+     )))
+
+(defn- wait-for-worker-launch [conf id start-time]
+  (let [state (worker-state conf id)]
+    (loop []
+      (let [hb (ls-worker-heartbeat state)]
+        (when (and
+               (not hb)
+               (<
+                (- (current-time-secs) start-time)
+                (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
+                ))
+          (log-message id " still hasn't started")
+          (Time/sleep 500)
+          (recur)
+          )))
+    (when-not (ls-worker-heartbeat state)
+      (log-message "Worker " id " failed to start")
+      )))
+
+(defn- wait-for-workers-launch [conf ids]
+  (let [start-time (current-time-secs)]
+    (doseq [id ids]
+      (wait-for-worker-launch conf id start-time))
+    ))
+
+(defn generate-supervisor-id []
+  (uuid))
+
+(defnk worker-launcher [conf user args :environment {} :log-prefix nil :exit-code-callback nil :directory nil]
+  (let [_ (when (clojure.string/blank? user)
+            (throw (java.lang.IllegalArgumentException.
+                     "User cannot be blank when calling worker-launcher.")))
+        wl-initial (conf SUPERVISOR-WORKER-LAUNCHER)
+        storm-home (System/getProperty "storm.home")
+        wl (if wl-initial wl-initial (str storm-home "/bin/worker-launcher"))
+        command (concat [wl user] args)]
+    (log-message "Running as user:" user " command:" (pr-str command))
+    (launch-process command :environment environment :log-prefix log-prefix :exit-code-callback exit-code-callback :directory directory)
+  ))
+
+(defnk worker-launcher-and-wait [conf user args :environment {} :log-prefix nil]
+  (let [process (worker-launcher conf user args :environment environment)]
+    (if log-prefix
+      (read-and-log-stream log-prefix (.getInputStream process)))
+    (try
+      (.waitFor process)
+      (catch InterruptedException e
+        (log-message log-prefix " interrupted.")))
+    (.exitValue process)))
+
+(defn- rmr-as-user
+  "Launches a process owned by the given user that deletes the given path
+  recursively.  Throws RuntimeException if the directory is not removed."
+  [conf id path]
+  (let [user (Utils/getFileOwner path)]
+    (worker-launcher-and-wait conf
+      user
+      ["rmr" path]
+      :log-prefix (str "rmr " id))
+    (if (exists-file? path)
+      (throw (RuntimeException. (str path " was not deleted"))))))
+
+(defn try-cleanup-worker [conf id]
+  (try
+    (if (.exists (File. (worker-root conf id)))
+      (do
+        (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+          (rmr-as-user conf id (worker-root conf id))
+          (do
+            (rmr (worker-heartbeats-root conf id))
+            ;; this avoids a race condition with the worker or subprocess writing the pid around the same time
+            (rmr (worker-pids-root conf id))
+            (rmr (worker-root conf id))))
+        (remove-worker-user! conf id)
+        (remove-dead-worker id)
+      ))
+  (catch IOException e
+    (log-warn-error e "Failed to cleanup worker " id ". Will retry later"))
+  (catch RuntimeException e
+    (log-warn-error e "Failed to cleanup worker " id ". Will retry later")
+    )
+  (catch java.io.FileNotFoundException e (log-message (.getMessage e)))
+    ))
+
+(defn shutdown-worker [supervisor id]
+  (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
+  (let [conf (:conf supervisor)
+        pids (read-dir-contents (worker-pids-root conf id))
+        thread-pid (@(:worker-thread-pids-atom supervisor) id)
+        shutdown-sleep-secs (conf SUPERVISOR-WORKER-SHUTDOWN-SLEEP-SECS)
+        as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        user (get-worker-user conf id)]
+    (when thread-pid
+      (psim/kill-process thread-pid))
+    (doseq [pid pids]
+      (if as-user
+        (worker-launcher-and-wait conf user ["signal" pid "15"] :log-prefix (str "kill -15 " pid))
+        (kill-process-with-sig-term pid)))
+    (when-not (empty? pids)  
+      (log-message "Sleep " shutdown-sleep-secs " seconds for execution of cleanup threads on worker.")
+      (sleep-secs shutdown-sleep-secs))
+    (doseq [pid pids]
+      (if as-user
+        (worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
+        (force-kill-process pid))
+      (if as-user
+        (rmr-as-user conf id (worker-pid-path conf id pid))
+        (try
+          (rmpath (worker-pid-path conf id pid))
+          (catch Exception e)))) ;; on Windows, the supervisor may still hold the lock on the worker directory
+    (try-cleanup-worker conf id))
+  (log-message "Shut down " (:supervisor-id supervisor) ":" id))
+
+(def SUPERVISOR-ZK-ACLS
+  [(first ZooDefs$Ids/CREATOR_ALL_ACL)
+   (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
+
+(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
+  {:conf conf
+   :shared-context shared-context
+   :isupervisor isupervisor
+   :active (atom true)
+   :uptime (uptime-computer)
+   :version STORM-VERSION
+   :worker-thread-pids-atom (atom {})
+   :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
+                                                                     (Utils/isZkAuthenticationConfiguredStormServer
+                                                                       conf)
+                                                                     SUPERVISOR-ZK-ACLS)
+                                                        :context (ClusterStateContext. DaemonType/SUPERVISOR))
+   :local-state (supervisor-state conf)
+   :supervisor-id (.getSupervisorId isupervisor)
+   :assignment-id (.getAssignmentId isupervisor)
+   :my-hostname (hostname conf)
+   :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
+   :heartbeat-timer (mk-timer :kill-fn (fn [t]
+                               (log-error t "Error when processing event")
+                               (exit-process! 20 "Error when processing an event")
+                               ))
+   :event-timer (mk-timer :kill-fn (fn [t]
+                                         (log-error t "Error when processing event")
+                                         (exit-process! 20 "Error when processing an event")
+                                         ))
+   :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer
+                                           [t]
+                                           (log-error t "Error when processing event")
+                                           (exit-process! 20 "Error when processing an event"))
+                                :timer-name "blob-update-timer")
+   :localizer (Utils/createLocalizer conf (supervisor-local-dir conf))
+   :assignment-versions (atom {})
+   :sync-retry (atom 0)
+   :download-lock (Object.)
+   :stormid->profiler-actions (atom {})
+   })
+
+(defn required-topo-files-exist?
+  [conf storm-id]
+  (let [stormroot (supervisor-stormdist-root conf storm-id)
+        stormjarpath (supervisor-stormjar-path stormroot)
+        stormcodepath (supervisor-stormcode-path stormroot)
+        stormconfpath (supervisor-stormconf-path stormroot)]
+    (and (every? exists-file? [stormroot stormconfpath stormcodepath])
+         (or (local-mode? conf)
+             (exists-file? stormjarpath)))))
+
+(defn get-worker-assignment-helper-msg
+  [assignment supervisor port id]
+  (str (pr-str assignment) " for this supervisor " (:supervisor-id supervisor) " on port "
+    port " with id " id))
+
+(defn get-valid-new-worker-ids
+  [conf supervisor reassign-executors new-worker-ids]
+  (into {}
+    (remove nil?
+      (dofor [[port assignment] reassign-executors]
+        (let [id (new-worker-ids port)
+              storm-id (:storm-id assignment)
+              ^WorkerResources resources (:resources assignment)
+              mem-onheap (.get_mem_on_heap resources)]
+          ;; This checks that the required files exist before launching the worker
+          (if (required-topo-files-exist? conf storm-id)
+            (do
+              (log-message "Launching worker with assignment "
+                (get-worker-assignment-helper-msg assignment supervisor port id))
+              (local-mkdirs (worker-pids-root conf id))
+              (local-mkdirs (worker-heartbeats-root conf id))
+              (launch-worker supervisor
+                (:storm-id assignment)
+                port
+                id
+                mem-onheap)
+              [id port])
+            (do
+              (log-message "Missing topology storm code, so can't launch worker with assignment "
+                (get-worker-assignment-helper-msg assignment supervisor port id))
+              nil)))))))
+
+(defn sync-processes [supervisor]
+  (let [conf (:conf supervisor)
+        ^LocalState local-state (:local-state supervisor)
+        storm-cluster-state (:storm-cluster-state supervisor)
+        assigned-executors (defaulted (ls-local-assignments local-state) {})
+        now (current-time-secs)
+        allocated (read-allocated-workers supervisor assigned-executors now)
+        keepers (filter-val
+                 (fn [[state _]] (= state :valid))
+                 allocated)
+        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
+        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
+        new-worker-ids (into
+                        {}
+                        (for [port (keys reassign-executors)]
+                          [port (uuid)]))]
+    ;; 1. to kill are those in allocated that are dead or disallowed
+    ;; 2. kill the ones that should be dead
+    ;;     - read pids, kill -9 and individually remove file
+    ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
+    ;; 3. of the rest, figure out what assignments aren't yet satisfied
+    ;; 4. generate new worker ids, write new "approved workers" to LS
+    ;; 5. create local dir for worker id
+    ;; 6. launch new workers (give worker-id, port, and supervisor-id)
+    ;; 7. wait for workers launch
+
+    (log-debug "Syncing processes")
+    (log-debug "Assigned executors: " assigned-executors)
+    (log-debug "Allocated: " allocated)
+    (doseq [[id [state heartbeat]] allocated]
+      (when (not= :valid state)
+        (log-message
+         "Shutting down and clearing state for id " id
+         ". Current supervisor time: " now
+         ". State: " state
+         ", Heartbeat: " (pr-str heartbeat))
+        (shutdown-worker supervisor id)))
+    (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
+      (ls-approved-workers! local-state
+                        (merge
+                          (select-keys (ls-approved-workers local-state)
+                            (keys keepers))
+                          valid-new-worker-ids))
+      (wait-for-workers-launch conf (keys valid-new-worker-ids)))))
+
+(defn assigned-storm-ids-from-port-assignments [assignment]
+  (->> assignment
+       vals
+       (map :storm-id)
+       set))
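+
+;; For example (hypothetical port assignment map):
+(comment
+  (assigned-storm-ids-from-port-assignments
+    {6700 {:storm-id "topo-a"} 6701 {:storm-id "topo-b"}})
+  ;; => #{"topo-a" "topo-b"}
+  )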
+
+(defn shutdown-disallowed-workers [supervisor]
+  (let [conf (:conf supervisor)
+        ^LocalState local-state (:local-state supervisor)
+        assigned-executors (defaulted (ls-local-assignments local-state) {})
+        now (current-time-secs)
+        allocated (read-allocated-workers supervisor assigned-executors now)
+        disallowed (keys (filter-val
+                                  (fn [[state _]] (= state :disallowed))
+                                  allocated))]
+    (log-debug "Allocated workers " allocated)
+    (log-debug "Disallowed workers " disallowed)
+    (doseq [id disallowed]
+      (shutdown-worker supervisor id))
+    ))
+
+(defn get-blob-localname
+  "Given the blob information, gets the localname field if it exists,
+  else returns the default value passed in."
+  [blob-info defaultValue]
+  (or (get blob-info "localname") defaultValue))
+
+(defn should-uncompress-blob?
+  "Given the blob information, returns the value of the uncompress field, handling it being
+  either a string or a boolean value; if it's not specified, returns false."
+  [blob-info]
+  (Boolean. (get blob-info "uncompress")))
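+
+;; Sketch of the topology.blobstore.map entries these helpers read (the blob
+;; key and file names below are hypothetical):
+(comment
+  (get-blob-localname {"localname" "geo.txt" "uncompress" false} "geo-data-key")
+  ;; => "geo.txt"
+  (should-uncompress-blob? {"uncompress" "true"})
+  ;; => true
+  )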
+
+(defn remove-blob-references
+  "Remove a reference to a blob when it's no longer needed."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)]
+    (if blobstore-map
+      (doseq [[k, v] blobstore-map]
+        (.removeBlobReference localizer
+          k
+          user
+          topo-name
+          (should-uncompress-blob? v))))))
+
+(defn blobstore-map-to-localresources
+  "Returns a list of LocalResources based on the blobstore-map passed in."
+  [blobstore-map]
+  (if blobstore-map
+    (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? v)))
+    ()))
+
+(defn add-blob-references
+  "For each of the downloaded topologies, adds references to the blobs that the topologies are
+  using. This is used to reconstruct the cache on restart."
+  [localizer storm-id conf]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    (if blobstore-map
+      (.addReferences localizer localresources user topo-name))))
+
+(defn rm-topo-files
+  [conf storm-id localizer rm-blob-refs?]
+  (let [path (supervisor-stormdist-root conf storm-id)]
+    (try
+      (if rm-blob-refs?
+        (remove-blob-references localizer storm-id conf))
+      (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        (rmr-as-user conf storm-id path)
+        (rmr (supervisor-stormdist-root conf storm-id)))
+      (catch Exception e
+        (log-message e (str "Exception removing: " storm-id))))))
+
+(defn verify-downloaded-files
+  "Check that the required files exist to avoid the supervisor crashing.
+   Also makes sure there is no need for locking."
+  [conf localizer assigned-storm-ids all-downloaded-storm-ids]
+  (remove nil?
+    (into #{}
+      (for [storm-id all-downloaded-storm-ids
+            :when (contains? assigned-storm-ids storm-id)]
+        (when-not (required-topo-files-exist? conf storm-id)
+          (log-debug "Files not present in topology directory")
+          (rm-topo-files conf storm-id localizer false)
+          storm-id)))))
+
+(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
+  (fn this []
+    (let [conf (:conf supervisor)
+          storm-cluster-state (:storm-cluster-state supervisor)
+          ^ISupervisor isupervisor (:isupervisor supervisor)
+          ^LocalState local-state (:local-state supervisor)
+          sync-callback (fn [& ignored] (.add event-manager this))
+          assignment-versions @(:assignment-versions supervisor)
+          {assignments-snapshot :assignments
+           storm-id->profiler-actions :profiler-actions
+           versions :versions}
+          (assignments-snapshot storm-cluster-state sync-callback assignment-versions)
+          storm-code-map (read-storm-code-locations assignments-snapshot)
+          all-downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+          existing-assignment (ls-local-assignments local-state)
+          all-assignment (read-assignments assignments-snapshot
+                                           (:assignment-id supervisor)
+                                           existing-assignment
+                                           (:sync-retry supervisor))
+          new-assignment (->> all-assignment
+                              (filter-key #(.confirmAssigned isupervisor %)))
+          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
+          localizer (:localizer supervisor)
+          checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids))
+          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)]
+
+      (log-debug "Synchronizing supervisor")
+      (log-debug "Storm code map: " storm-code-map)
+      (log-debug "All assignment: " all-assignment)
+      (log-debug "New assignment: " new-assignment)
+      (log-debug "Assigned Storm Ids " assigned-storm-ids)
+      (log-debug "All Downloaded Ids " all-downloaded-storm-ids)
+      (log-debug "Checked Downloaded Ids " checked-downloaded-storm-ids)
+      (log-debug "Downloaded Ids " downloaded-storm-ids)
+      (log-debug "Storm Ids Profiler Actions " storm-id->profiler-actions)
+      ;; download code first
+      ;; This might take a while
+      ;;   - should this be done separately from usual monitoring?
+      ;; should we only download when topology is assigned to this supervisor?
+      (doseq [[storm-id master-code-dir] storm-code-map]
+        (when (and (not (downloaded-storm-ids storm-id))
+                   (assigned-storm-ids storm-id))
+          (log-message "Downloading code for storm id " storm-id)
+          (try-cause
+            (download-storm-code conf storm-id master-code-dir localizer)
+
+            (catch NimbusLeaderNotFoundException e
+              (log-warn-error e "Nimbus leader was not available."))
+            (catch TTransportException e
+              (log-warn-error e "There was a connection problem with nimbus.")))
+          (log-message "Finished downloading code for storm id " storm-id)))
+
+      (log-debug "Writing new assignment "
+                 (pr-str new-assignment))
+      (doseq [p (set/difference (set (keys existing-assignment))
+                                (set (keys new-assignment)))]
+        (.killedWorker isupervisor (int p)))
+      (.assigned isupervisor (keys new-assignment))
+      (ls-local-assignments! local-state
+            new-assignment)
+      (reset! (:assignment-versions supervisor) versions)
+      (reset! (:stormid->profiler-actions supervisor) storm-id->profiler-actions)
+
+      (reset! (:curr-assignment supervisor) new-assignment)
+      ;; remove any downloaded code that's no longer assigned or active
+      ;; important that this happens after setting the local assignment so that
+      ;; synchronize-supervisor doesn't try to launch workers for which the
+      ;; resources don't exist
+      (if on-windows? (shutdown-disallowed-workers supervisor))
+      (doseq [storm-id all-downloaded-storm-ids]
+        (when-not (storm-code-map storm-id)
+          (log-message "Removing code for storm id "
+                       storm-id)
+          (rm-topo-files conf storm-id localizer true)))
+      (.add processes-event-manager sync-processes))))
+
+(defn mk-supervisor-capacities
+  [conf]
+  {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB))
+   Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
+
+(defn update-blobs-for-topology!
+  "Update each blob listed in the topology configuration if the latest version of the blob
+   has not been downloaded."
+  [conf storm-id localizer]
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    (try
+      (.updateBlobs localizer localresources user)
+      (catch AuthorizationException authExp
+        (log-error authExp))
+      (catch KeyNotFoundException knf
+        (log-error knf)))))
+
+(defn update-blobs-for-all-topologies-fn
+  "Returns a function that downloads all blobs listed in the topology configuration for all topologies assigned
+  to this supervisor, and creates version files with a suffix. The returned function is intended to be run periodically
+  by a timer, created elsewhere."
+  [supervisor]
+  (fn []
+    (try-cause
+      (let [conf (:conf supervisor)
+            downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+            new-assignment @(:curr-assignment supervisor)
+            assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+        (doseq [topology-id downloaded-storm-ids]
+          (let [storm-root (supervisor-stormdist-root conf topology-id)]
+            (when (assigned-storm-ids topology-id)
+              (log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root)
+              (update-blobs-for-topology! conf topology-id (:localizer supervisor))))))
+      (catch TTransportException e
+        (log-error
+          e
+          "Network error while updating blobs, will retry again later"))
+      (catch NimbusLeaderNotFoundException e
+        (log-error
+          e
+          "Nimbus unavailable to update blobs, will retry again later")))))
+
+(defn jvm-cmd [cmd]
+  (let [java-home (.get (System/getenv) "JAVA_HOME")]
+    (if (nil? java-home)
+      cmd
+      (str java-home file-path-separator "bin" file-path-separator cmd))))
+
+(defn java-cmd []
+  (jvm-cmd "java"))
+
+(defn jmap-dump-cmd [profile-cmd pid target-dir]
+  [profile-cmd pid "jmap" target-dir])
+
+(defn jstack-dump-cmd [profile-cmd pid target-dir]
+  [profile-cmd pid "jstack" target-dir])
+
+(defn jprofile-start [profile-cmd pid]
+  [profile-cmd pid "start"])
+
+(defn jprofile-stop [profile-cmd pid target-dir]
+  [profile-cmd pid "stop" target-dir])
+
+(defn jprofile-dump [profile-cmd pid workers-artifacts-directory]
+  [profile-cmd pid "dump" workers-artifacts-directory])
+
+(defn jprofile-jvm-restart [profile-cmd pid]
+  [profile-cmd pid "kill"])
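+
+;; These helpers just build argument vectors for the configured profiler
+;; command; a hedged example (command name, pid and directory are hypothetical):
+(comment
+  (jmap-dump-cmd "flight.bash" "12345" "/var/storm/workers-artifacts/topo-1/6700")
+  ;; => ["flight.bash" "12345" "jmap" "/var/storm/workers-artifacts/topo-1/6700"]
+  )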
+
+(defn- delete-topology-profiler-action [storm-cluster-state storm-id profile-action]
+  (log-message "Deleting profiler action: " profile-action)
+  (.delete-topology-profile-requests storm-cluster-state storm-id profile-action))
+
+(defnk launch-profiler-action-for-worker
+  "Launch profiler action for a worker"
+  [conf user target-dir command :environment {} :exit-code-on-profile-action nil :log-prefix nil]
+  (if-let [run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)]
+    (let [container-file (container-file-path target-dir)
+          script-file (script-file-path target-dir)]
+      (log-message "Running as user:" user " command:" (shell-cmd command))
+      (if (exists-file? container-file) (rmr-as-user conf container-file container-file))
+      (if (exists-file? script-file) (rmr-as-user conf script-file script-file))
+      (worker-launcher
+        conf
+        user
+        ["profiler" target-dir (write-script target-dir command :environment environment)]
+        :log-prefix log-prefix
+        :exit-code-callback exit-code-on-profile-action
+        :directory (File. target-dir)))
+    (launch-process
+      command
+      :environment environment
+      :log-prefix log-prefix
+      :exit-code-callback exit-code-on-profile-action
+      :directory (File. target-dir))))
+
+(defn mk-run-profiler-actions-for-all-topologies
+  "Returns a function that downloads all profile-actions listed for all topologies assigned
+  to this supervisor, executes those actions as user and deletes them from zookeeper."
+  [supervisor]
+  (fn []
+    (try
+      (let [conf (:conf supervisor)
+            stormid->profiler-actions @(:stormid->profiler-actions supervisor)
+            storm-cluster-state (:storm-cluster-state supervisor)
+            hostname (:my-hostname supervisor)
+            profile-cmd (conf WORKER-PROFILER-COMMAND)
+            new-assignment @(:curr-assignment supervisor)
+            assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+        (doseq [[storm-id profiler-actions] stormid->profiler-actions]
+          (when (not (empty? profiler-actions))
+            (doseq [pro-action profiler-actions]
+              (if (= hostname (:host pro-action))
+                (let [port (:port pro-action)
+                      action ^ProfileAction (:action pro-action)
+                      stop? (> (System/currentTimeMillis) (:timestamp pro-action))
+                      target-dir (worker-artifacts-root conf storm-id port)
+                      storm-conf (read-supervisor-storm-conf conf storm-id)
+                      user (storm-conf TOPOLOGY-SUBMITTER-USER)
+                      environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] env {})
+                      worker-pid (slurp (worker-artifacts-pid-path conf storm-id port))
+                      log-prefix (str "ProfilerAction process " storm-id ":" port " PROFILER_ACTION: " action " ")
+                      ;; Until the PROFILER_STOP action expires, keep launching profiler start in case the worker restarted
+                      ;; The profiler plugin script checks whether the JVM is already recording before starting another recording.
+                      command (cond
+                                (= action ProfileAction/JMAP_DUMP) (jmap-dump-cmd profile-cmd worker-pid target-dir)
+                                (= action ProfileAction/JSTACK_DUMP) (jstack-dump-cmd profile-cmd worker-pid target-dir)
+                                (= action ProfileAction/JPROFILE_DUMP) (jprofile-dump profile-cmd worker-pid target-dir)
+                                (= action ProfileAction/JVM_RESTART) (jprofile-jvm-restart profile-cmd worker-pid)
+                                (and (not stop?)
+                                     (= action ProfileAction/JPROFILE_STOP))
+                                  (jprofile-start profile-cmd worker-pid) ;; Ensure the profiler is still running
+                                (and stop? (= action ProfileAction/JPROFILE_STOP)) (jprofile-stop profile-cmd worker-pid target-dir))
+                      action-on-exit (fn [exit-code]
+                                       (log-message log-prefix " profile-action exited with code: " exit-code)
+                                       (if (and (= exit-code 0) stop?)
+                                         (delete-topology-profiler-action storm-cluster-state storm-id pro-action)))
+                      command (->> command (map str) (filter (complement empty?)))]
+
+                  (try
+                    (launch-profiler-action-for-worker conf
+                      user
+                      target-dir
+                      command
+                      :environment environment
+                      :exit-code-on-profile-action action-on-exit
+                      :log-prefix log-prefix)
+                    (catch IOException ioe
+                      (log-error ioe
+                        (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later.")))
+                    (catch RuntimeException rte
+                      (log-error rte
+                        (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later."))))))))))
+      (catch Exception e
+        (log-error e "Error running profiler actions, will retry again later")))))
+
+;; in local state, supervisor stores what its current assignments are
+;; another thread launches events to restart any dead processes if necessary
+(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
+  (log-message "Starting Supervisor with conf " conf)
+  (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
+  (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
+  (let [supervisor (supervisor-data conf shared-context isupervisor)
+        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
+        sync-processes (partial sync-processes supervisor)
+        synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
+        synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor)
+        downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+        run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor)
+        heartbeat-fn (fn [] (.supervisor-heartbeat!
+                               (:storm-cluster-state supervisor)
+                               (:supervisor-id supervisor)
+                               (->SupervisorInfo (current-time-secs)
+                                                 (:my-hostname supervisor)
+                                                 (:assignment-id supervisor)
+                                                 (keys @(:curr-assignment supervisor))
+                                                  ;; used ports
+                                                 (.getMetadata isupervisor)
+                                                 (conf SUPERVISOR-SCHEDULER-META)
+                                                 ((:uptime supervisor))
+                                                 (:version supervisor)
+                                                 (mk-supervisor-capacities conf))))]
+    (heartbeat-fn)
+
+    ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
+    (schedule-recurring (:heartbeat-timer supervisor)
+                        0
+                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
+                        heartbeat-fn)
+    (doseq [storm-id downloaded-storm-ids]
+      (add-blob-references (:localizer supervisor) storm-id
+        conf))
+    ;; do this after adding the references so we don't try to clean things being used
+    (.startCleaner (:localizer supervisor))
+
+    (when (conf SUPERVISOR-ENABLE)
+      ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
+      ;; to date even if callbacks don't all work exactly right
+      (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
+      (schedule-recurring (:event-timer supervisor)
+                          0
+                          (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
+                          (fn [] (.add processes-event-manager sync-processes)))
+
+      ;; Blob update thread. Starts with a 30 second delay, runs every 30 seconds
+      (schedule-recurring (:blob-update-timer supervisor)
+                          30
+                          30
+                          (fn [] (.add event-manager synchronize-blobs-fn)))
+
+      (schedule-recurring (:event-timer supervisor)
+                          (* 60 5)
+                          (* 60 5)
+                          (fn [] (let [health-code (healthcheck/health-check conf)
+                                       ids (my-worker-ids conf)]
+                                   (if (not (= health-code 0))
+                                     (do
+                                       (doseq [id ids]
+                                         (shutdown-worker supervisor id))
+                                       (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+
+      ;; Launch a thread that runs profiler commands. Starts with a 30 second delay, runs every 30 seconds
+      (schedule-recurring (:event-timer supervisor)
+                          30
+                          30
+                          (fn [] (.add event-manager run-profiler-actions-fn))))
+    (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
+    (reify
+     Shutdownable
+     (shutdown [this]
+               (log-message "Shutting down supervisor " (:supervisor-id supervisor))
+               (reset! (:active supervisor) false)
+               (cancel-timer (:heartbeat-timer supervisor))
+               (cancel-timer (:event-timer supervisor))
+               (cancel-timer (:blob-update-timer supervisor))
+               (.shutdown event-manager)
+               (.shutdown processes-event-manager)
+               (.shutdown (:localizer supervisor))
+               (.disconnect (:storm-cluster-state supervisor)))
+     SupervisorDaemon
+     (get-conf [this]
+       conf)
+     (get-id [this]
+       (:supervisor-id supervisor))
+     (shutdown-all-workers [this]
+       (let [ids (my-worker-ids conf)]
+         (doseq [id ids]
+           (shutdown-worker supervisor id)
+           )))
+     DaemonCommon
+     (waiting? [this]
+       (or (not @(:active supervisor))
+           (and
+            (timer-waiting? (:heartbeat-timer supervisor))
+            (timer-waiting? (:event-timer supervisor))
+            (every? (memfn waiting?) managers)))
+           ))))
+
+(defn kill-supervisor [supervisor]
+  (.shutdown supervisor)
+  )
+
+(defn setup-storm-code-dir
+  [conf storm-conf dir]
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+  (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir))))
+
+(defn setup-blob-permission
+  [conf storm-conf path]
+  (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+    (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path))))
+
+(defn download-blobs-for-topology!
+  "Download all blobs listed in the topology configuration for a given topology."
+  [conf stormconf-path localizer tmproot]
+  (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        user (storm-conf TOPOLOGY-SUBMITTER-USER)
+        topo-name (storm-conf TOPOLOGY-NAME)
+        user-dir (.getLocalUserFileCacheDir localizer user)
+        localresources (blobstore-map-to-localresources blobstore-map)]
+    (when localresources
+      (when-not (.exists user-dir)
+        (FileUtils/forceMkdir user-dir))
+      (try
+        (let [localized-resources (.getBlobs localizer localresources user topo-name user-dir)]
+          (setup-blob-permission conf storm-conf (.toString user-dir))
+          (doseq [local-rsrc localized-resources]
+            (let [rsrc-file-path (File. (.getFilePath local-rsrc))
+                  key-name (.getName rsrc-file-path)
+                  blob-symlink-target-name (.getName (File. (.getCurrentSymlinkPath local-rsrc)))
+                  symlink-name (get-blob-localname (get blobstore-map key-name) key-name)]
+              (create-symlink! tmproot (.getParent rsrc-file-path) symlink-name
+                blob-symlink-target-name))))
+        (catch AuthorizationException authExp
+          (log-error authExp))
+        (catch KeyNotFoundException knf
+          (log-error knf))))))
+
+(defn get-blob-file-names
+  [blobstore-map]
+  (if blobstore-map
+    (for [[k, data] blobstore-map]
+      (get-blob-localname data k))))
+
+(defn download-blobs-for-topology-succeed?
+  "Check whether all blobs have been downloaded for the given topology"
+  [stormconf-path target-dir]
+  (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        file-names (get-blob-file-names blobstore-map)]
+    (if-not (empty? file-names)
+      (every? #(Utils/checkFileExists target-dir %) file-names)
+      true)))
+
+;; distributed implementation
+(defmethod download-storm-code
+  :distributed [conf storm-id master-code-dir localizer]
+  ;; Downloading to permanent location is atomic
+  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+        stormroot (supervisor-stormdist-root conf storm-id)
+        blobstore (Utils/getClientBlobStoreForSupervisor conf)]
+    (FileUtils/forceMkdir (File. tmproot))
+    (if-not on-windows?
+      (Utils/restrictPermissions tmproot)
+      (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        (throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions"))))
+    (Utils/downloadResourcesAsSupervisor (master-stormjar-key storm-id)
+      (supervisor-stormjar-path tmproot) blobstore)
+    (Utils/downloadResourcesAsSupervisor (master-stormcode-key storm-id)
+      (supervisor-stormcode-path tmproot) blobstore)
+    (Utils/downloadResourcesAsSupervisor (master-stormconf-key storm-id)
+      (supervisor-stormconf-path tmproot) blobstore)
+    (.shutdown blobstore)
+    (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
+    (download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer
+      tmproot)
+    (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
+      (do
+        (log-message "Successfully downloaded blob resources for storm-id " storm-id)
+        (FileUtils/forceMkdir (File. stormroot))
+        (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
+          (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))
+        (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot))
+      (do
+        (log-message "Failed to download blob resources for storm-id " storm-id)
+        (rmr tmproot)))))
+
+(defn write-log-metadata-to-yaml-file! [storm-id port data conf]
+  (let [file (get-log-metadata-file conf storm-id port)]
+    ;; run-worker-as-user needs the directory to have special permissions,
+    ;; or it is insecure
+    (when (not (.exists (.getParentFile file)))
+      (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+        (do (FileUtils/forceMkdir (.getParentFile file))
+            (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) (.getCanonicalPath (.getParentFile file))))
+        (.mkdirs (.getParentFile file))))
+    (let [writer (java.io.FileWriter. file)
+          yaml (Yaml.)]
+      (try
+        (.dump yaml data writer)
+        (finally
+          (.close writer))))))
+
+(defn write-log-metadata! [storm-conf user worker-id storm-id port conf]
+  (let [data {TOPOLOGY-SUBMITTER-USER user
+              "worker-id" worker-id
+              LOGS-GROUPS (sort (distinct (remove nil?
+                                           (concat
+                                             (storm-conf LOGS-GROUPS)
+                                             (storm-conf TOPOLOGY-GROUPS)))))
+              LOGS-USERS (sort (distinct (remove nil?
+                                           (concat
+                                             (storm-conf LOGS-USERS)
+                                             (storm-conf TOPOLOGY-USERS)))))}]
+    (write-log-metadata-to-yaml-file! storm-id port data conf)))
+
+(defn jlp [stormroot conf]
+  (let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
+        os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
+        arch (System/getProperty "os.arch")
+        arch-resource-root (str resource-root File/separator os "-" arch)]
+    (str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
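+
+;; Hedged example of the library path jlp builds on a Linux/amd64 host
+;; (the stormroot path and the configured java.library.path are hypothetical):
+(comment
+  (jlp "/var/storm/supervisor/stormdist/topo-1-1452549000" conf)
+  ;; => ".../topo-1-1452549000/resources/Linux-amd64:.../topo-1-1452549000/resources:/usr/local/lib"
+  )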
+
+(defn substitute-childopts
+  "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap"
+  [value worker-id topology-id port mem-onheap]
+  (let [replacement-map {"%ID%"          (str port)
+                         "%WORKER-ID%"   (str worker-id)
+                         "%TOPOLOGY-ID%"    (str topology-id)
+                         "%WORKER-PORT%" (str port)
+                         "%HEAP-MEM%" (str mem-onheap)}
+        sub-fn #(reduce (fn [string entry]
+                          (apply clojure.string/replace string entry))
+                        %
+                        replacement-map)]
+    (cond
+      (nil? value) nil
+      (sequential? value) (vec (map sub-fn value))
+      :else (-> value sub-fn (clojure.string/split #"\s+")))))
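+
+;; Hedged example of the substitution (worker id, topology id and heap size
+;; below are hypothetical placeholders):
+(comment
+  (substitute-childopts "-Xmx%HEAP-MEM%m -DworkerId=%WORKER-ID%"
+                        "w-1234" "topo-1-1452549000" 6700 768)
+  ;; => ["-Xmx768m" "-DworkerId=w-1234"]
+  )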
+
+
+(defn create-blobstore-links
+  "Create symlinks in worker launch directory for all blobs"
+  [conf storm-id worker-id]
+  (let [stormroot (supervisor-stormdist-root conf storm-id)
+        storm-conf (read-supervisor-storm-conf conf storm-id)
+        workerroot (worker-root conf worker-id)
+        blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+        blob-file-names (get-blob-file-names blobstore-map)
+        resource-file-names (cons RESOURCES-SUBDIR blob-file-names)]
+    (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
+      storm-id " for files(" (count resource-file-names) "): " (pr-str resource-file-names))
+    (create-symlink! workerroot stormroot RESOURCES-SUBDIR)
+    (doseq [file-name blob-file-names]
+      (create-symlink! workerroot stormroot file-name file-name))))
+
+(defn create-artifacts-link
+  "Create a symlink from the worker directory to its port artifacts directory"
+  [conf storm-id port worker-id]
+  (let [worker-dir (worker-root conf worker-id)
+        topo-dir (worker-artifacts-root conf storm-id)]
+    (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
+                 storm-id " to its port artifacts directory")
+    (if (.exists (File. worker-dir))
+      (create-symlink! worker-dir topo-dir "artifacts" port))))
+
+(defmethod launch-worker
+    :distributed [supervisor storm-id port worker-id mem-onheap]
+    (let [conf (:conf supervisor)
+          run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
+          storm-home (System/getProperty "storm.home")
+          storm-options (System/getProperty "storm.options")
+          storm-conf-file (System/getProperty "storm.conf.file")
+          storm-log-dir LOG-DIR
+          storm-log-conf-dir (conf STORM-LOG4J2-CONF-DIR)
+          storm-log4j2-conf-dir (if storm-log-conf-dir
+                                  (if (is-absolute-path? storm-log-conf-dir)
+                                    storm-log-conf-dir
+                                    (str storm-home file-path-separator storm-log-conf-dir))
+                                  (str storm-home file-path-separator "log4j2"))
+          stormroot (supervisor-stormdist-root conf storm-id)
+          jlp (jlp stormroot conf)
+          stormjar (supervisor-stormjar-path stormroot)
+          storm-conf (read-supervisor-storm-conf conf storm-id)
+          topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
+                           [cp]
+                           [])
+          classpath (-> (worker-classpath)
+                        (add-to-classpath [stormjar])
+                        (add-to-classpath topo-classpath))
+          top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
+          mem-onheap (if (and mem-onheap (> mem-onheap 0)) ;; not nil and not zero
+                       (int (Math/ceil mem-onheap)) ;; round up
+                       (storm-conf WORKER-HEAP-MEMORY-MB)) ;; otherwise use default value
+          gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port mem-onheap)
+          topo-worker-logwriter-childopts (storm-conf TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS)
+          user (storm-conf TOPOLOGY-SUBMITTER-USER)
+          logfilename "worker.log"
+          workers-artifacts (worker-artifacts-root conf)
+          logging-sensitivity (storm-conf TOPOLOGY-LOGGING-SENSITIVITY "S3")
+          worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
+                             (substitute-childopts s worker-id storm-id port mem-onheap))
+          topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
+                                  (substitute-childopts s worker-id storm-id port mem-onheap))
+          worker--profiler-childopts (if (conf WORKER-PROFILER-ENABLED)
+                                       (substitute-childopts (conf WORKER-PROFILER-CHILDOPTS) worker-id storm-id port mem-onheap)
+                                       "")
+          topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)]
+                                        (merge env {"LD_LIBRARY_PATH" jlp})
+                                        {"LD_LIBRARY_PATH" jlp})
+          command (concat
+                    [(java-cmd) "-cp" classpath 
+                     topo-worker-logwriter-childopts
+                     (str "-Dlogfile.name=" logfilename)
+                     (str "-Dstorm.home=" storm-home)
+                     (str "-Dworkers.artifacts=" workers-artifacts)
+                     (str "-Dstorm.id=" storm-id)
+                     (str "-Dworker.id=" worker-id)
+                     (str "-Dworker.port=" port)
+                     (str "-Dstorm.log.dir=" storm-log-dir)
+                     (str "-Dlog4j.configurationFile=" storm-log4j2-conf-dir file-path-separator "worker.xml")
+                     (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector")
+                     "org.apache.storm.LogWriter"]
+                    [(java-cmd) "-server"]
+                    worker-childopts
+                    topo-worker-childopts
+                    gc-opts
+                    worker--profiler-childopts
+                    [(str "-Djava.library.path=" jlp)
+                     (str "-Dlogfile.name=" logfilename)
+                     (str "-Dstorm.home=" storm-home)
+                     (str "-Dworkers.artifacts=" workers-artifacts)
+                     (str "-Dstorm.conf.file=" storm-conf-file)
+                     (str "-Dstorm.options=" storm-options)
+                     (str "-Dstorm.log.dir=" storm-log-dir)
+                     (str "-Dlogging.sensitivity=" logging-sensitivity)
+                     (str "-Dlog4j.configurationFile=" storm-log4j2-conf-dir file-path-separator "worker.xml")
+                     (str "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector")
+                     (str "-Dstorm.id=" storm-id)
+                     (str "-Dworker.id=" worker-id)
+                     (str "-Dworker.port=" port)
+                     "-cp" classpath
+                     "org.apache.storm.daemon.worker"
+                     storm-id
+                     (:assignment-id supervisor)
+                     port
+                     worker-id])
+          command (->> command (map str) (filter (complement empty?)))]
+      (log-message "Launching worker with command: " (shell-cmd command))
+      (write-log-metadata! storm-conf user worker-id storm-id port conf)
+      (set-worker-user! conf worker-id user)
+      (create-artifacts-link conf storm-id port worker-id)
+      (let [log-prefix (str "Worker Process " worker-id)
+            callback (fn [exit-code]
+                       (log-message log-prefix " exited with code: " exit-code)
+                       (add-dead-worker worker-id))
+            worker-dir (worker-root conf worker-id)]
+        (remove-dead-worker worker-id)
+        (create-blobstore-links conf storm-id worker-id)
+        (if run-worker-as-user
+          (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir))
+          (launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)))
+        )))
+
+;; local implementation
+
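+;; Finds the first jar on the current classpath that contains the resources
+;; subdirectory, if any; used to locate topology resources in local mode.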
+(defn resources-jar []
+  (->> (.split (current-classpath) File/pathSeparator)
+       (filter #(.endsWith % ".jar"))
+       (filter #(zip-contains-dir? % RESOURCES-SUBDIR))
+       first))
+
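+;; Local-mode code download: reads the serialized topology code and config from
+;; the Nimbus blob store into a temp dir, moves it into the supervisor's
+;; stormdist root, sets up the code dir, and then extracts the topology's
+;; resources either from a classpath jar or from a classpath directory.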
+(defmethod download-storm-code
+  :local [conf storm-id master-code-dir localizer]
+  (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+        stormroot (supervisor-stormdist-root conf storm-id)
+        blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
+    (try
+      (FileUtils/forceMkdir (File. tmproot))
+      (.readBlobTo blob-store (master-stormcode-key storm-id) (FileOutputStream. (supervisor-stormcode-path tmproot)) nil)
+      (.readBlobTo blob-store (master-stormconf-key storm-id) (FileOutputStream. (supervisor-stormconf-path tmproot)) nil)
+      (finally
+        (.shutdown blob-store)))
+    (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+    (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+    (let [classloader (.getContextClassLoader (Thread/currentThread))
+          resources-jar (resources-jar)
+          url (.getResource classloader RESOURCES-SUBDIR)
+          target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+      (cond
+        resources-jar
+        (do
+          (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
+          (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+        url
+        (do
+          (log-message "Copying resources at " (str url) " to " target-dir)
+          (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir)))))))
+
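+;; Local-mode worker launch: instead of forking a JVM, runs the worker
+;; in-process via worker/mk-worker, registers it with the process simulator
+;; under a freshly generated pid, and records that pid for the worker id.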
+(defmethod launch-worker
+    :local [supervisor storm-id port worker-id mem-onheap]
+    (let [conf (:conf supervisor)
+          pid (uuid)
+          worker (worker/mk-worker conf
+                                   (:shared-context supervisor)
+                                   storm-id
+                                   (:assignment-id supervisor)
+                                   port
+                                   worker-id)]
+      (set-worker-user! conf worker-id "")
+      (psim/register-process pid worker)
+      (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
+      ))
+
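+;; Starts the supervisor daemon: reads the storm config, validates distributed
+;; mode, builds the daemon around the given ISupervisor, registers a shutdown
+;; hook (with a 1-second force kill), and starts the metrics reporters.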
+(defn -launch
+  [supervisor]
+  (log-message "Starting supervisor for storm version '" STORM-VERSION "'")
+  (let [conf (read-storm-config)]
+    (validate-distributed-mode! conf)
+    (let [supervisor (mk-supervisor conf nil supervisor)]
+      (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))
+    (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
+    (start-metrics-reporters)))
+
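+;; Default ISupervisor implementation: on prepare it reuses the supervisor id
+;; persisted in LocalState (generating one on first start), confirms every
+;; assigned port, and reports the configured SUPERVISOR-SLOTS-PORTS as metadata.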
+(defn standalone-supervisor []
+  (let [conf-atom (atom nil)
+        id-atom (atom nil)]
+    (reify ISupervisor
+      (prepare [this conf local-dir]
+        (reset! conf-atom conf)
+        (let [state (LocalState. local-dir)
+              curr-id (if-let [id (ls-supervisor-id state)]
+                        id
+                        (generate-supervisor-id))]
+          (ls-supervisor-id! state curr-id)
+          (reset! id-atom curr-id))
+        )
+      (confirmAssigned [this port]
+        true)
+      (getMetadata [this]
+        (doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS))))
+      (getSupervisorId [this]
+        @id-atom)
+      (getAssignmentId [this]
+        @id-atom)
+      (killedWorker [this port]
+        )
+      (assigned [this ports]
+        ))))
+
+(defn -main []
+  (setup-default-uncaught-exception-handler)
+  (-launch (standalone-supervisor)))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
new file mode 100644
index 0000000..1ae9b22
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -0,0 +1,189 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.task
+  (:use [org.apache.storm.daemon common])
+  (:use [org.apache.storm config util log])
+  (:import [org.apache.storm.hooks ITaskHook])
+  (:import [org.apache.storm.tuple Tuple TupleImpl])
+  (:import [org.apache.storm.grouping LoadMapping])
+  (:import [org.apache.storm.generated SpoutSpec Bolt StateSpoutSpec StormTopology])
+  (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
+            EmitInfo BoltFailInfo BoltAckInfo])
+  (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext])
+  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.generated ShellComponent JavaObject])
+  (:import [org.apache.storm.spout ShellSpout])
+  (:import [java.util Collection List ArrayList])
+  (:require [org.apache.storm
+             [thrift :as thrift]
+             [stats :as stats]])
+  (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]))
+
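+;; Returns a function of a task id that builds a TopologyContext for the given
+;; topology from the worker and executor state.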
+(defn mk-topology-context-builder [worker executor-data topology]
+  (let [conf (:conf worker)]
+    #(TopologyContext.
+      topology
+      (:storm-conf worker)
+      (:task->component worker)
+      (:component->sorted-tasks worker)
+      (:component->stream->fields worker)
+      (:storm-id worker)
+      (supervisor-storm-resources-path
+        (supervisor-stormdist-root conf (:storm-id worker)))
+      (worker-pids-root conf (:worker-id worker))
+      (int %)
+      (:port worker)
+      (:task-ids worker)
+      (:default-shared-resources worker)
+      (:user-shared-resources worker)
+      (:shared-executor-data executor-data)
+      (:interval->task->metric-registry executor-data)
+      (:open-or-prepare-was-called? executor-data))))
+
+(defn system-topology-context [worker executor-data tid]
+  ((mk-topology-context-builder
+    worker
+    executor-data
+    (:system-topology worker))
+   tid))
+
+(defn user-topology-context [worker executor-data tid]
+  ((mk-topology-context-builder
+    worker
+    executor-data
+    (:topology worker))
+   tid))
+
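+;; Looks up and deserializes the spout/bolt/state-spout object for a component,
+;; wrapping ShellComponents in ShellSpout/ShellBolt and instantiating
+;; JavaObjects through the thrift helpers.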
+(defn- get-task-object [^StormTopology topology component-id]
+  (let [spouts (.get_spouts topology)
+        bolts (.get_bolts topology)
+        state-spouts (.get_state_spouts topology)
+        obj (Utils/getSetComponentObject
+             (cond
+              (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id))
+              (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id))
+              (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))
+              :else (throw-runtime "Could not find " component-id " in " topology)))
+        obj (if (instance? ShellComponent obj)
+              (if (contains? spouts component-id)
+                (ShellSpout. obj)
+                (ShellBolt. obj))
+              obj )
+        obj (if (instance? JavaObject obj)
+              (thrift/instantiate-java-object obj)
+              obj )]
+    obj
+    ))
+
+(defn get-context-hooks [^TopologyContext context]
+  (.getHooks context))
+
+(defn hooks-empty? [^Collection hooks]
+  (.isEmpty hooks))
+
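+;; Invokes method-sym on every ITaskHook registered in the context; the info
+;; object is only constructed when at least one hook is registered.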
+(defmacro apply-hooks [topology-context method-sym info-form]
+  (let [hook-sym (with-meta (gensym "hook") {:tag 'org.apache.storm.hooks.ITaskHook})]
+    `(let [hooks# (get-context-hooks ~topology-context)]
+       (when-not (hooks-empty? hooks#)
+         (let [info# ~info-form]
+           (fast-list-iter [~hook-sym hooks#]
+             (~method-sym ~hook-sym info#)
+             ))))))
+
+
+;; TODO: this is all expensive... should be precomputed
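+;; Emits values on the given stream without anchoring: builds a tuple sourced
+;; from this task and sends it via the executor's transfer-fn to every task
+;; chosen by tasks-fn.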
+(defn send-unanchored
+  [task-data stream values]
+    (let [^TopologyContext topology-context (:system-context task-data)
+          tasks-fn (:tasks-fn task-data)
+          transfer-fn (-> task-data :executor-data :transfer-fn)
+          out-tuple (TupleImpl. topology-context
+                                 values
+                                 (.getThisTaskId topology-context)
+                                 stream)]
+      (fast-list-iter [t (tasks-fn stream values)]
+        (transfer-fn t out-tuple))))
+
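+;; Builds the emit routing function for a task. The 3-arity form handles direct
+;; emits to a specific task id; the 2-arity form runs each consuming
+;; component's grouper for the stream to pick target tasks. Both fire emit
+;; hooks and sample emitted/transferred stats.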
+(defn mk-tasks-fn [task-data]
+  (let [task-id (:task-id task-data)
+        executor-data (:executor-data task-data)
+        ^LoadMapping load-mapping (:load-mapping (:worker executor-data))
+        component-id (:component-id executor-data)
+        ^WorkerTopologyContext worker-context (:worker-context executor-data)
+        storm-conf (:storm-conf executor-data)
+        emit-sampler (mk-stats-sampler storm-conf)
+        stream->component->grouper (:stream->component->grouper executor-data)
+        user-context (:user-context task-data)
+        executor-stats (:stats executor-data)
+        debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+        
+    (fn ([^Integer out-task-id ^String stream ^List values]
+          (when debug?
+            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
+          (let [target-component (.getComponentId worker-context out-task-id)
+                component->grouping (get stream->component->grouper stream)
+                grouping (get component->grouping target-component)
+                out-task-id (when grouping out-task-id)]
+            (when (and (not-nil? grouping) (not= :direct grouping))
+              (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
+            (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
+            (when (emit-sampler)
+              (stats/emitted-tuple! executor-stats stream)
+              (when out-task-id
+                (stats/transferred-tuples! executor-stats stream 1)))
+            (when out-task-id [out-task-id])
+            ))
+        ([^String stream ^List values]
+           (when debug?
+             (log-message "Emitting: " component-id " " stream " " values))
+           (let [out-tasks (ArrayList.)]
+             (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
+               (when (= :direct grouper)
+                  ;;  TODO: this is wrong, need to check how the stream was declared
+                  (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
+               (let [comp-tasks (grouper task-id values load-mapping)]
+                 (if (or (sequential? comp-tasks) (instance? Collection comp-tasks))
+                   (.addAll out-tasks comp-tasks)
+                   (.add out-tasks comp-tasks)
+                   )))
+             (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
+             (when (emit-sampler)
+               (stats/emitted-tuple! executor-stats stream)
+               (stats/transferred-tuples! executor-stats stream (count out-tasks)))
+             out-tasks)))
+    ))
+
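+;; Assembles the per-task state map: system and user topology contexts,
+;; built-in metrics, the emit routing function, and the deserialized
+;; spout/bolt object for this task's component.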
+(defn mk-task-data [executor-data task-id]
+  (recursive-map
+    :executor-data executor-data
+    :task-id task-id
+    :system-context (system-topology-context (:worker executor-data) executor-data task-id)
+    :user-context (user-topology-context (:worker executor-data) executor-data task-id)
+    :builtin-metrics (builtin-metrics/make-data (:type executor-data) (:stats executor-data))
+    :tasks-fn (mk-tasks-fn <>)
+    :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
+
+
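+;; Creates the task-data for a task, registers any TOPOLOGY-AUTO-TASK-HOOKS on
+;; the user context, and emits an unanchored ["startup"] tuple on the system
+;; stream before the executor threads start.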
+(defn mk-task [executor-data task-id]
+  (let [task-data (mk-task-data executor-data task-id)
+        storm-conf (:storm-conf executor-data)]
+    (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
+      (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
+    ;; when this is called, the threads for the executor haven't been started yet,
+    ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
+    (send-unanchored task-data SYSTEM-STREAM-ID ["startup"])
+    task-data
+    ))