Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:12 UTC

[16/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
new file mode 100644
index 0000000..9607d77
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -0,0 +1,763 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.worker
+  (:use [org.apache.storm.daemon common])
+  (:use [org.apache.storm config log util timer local-state])
+  (:require [clj-time.core :as time])
+  (:require [clj-time.coerce :as coerce])
+  (:require [org.apache.storm.daemon [executor :as executor]])
+  (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]])
+  (:require [clojure.set :as set])
+  (:require [org.apache.storm.messaging.loader :as msg-loader])
+  (:import [java.util.concurrent Executors]
+           [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
+  (:import [java.util ArrayList HashMap])
+  (:import [org.apache.storm.utils Utils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
+  (:import [org.apache.storm.grouping LoadMapping])
+  (:import [org.apache.storm.messaging TransportFactory])
+  (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
+  (:import [org.apache.storm.daemon Shutdownable])
+  (:import [org.apache.storm.serialization KryoTupleSerializer])
+  (:import [org.apache.storm.generated StormTopology])
+  (:import [org.apache.storm.tuple AddressedTuple Fields])
+  (:import [org.apache.storm.task WorkerTopologyContext])
+  (:import [org.apache.storm Constants])
+  (:import [org.apache.storm.security.auth AuthUtils])
+  (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
+  (:import [javax.security.auth Subject])
+  (:import [java.security PrivilegedExceptionAction])
+  (:import [org.apache.logging.log4j LogManager])
+  (:import [org.apache.logging.log4j Level])
+  (:import [org.apache.logging.log4j.core.config LoggerConfig])
+  (:import [org.apache.storm.generated LogConfig LogLevelAction])
+  (:gen-class))
+
+(defmulti mk-suicide-fn cluster-mode)
+
+(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
+  (log-message "Reading Assignments.")
+  (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
+    (doall
+     (concat
+      [Constants/SYSTEM_EXECUTOR_ID]
+      (mapcat (fn [[executor loc]]
+                (when (= loc [assignment-id port])
+                  [executor]))
+              assignment)))))
+
+(defnk do-executor-heartbeats [worker :executors nil]
+  ;; stats is how we know what executors are assigned to this worker 
+  (let [stats (if-not executors
+                  (into {} (map (fn [e] {e nil}) (:executors worker)))
+                  (->> executors
+                    (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
+                    (apply merge)))
+        zk-hb {:storm-id (:storm-id worker)
+               :executor-stats stats
+               :uptime ((:uptime worker))
+               :time-secs (current-time-secs)
+               }]
+    ;; do the zookeeper heartbeat
+    (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)
+    ))
+
+(defn do-heartbeat [worker]
+  (let [conf (:conf worker)
+        state (worker-state conf (:worker-id worker))]
+    ;; do the local-file-system heartbeat.
+    (ls-worker-heartbeat! state (current-time-secs) (:storm-id worker) (:executors worker) (:port worker))
+    (.cleanup state 60) ; just in case the supervisor is down, so the disk doesn't fill up.
+                        ; the supervisor shouldn't need 120 seconds between listing the dir and reading it
+
+    ))
+
+(defn worker-outbound-tasks
+  "Returns seq of task-ids that receive messages from this worker"
+  [worker]
+  (let [context (worker-context worker)
+        components (mapcat
+                     (fn [task-id]
+                       (->> (.getComponentId context (int task-id))
+                            (.getTargets context)
+                            vals
+                            (map keys)
+                            (apply concat)))
+                     (:task-ids worker))]
+    (-> worker
+        :task->component
+        reverse-map
+        (select-keys components)
+        vals
+        flatten
+        set)))
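+;; e.g. (hypothetical ids): with :task->component {1 "spout", 2 "bolt", 3 "bolt"}
+;; and the context reporting only "bolt" as a target, reverse-map yields
+;; {"spout" [1], "bolt" [2 3]} and the outbound task set is #{2 3}.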
+
+(defn get-dest
+  "get the destination for an AddressedTuple"
+  [^AddressedTuple addressed-tuple]
+  (.getDest addressed-tuple))
+
+(defn mk-transfer-local-fn [worker]
+  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
+        task->short-executor (:task->short-executor worker)
+        task-getter (comp #(get task->short-executor %) get-dest)]
+    (fn [tuple-batch]
+      (let [grouped (fast-group-by task-getter tuple-batch)]
+        (fast-map-iter [[short-executor pairs] grouped]
+          (let [q (short-executor-receive-queue-map short-executor)]
+            (if q
+              (disruptor/publish q pairs)
+              (log-warn "Received invalid messages for unknown tasks. Dropping... ")
+              )))))))
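+;; e.g. (hypothetical ids): with task->short-executor {1 1, 2 1, 5 5}, a batch
+;; destined for tasks [1 2 5] is grouped as {1 [t1 t2], 5 [t5]} and each group
+;; is published onto that executor's receive queue.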
+
+(defn- assert-can-serialize
+  "Check that all of the tuples can be serialized by serializing them."
+  [^KryoTupleSerializer serializer tuple-batch]
+  (fast-list-iter [[task tuple :as pair] tuple-batch]
+    (.serialize serializer tuple)))
+
+(defn- mk-backpressure-handler
+  "Make a handler that checks and updates the worker's backpressure flag."
+  [executors]
+  (disruptor/worker-backpressure-handler
+    (fn [worker]
+      (let [storm-id (:storm-id worker)
+            assignment-id (:assignment-id worker)
+            port (:port worker)
+            storm-cluster-state (:storm-cluster-state worker)
+            prev-backpressure-flag @(:backpressure worker)]
+        (when executors
+          (reset! (:backpressure worker)
+                  (or @(:transfer-backpressure worker)
+                      (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))))
+        ;; update the worker's backpressure flag to zookeeper only when it has changed
+        (log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag)
+        (when (not= prev-backpressure-flag @(:backpressure worker))
+          (.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker)))
+        ))))
+
+(defn- mk-disruptor-backpressure-handler
+  "Make a handler for the worker's send disruptor queue that reacts to
+  its highWaterMark and lowWaterMark backpressure callbacks."
+  [worker]
+  (disruptor/disruptor-backpressure-handler
+    (fn []
+      (reset! (:transfer-backpressure worker) true)
+      (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
+    (fn []
+      (reset! (:transfer-backpressure worker) false)
+      (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))
+
+(defn mk-transfer-fn [worker]
+  (let [local-tasks (-> worker :task-ids set)
+        local-transfer (:transfer-local-fn worker)
+        ^DisruptorQueue transfer-queue (:transfer-queue worker)
+        task->node+port (:cached-task->node+port worker)
+        try-serialize-local ((:storm-conf worker) TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE)
+
+        transfer-fn
+          (fn [^KryoTupleSerializer serializer tuple-batch]
+            (let [^ArrayList local (ArrayList.)
+                  ^HashMap remoteMap (HashMap.)]
+              (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
+                (let [task (.getDest addressed-tuple)
+                      tuple (.getTuple addressed-tuple)]
+                  (if (local-tasks task)
+                    (.add local addressed-tuple)
+
+                    ;; using Java objects directly to avoid performance issues in the Java code
+                    (do
+                      (when (not (.get remoteMap task))
+                        (.put remoteMap task (ArrayList.)))
+                      (let [^ArrayList remote (.get remoteMap task)]
+                        (if (not-nil? task)
+                          (.add remote (TaskMessage. task ^bytes (.serialize serializer tuple)))
+                          (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
+                       )))))
+
+              (when (not (.isEmpty local)) (local-transfer local))
+              (when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))]
+    (if try-serialize-local
+      (do
+        (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
+        (fn [^KryoTupleSerializer serializer tuple-batch]
+          (assert-can-serialize serializer tuple-batch)
+          (transfer-fn serializer tuple-batch)))
+      transfer-fn)))
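+;; e.g. (hypothetical ids): with local-tasks #{1 2}, a batch addressed to tasks
+;; [1 7] hands the tuple for task 1 to local-transfer and publishes
+;; {7 [TaskMessage]} with the serialized tuple onto the transfer queue.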
+
+(defn- mk-receive-queue-map [storm-conf executors]
+  (->> executors
+       ;; TODO: this depends on the type of executor
+       (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
+                                                  (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
+                                                  (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                                  :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+                                                  :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
+       (into {})
+       ))
+
+(defn- stream->fields [^StormTopology topology component]
+  (->> (ThriftTopologyUtils/getComponentCommon topology component)
+       .get_streams
+       (map (fn [[s info]] [s (Fields. (.get_output_fields info))]))
+       (into {})
+       (HashMap.)))
+
+(defn component->stream->fields [^StormTopology topology]
+  (->> (ThriftTopologyUtils/getComponentIds topology)
+       (map (fn [c] [c (stream->fields topology c)]))
+       (into {})
+       (HashMap.)))
+
+(defn- mk-default-resources [worker]
+  (let [conf (:conf worker)
+        thread-pool-size (int (conf TOPOLOGY-WORKER-SHARED-THREAD-POOL-SIZE))]
+    {WorkerTopologyContext/SHARED_EXECUTOR (Executors/newFixedThreadPool thread-pool-size)}
+    ))
+
+(defn- mk-user-resources [worker]
+  ;;TODO: need to invoke a hook provided by the topology, giving it a chance to create user resources.
+  ;; this would be part of the initialization hook
+  ;; need to separate workertopologycontext into WorkerContext and WorkerUserContext.
+  ;; actually just do it via interfaces. just need to make sure to hide setResource from tasks
+  {})
+
+(defn mk-halting-timer [timer-name]
+  (mk-timer :kill-fn (fn [t]
+                       (log-error t "Error when processing event")
+                       (exit-process! 20 "Error when processing an event")
+                       )
+            :timer-name timer-name))
+
+(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
+  (let [assignment-versions (atom {})
+        executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
+        transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
+                                                  (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                                  :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+                                                  :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
+        executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
+
+        receive-queue-map (->> executor-receive-queue-map
+                               (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
+                               (into {}))
+
+        topology (read-supervisor-topology conf storm-id)
+        mq-context (or mq-context (TransportFactory/makeContext storm-conf))]
+
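+    ;; recursive-map (from org.apache.storm.util) lets later entries read the
+    ;; partially-built map through <>, e.g. :transfer-fn below is (mk-transfer-fn <>).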
+    (recursive-map
+      :conf conf
+      :mq-context mq-context
+      :receiver (.bind ^IContext mq-context storm-id port)
+      :storm-id storm-id
+      :assignment-id assignment-id
+      :port port
+      :worker-id worker-id
+      :cluster-state cluster-state
+      :storm-cluster-state storm-cluster-state
+      ;; when the worker boots up it will start to set up its initial connections
+      ;; to other workers. When all connections are ready, we will enable this
+      ;; flag, and the spouts and bolts will be activated.
+      :worker-active-flag (atom false)
+      :storm-active-atom (atom false)
+      :storm-component->debug-atom (atom {})
+      :executors executors
+      :task-ids (->> receive-queue-map keys (map int) sort)
+      :storm-conf storm-conf
+      :topology topology
+      :system-topology (system-topology! storm-conf topology)
+      :heartbeat-timer (mk-halting-timer "heartbeat-timer")
+      :refresh-load-timer (mk-halting-timer "refresh-load-timer")
+      :refresh-connections-timer (mk-halting-timer "refresh-connections-timer")
+      :refresh-credentials-timer (mk-halting-timer "refresh-credentials-timer")
+      :reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
+      :refresh-active-timer (mk-halting-timer "refresh-active-timer")
+      :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+      :user-timer (mk-halting-timer "user-timer")
+      :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
+      :component->stream->fields (component->stream->fields (:system-topology <>))
+      :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
+      :endpoint-socket-lock (mk-rw-lock)
+      :cached-node+port->socket (atom {})
+      :cached-task->node+port (atom {})
+      :transfer-queue transfer-queue
+      :executor-receive-queue-map executor-receive-queue-map
+      :short-executor-receive-queue-map (map-key first executor-receive-queue-map)
+      :task->short-executor (->> executors
+                                 (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
+                                 (into {})
+                                 (HashMap.))
+      :suicide-fn (mk-suicide-fn conf)
+      :uptime (uptime-computer)
+      :default-shared-resources (mk-default-resources <>)
+      :user-shared-resources (mk-user-resources <>)
+      :transfer-local-fn (mk-transfer-local-fn <>)
+      :transfer-fn (mk-transfer-fn <>)
+      :load-mapping (LoadMapping.)
+      :assignment-versions assignment-versions
+      :backpressure (atom false) ;; whether this worker is going slow
+      :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
+      :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
+      :throttle-on (atom false) ;; whether throttle is activated for spouts
+      )))
+
+(defn- endpoint->string [[node port]]
+  (str port "/" node))
+
+(defn string->endpoint [^String s]
+  (let [[port-str node] (.split s "/" 2)]
+    [node (Integer/valueOf port-str)]
+    ))
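+;; e.g. (endpoint->string ["node1" 6701]) => "6701/node1"
+;;      (string->endpoint "6701/node1")   => ["node1" 6701]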
+
+(def LOAD-REFRESH-INTERVAL-MS 5000)
+
+(defn mk-refresh-load [worker]
+  (let [local-tasks (set (:task-ids worker))
+        remote-tasks (set/difference (worker-outbound-tasks worker) local-tasks)
+        short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
+        next-update (atom 0)]
+    (fn this
+      ([]
+        (let [^LoadMapping load-mapping (:load-mapping worker)
+              local-pop (map-val (fn [queue]
+                                   (let [q-metrics (.getMetrics queue)]
+                                     (/ (double (.population q-metrics)) (.capacity q-metrics))))
+                                 short-executor-receive-queue-map)
+              remote-load (reduce merge (for [[np conn] @(:cached-node+port->socket worker)] (into {} (.getLoad conn remote-tasks))))
+              now (System/currentTimeMillis)]
+          (.setLocal load-mapping local-pop)
+          (.setRemote load-mapping remote-load)
+          (when (> now @next-update)
+            (.sendLoadMetrics (:receiver worker) local-pop)
+            (reset! next-update (+ LOAD-REFRESH-INTERVAL-MS now))))))))
+
+(defn mk-refresh-connections [worker]
+  (let [outbound-tasks (worker-outbound-tasks worker)
+        conf (:conf worker)
+        storm-cluster-state (:storm-cluster-state worker)
+        storm-id (:storm-id worker)]
+    (fn this
+      ([]
+        (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
+      ([callback]
+         (let [version (.assignment-version storm-cluster-state storm-id callback)
+               assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
+                            (:data (get @(:assignment-versions worker) storm-id))
+                            (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)]
+                              (swap! (:assignment-versions worker) assoc storm-id new-assignment)
+                              (:data new-assignment)))
+              my-assignment (-> assignment
+                                :executor->node+port
+                                to-task->node+port
+                                (select-keys outbound-tasks)
+                                (#(map-val endpoint->string %)))
+              ;; we don't need a connection for the local tasks anymore
+              needed-assignment (->> my-assignment
+                                      (filter-key (complement (-> worker :task-ids set))))
+              needed-connections (-> needed-assignment vals set)
+              needed-tasks (-> needed-assignment keys)
+
+              current-connections (set (keys @(:cached-node+port->socket worker)))
+              new-connections (set/difference needed-connections current-connections)
+              remove-connections (set/difference current-connections needed-connections)]
+              (swap! (:cached-node+port->socket worker)
+                     #(HashMap. (merge (into {} %1) %2))
+                     (into {}
+                       (dofor [endpoint-str new-connections
+                               :let [[node port] (string->endpoint endpoint-str)]]
+                         [endpoint-str
+                          (.connect
+                           ^IContext (:mq-context worker)
+                           storm-id
+                           ((:node->host assignment) node)
+                           port)
+                          ]
+                         )))
+              (write-locked (:endpoint-socket-lock worker)
+                (reset! (:cached-task->node+port worker)
+                        (HashMap. my-assignment)))
+              (doseq [endpoint remove-connections]
+                (.close (get @(:cached-node+port->socket worker) endpoint)))
+              (apply swap!
+                     (:cached-node+port->socket worker)
+                     #(HashMap. (apply dissoc (into {} %1) %&))
+                     remove-connections)
+
+           )))))
+
+(defn refresh-storm-active
+  ([worker]
+    (refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
+  ([worker callback]
+    (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
+      (reset!
+        (:storm-active-atom worker)
+        (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
+      (reset! (:storm-component->debug-atom worker) (-> base :component->debug))
+      (log-debug "Event debug options " @(:storm-component->debug-atom worker)))))
+
+;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
+(defn mk-transfer-tuples-handler [worker]
+  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
+        drainer (TransferDrainer.)
+        node+port->socket (:cached-node+port->socket worker)
+        task->node+port (:cached-task->node+port worker)
+        endpoint-socket-lock (:endpoint-socket-lock worker)
+        ]
+    (disruptor/clojure-handler
+      (fn [packets _ batch-end?]
+        (.add drainer packets)
+
+        (when batch-end?
+          (read-locked endpoint-socket-lock
+             (let [node+port->socket @node+port->socket
+                   task->node+port @task->node+port]
+               (.send drainer task->node+port node+port->socket)))
+          (.clear drainer))))))
+
+;; Check whether this messaging connection is ready to send data
+(defn is-connection-ready [^IConnection connection]
+  (if (instance? ConnectionWithStatus connection)
+    (let [^ConnectionWithStatus connection connection
+          status (.status connection)]
+      (= status ConnectionWithStatus$Status/Ready))
+    true))
+
+;; whether all of this worker's connections are ready
+(defn all-connections-ready [worker]
+  (let [connections (vals @(:cached-node+port->socket worker))]
+    (every? is-connection-ready connections)))
+
+;; wait for all connections to be ready, then activate the spouts/bolts
+;; when the worker boots up
+(defn activate-worker-when-all-connections-ready
+  [worker]
+  (let [timer (:refresh-active-timer worker)
+        delay-secs 0
+        recur-secs 1]
+    (schedule timer
+      delay-secs
+      (fn this []
+        (if (all-connections-ready worker)
+          (do
+            (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
+              " with id "(:worker-id worker))
+            (reset! (:worker-active-flag worker) true))
+          (schedule timer recur-secs this :check-active false)
+            )))))
+
+(defn register-callbacks [worker]
+  (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
+  (msg-loader/register-callback (:transfer-local-fn worker)
+                                (:receiver worker)
+                                (:storm-conf worker)
+                                (worker-context worker)))
+
+(defn- close-resources [worker]
+  (let [dr (:default-shared-resources worker)]
+    (log-message "Shutting down default resources")
+    (.shutdownNow (get dr WorkerTopologyContext/SHARED_EXECUTOR))
+    (log-message "Shut down default resources")))
+
+(defn- get-logger-levels []
+  (into {}
+    (let [logger-config (.getConfiguration (LogManager/getContext false))]
+      (for [[logger-name logger] (.getLoggers logger-config)]
+        {logger-name (.getLevel logger)}))))
+
+(defn set-logger-level [logger-context logger-name new-level]
+  (let [config (.getConfiguration logger-context)
+        logger-config (.getLoggerConfig config logger-name)]
+    (if (not (= (.getName logger-config) logger-name))
+      ;; create a new config. Make it additive (true) so that it
+      ;; inherits its parent's appenders
+      (let [new-logger-config (LoggerConfig. logger-name new-level true)]
+        (log-message "Adding config for: " new-logger-config " with level: " new-level)
+        (.addLogger config logger-name new-logger-config))
+      (do
+        (log-message "Setting " logger-config " log level to: " new-level)
+        (.setLevel logger-config new-level)))))
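+;; e.g. (set-logger-level (LogManager/getContext false) "org.apache.storm" Level/DEBUG)
+;; takes effect once the caller invokes (.updateLoggers logger-context), as
+;; reset-log-levels and process-log-config-change below do.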
+
+;; called on a timer to reset log levels whose reset timeout has expired
+;; also called from process-log-config-change
+(defn reset-log-levels [latest-log-config-atom]
+  (let [latest-log-config @latest-log-config-atom
+        logger-context (LogManager/getContext false)]
+    (doseq [[logger-name logger-setting] (sort latest-log-config)]
+      (let [timeout (:timeout logger-setting)
+            target-log-level (:target-log-level logger-setting)
+            reset-log-level (:reset-log-level logger-setting)]
+        (when (> (coerce/to-long (time/now)) timeout)
+          (log-message logger-name ": Resetting level to " reset-log-level) 
+          (set-logger-level logger-context logger-name reset-log-level)
+          (swap! latest-log-config-atom
+            (fn [prev]
+              (dissoc prev logger-name))))))
+    (.updateLoggers logger-context)))
+
+;; when a new log level is received from zookeeper, this function is called
+(defn process-log-config-change [latest-log-config original-log-levels log-config]
+  (when log-config
+    (log-debug "Processing received log config: " log-config)
+    ;; merge log configs together
+    (let [loggers (.get_named_logger_level log-config)
+          logger-context (LogManager/getContext false)
+          ;; merge named log levels into a map of logger => settings (including the reset timeout)
+          new-log-configs
+          (into {}
+            (for [[msg-logger-name logger-level] loggers]
+              (let [logger-name (if (= msg-logger-name "ROOT")
+                                  LogManager/ROOT_LOGGER_NAME
+                                  msg-logger-name)]
+                (when (.is_set_reset_log_level_timeout_epoch logger-level)
+                  {logger-name {:action (.get_action logger-level)
+                                :target-log-level (Level/toLevel (.get_target_log_level logger-level))
+                                :reset-log-level (or (.get @original-log-levels logger-name) (Level/INFO))
+                                :timeout (.get_reset_log_level_timeout_epoch logger-level)}}))))]
+
+      ;; look for deleted log timeouts
+      (doseq [[logger-name logger-val] (sort @latest-log-config)]
+        (when (not (contains? new-log-configs logger-name))
+          ;; if we had a timeout, but the timeout is no longer active
+          (set-logger-level
+            logger-context logger-name (:reset-log-level logger-val))))
+
+      ;; apply new log settings we just received
+      ;; the merged configs are only for the reset logic
+      (doseq [[msg-logger-name logger-level] (sort (into {} (.get_named_logger_level log-config)))]
+        (let [logger-name (if (= msg-logger-name "ROOT")
+                                LogManager/ROOT_LOGGER_NAME
+                                msg-logger-name)
+              level (Level/toLevel (.get_target_log_level logger-level))
+              action (.get_action logger-level)]
+          (when (= action LogLevelAction/UPDATE)
+            (set-logger-level logger-context logger-name level))))
+   
+      (.updateLoggers logger-context)
+      (reset! latest-log-config new-log-configs)
+      (log-debug "New merged log config is " @latest-log-config))))
+
+(defn run-worker-start-hooks [worker]
+  (let [topology (:topology worker)
+        topo-conf (:storm-conf worker)
+        worker-topology-context (worker-context worker)
+        hooks (.get_worker_hooks topology)]
+    (dofor [hook hooks]
+      (let [hook-bytes (Utils/toByteArray hook)
+            deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
+        (.start deser-hook topo-conf worker-topology-context)))))
+
+(defn run-worker-shutdown-hooks [worker]
+  (let [topology (:topology worker)
+        hooks (.get_worker_hooks topology)]
+    (dofor [hook hooks]
+      (let [hook-bytes (Utils/toByteArray hook)
+            deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
+        (.shutdown deser-hook)))))
+
+;; TODO: should worker even take the storm-id as input? this should be
+;; deducible from cluster state (by searching through assignments)
+;; what about if there's inconsistency in assignments? -> but nimbus
+;; should guarantee this consistency
+(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
+  (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
+               " and conf " conf)
+  (when-not (local-mode? conf)
+    (redirect-stdio-to-slf4j!))
+  ;; because in local mode it's not a separate
+  ;; process; the supervisor will register it in this case
+  (when (= :distributed (cluster-mode conf))
+    (let [pid (process-pid)]
+      (touch (worker-pid-path conf worker-id pid))
+      (spit (worker-artifacts-pid-path conf storm-id port) pid)))
+
+  (declare establish-log-setting-callback)
+
+  ;; start out with an empty map of log-level timeouts
+  (def latest-log-config (atom {}))
+  (def original-log-levels (atom {}))
+
+  (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+        storm-conf (override-login-config-with-system-property storm-conf)
+        acls (Utils/getWorkerACL storm-conf)
+        cluster-state (cluster/mk-distributed-cluster-state conf :auth-conf storm-conf :acls acls :context (ClusterStateContext. DaemonType/WORKER))
+        storm-cluster-state (cluster/mk-storm-cluster-state cluster-state :acls acls)
+        initial-credentials (.credentials storm-cluster-state storm-id nil)
+        auto-creds (AuthUtils/GetAutoCredentials storm-conf)
+        subject (AuthUtils/populateSubject nil auto-creds initial-credentials)]
+      (Subject/doAs subject (reify PrivilegedExceptionAction
+        (run [this]
+          (let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state)
+        heartbeat-fn #(do-heartbeat worker)
+
+        ;; do this here so that the worker process dies if this fails
+        ;; it's important that the worker heartbeats to the supervisor ASAP on launch so the supervisor knows it's running (and can move on)
+        _ (heartbeat-fn)
+
+        executors (atom nil)
+        ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to time out
+        ;; to the supervisor
+        _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
+        _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
+
+        _ (register-callbacks worker)
+
+        refresh-connections (mk-refresh-connections worker)
+        refresh-load (mk-refresh-load worker)
+
+        _ (refresh-connections nil)
+
+        _ (activate-worker-when-all-connections-ready worker)
+
+        _ (refresh-storm-active worker nil)
+
+        _ (run-worker-start-hooks worker)
+
+        _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials)))
+
+        transfer-tuples (mk-transfer-tuples-handler worker)
+        
+        transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)               
+
+        disruptor-handler (mk-disruptor-backpressure-handler worker)
+        _ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler)
+        _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
+              (.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
+              (.setEnableBackpressure ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)))
+        backpressure-handler (mk-backpressure-handler @executors)        
+        backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
+        _ (when ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+            (.start backpressure-thread))
+        callback (fn cb [& ignored]
+                   (let [throttle-on (.topology-backpressure storm-cluster-state storm-id cb)]
+                     (reset! (:throttle-on worker) throttle-on)))
+        _ (when ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+            (.topology-backpressure storm-cluster-state storm-id callback))
+
+        shutdown* (fn []
+                    (log-message "Shutting down worker " storm-id " " assignment-id " " port)
+                    (doseq [[_ socket] @(:cached-node+port->socket worker)]
+                      ;; this will do best effort flushing since the linger period
+                      ;; was set on creation
+                      (.close socket))
+                    (log-message "Terminating messaging context")
+                    (log-message "Shutting down executors")
+                    (doseq [executor @executors] (.shutdown executor))
+                    (log-message "Shut down executors")
+
+                    ;;this is fine because the only time this is shared is when it's a local context,
+                    ;;in which case it's a noop
+                    (.term ^IContext (:mq-context worker))
+                    (log-message "Shutting down transfer thread")
+                    (disruptor/halt-with-interrupt! (:transfer-queue worker))
+
+                    (.interrupt transfer-thread)
+                    (.join transfer-thread)
+                    (log-message "Shut down transfer thread")
+                    (.interrupt backpressure-thread)
+                    (.join backpressure-thread)
+                    (log-message "Shut down backpressure thread")
+                    (cancel-timer (:heartbeat-timer worker))
+                    (cancel-timer (:refresh-connections-timer worker))
+                    (cancel-timer (:refresh-credentials-timer worker))
+                    (cancel-timer (:refresh-active-timer worker))
+                    (cancel-timer (:executor-heartbeat-timer worker))
+                    (cancel-timer (:user-timer worker))
+                    (cancel-timer (:refresh-load-timer worker))
+
+                    (close-resources worker)
+
+                    (log-message "Trigger any worker shutdown hooks")
+                    (run-worker-shutdown-hooks worker)
+
+                    (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port)
+                    (log-message "Disconnecting from storm cluster state context")
+                    (.disconnect (:storm-cluster-state worker))
+                    (.close (:cluster-state worker))
+                    (log-message "Shut down worker " storm-id " " assignment-id " " port))
+        ret (reify
+             Shutdownable
+             (shutdown
+              [this]
+              (shutdown*))
+             DaemonCommon
+             (waiting? [this]
+               (and
+                 (timer-waiting? (:heartbeat-timer worker))
+                 (timer-waiting? (:refresh-connections-timer worker))
+                 (timer-waiting? (:refresh-load-timer worker))
+                 (timer-waiting? (:refresh-credentials-timer worker))
+                 (timer-waiting? (:refresh-active-timer worker))
+                 (timer-waiting? (:executor-heartbeat-timer worker))
+                 (timer-waiting? (:user-timer worker))
+                 ))
+             )
+        credentials (atom initial-credentials)
+        check-credentials-changed (fn []
+                                    (let [new-creds (.credentials (:storm-cluster-state worker) storm-id nil)]
+                                      (when-not (= new-creds @credentials) ;; this does not have to be atomic; worst case we update when it's not needed
+                                        (AuthUtils/updateSubject subject auto-creds new-creds)
+                                        (dofor [e @executors] (.credentials-changed e new-creds))
+                                        (reset! credentials new-creds))))
+       check-throttle-changed (fn []
+                                (let [callback (fn cb [& ignored]
+                                                 (let [throttle-on (.topology-backpressure (:storm-cluster-state worker) storm-id cb)]
+                                                   (reset! (:throttle-on worker) throttle-on)))
+                                      new-throttle-on (.topology-backpressure (:storm-cluster-state worker) storm-id callback)]
+                                    (reset! (:throttle-on worker) new-throttle-on)))
+        check-log-config-changed (fn []
+                                  (let [log-config (.topology-log-config (:storm-cluster-state worker) storm-id nil)]
+                                    (process-log-config-change latest-log-config original-log-levels log-config)
+                                    (establish-log-setting-callback)))]
+    (reset! original-log-levels (get-logger-levels))
+    (log-message "Started with log levels: " @original-log-levels)
+  
+    (defn establish-log-setting-callback []
+      (.topology-log-config (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed))))
+
+    (establish-log-setting-callback)
+    (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
+    (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
+                        (fn [& args]
+                          (check-credentials-changed)
+                          (when ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+                            (check-throttle-changed))))
+    ;; The jitter allows the clients to get the data at different times and avoids a thundering herd
+    (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
+      (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))
+    (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
+    (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
+    (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
+
+    (log-message "Worker has topology config " (redact-value (:storm-conf worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
+    (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
+    ret
+    ))))))
+
+(defmethod mk-suicide-fn
+  :local [conf]
+  (fn [] (exit-process! 1 "Worker died")))
+
+(defmethod mk-suicide-fn
+  :distributed [conf]
+  (fn [] (exit-process! 1 "Worker died")))
+
+(defn -main [storm-id assignment-id port-str worker-id]
+  (let [conf (read-storm-config)]
+    (setup-default-uncaught-exception-handler)
+    (validate-distributed-mode! conf)
+    (let [worker (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)]
+      (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown worker)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj
new file mode 100644
index 0000000..1546b3f
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -0,0 +1,89 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.disruptor
+  (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
+  (:import [com.lmax.disruptor.dsl ProducerType])
+  (:use [org.apache.storm util log]))
+
+(def PRODUCER-TYPE
+  {:multi-threaded ProducerType/MULTI
+   :single-threaded ProducerType/SINGLE})
+
+(defnk disruptor-queue
+  [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+  (DisruptorQueue. queue-name
+                   (PRODUCER-TYPE producer-type) buffer-size
+                   timeout batch-size batch-timeout))
+
+(defn clojure-handler
+  [afn]
+  (reify com.lmax.disruptor.EventHandler
+    (onEvent
+      [this o seq-id batchEnd?]
+      (afn o seq-id batchEnd?))))
+
+(defn disruptor-backpressure-handler
+  [afn-high-wm afn-low-wm]
+  (reify DisruptorBackpressureCallback
+    (highWaterMark
+      [this]
+      (afn-high-wm))
+    (lowWaterMark
+      [this]
+      (afn-low-wm))))
+
+(defn worker-backpressure-handler
+  [afn]
+  (reify WorkerBackpressureCallback
+    (onEvent
+      [this o]
+      (afn o))))
+
+(defmacro handler
+  [& args]
+  `(clojure-handler (fn ~@args)))
+
+(defn publish
+  [^DisruptorQueue q o]
+  (.publish q o))
+
+(defn consume-batch
+  [^DisruptorQueue queue handler]
+  (.consumeBatch queue handler))
+
+(defn consume-batch-when-available
+  [^DisruptorQueue queue handler]
+  (.consumeBatchWhenAvailable queue handler))
+
+(defn halt-with-interrupt!
+  [^DisruptorQueue queue]
+  (.haltWithInterrupt queue))
+
+(defnk consume-loop*
+  [^DisruptorQueue queue handler
+   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
+  (async-loop
+          (fn [] (consume-batch-when-available queue handler) 0)
+          :kill-fn kill-fn
+          :thread-name (.getName queue)))
+
+(defmacro consume-loop [queue & handler-args]
+  `(let [handler# (handler ~@handler-args)]
+     (consume-loop* ~queue handler#)))
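
For orientation, a minimal usage sketch of the wrapper above (the queue name,
sizes, and process-object fn are hypothetical):

    (let [q (disruptor-queue "example-queue" 1024 1000)]
      (publish q {:some "object"})
      (consume-batch-when-available q (handler [o seq-id batch-end?]
                                        (process-object o)))
      (halt-with-interrupt! q))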

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/event.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/event.clj b/storm-core/src/clj/org/apache/storm/event.clj
new file mode 100644
index 0000000..edc7616
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/event.clj
@@ -0,0 +1,71 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.event
+  (:use [org.apache.storm log util])
+  (:import [org.apache.storm.utils Time Utils])
+  (:import [java.io InterruptedIOException])
+  (:import [java.util.concurrent LinkedBlockingQueue TimeUnit]))
+
+(defprotocol EventManager
+  (add [this event-fn])
+  (waiting? [this])
+  (shutdown [this]))
+
+(defn event-manager
+  "Creates a thread to respond to events. Any error will cause process to halt"
+  [daemon?]
+  (let [added (atom 0)
+        processed (atom 0)
+        ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
+        running (atom true)
+        runner (Thread.
+                 (fn []
+                   (try-cause
+                     (while @running
+                       (let [r (.take queue)]
+                         (r)
+                         (swap! processed inc)))
+                     (catch InterruptedIOException t
+                       (log-message "Event manager interrupted while doing IO"))
+                     (catch InterruptedException t
+                       (log-message "Event manager interrupted"))
+                     (catch Throwable t
+                       (log-error t "Error when processing event")
+                       (exit-process! 20 "Error when processing an event")))))]
+    (.setDaemon runner daemon?)
+    (.start runner)
+    (reify
+      EventManager
+
+      (add
+        [this event-fn]
+        ;; should keep track of total added and processed to know if this is finished yet
+        (when-not @running
+          (throw (RuntimeException. "Cannot add events to a shutdown event manager")))
+        (swap! added inc)
+        (.put queue event-fn))
+
+      (waiting?
+        [this]
+        (or (Time/isThreadWaiting runner)
+            (= @processed @added)))
+
+      (shutdown
+        [this]
+        (reset! running false)
+        (.interrupt runner)
+        (.join runner)))))
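
A short usage sketch of the EventManager protocol above (the event fn is
arbitrary):

    (let [em (event-manager true)]                ; daemon? = true
      (add em (fn [] (log-message "event ran")))  ; runs on the manager's thread
      (shutdown em))                              ; interrupts and joins the runner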

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/local_state.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/local_state.clj b/storm-core/src/clj/org/apache/storm/local_state.clj
new file mode 100644
index 0000000..a95a85b
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/local_state.clj
@@ -0,0 +1,131 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.local-state
+  (:use [org.apache.storm log util])
+  (:import [org.apache.storm.generated StormTopology
+            InvalidTopologyException GlobalStreamId
+            LSSupervisorId LSApprovedWorkers
+            LSSupervisorAssignments LocalAssignment
+            ExecutorInfo LSWorkerHeartbeat
+            LSTopoHistory LSTopoHistoryList
+            WorkerResources])
+  (:import [org.apache.storm.utils LocalState]))
+
+(def LS-WORKER-HEARTBEAT "worker-heartbeat")
+(def LS-ID "supervisor-id")
+(def LS-LOCAL-ASSIGNMENTS "local-assignments")
+(def LS-APPROVED-WORKERS "approved-workers")
+(def LS-TOPO-HISTORY "topo-hist")
+
+(defn ->LSTopoHistory
+  [{topoid :topoid timestamp :timestamp users :users groups :groups}]
+  (LSTopoHistory. topoid timestamp users groups))
+
+(defn ->topo-history
+  [thrift-topo-hist]
+  {:topoid (.get_topology_id thrift-topo-hist)
+   :timestamp (.get_time_stamp thrift-topo-hist)
+   :users (.get_users thrift-topo-hist)
+   :groups (.get_groups thrift-topo-hist)})
+
+(defn ls-topo-hist!
+  [^LocalState local-state hist-list]
+  (.put local-state LS-TOPO-HISTORY
+    (LSTopoHistoryList. (map ->LSTopoHistory hist-list))))
+
+(defn ls-topo-hist
+  [^LocalState local-state]
+  (when-let [thrift-hist-list (.get local-state LS-TOPO-HISTORY)]
+    (map ->topo-history (.get_topo_history thrift-hist-list))))
+
+(defn ls-supervisor-id!
+  [^LocalState local-state ^String id]
+  (.put local-state LS-ID (LSSupervisorId. id)))
+
+(defn ls-supervisor-id
+  [^LocalState local-state]
+  (when-let [super-id (.get local-state LS-ID)]
+    (.get_supervisor_id super-id)))
+
+(defn ls-approved-workers!
+  [^LocalState local-state workers]
+  (.put local-state LS-APPROVED-WORKERS (LSApprovedWorkers. workers)))
+
+(defn ls-approved-workers
+  [^LocalState local-state]
+  (when-let [tmp (.get local-state LS-APPROVED-WORKERS)]
+    (into {} (.get_approved_workers tmp))))
+
+(defn ->ExecutorInfo
+  [[low high]] (ExecutorInfo. low high))
+
+(defn ->ExecutorInfo-list
+  [executors]
+  (map ->ExecutorInfo executors))
+
+(defn ->executor-list
+  [executors]
+  (into [] 
+    (for [exec-info executors] 
+      [(.get_task_start exec-info) (.get_task_end exec-info)])))
+
+(defn ->LocalAssignment
+  [{storm-id :storm-id executors :executors resources :resources}]
+  (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))]
+    (when resources
+      (.set_resources assignment
+                      (doto (WorkerResources.)
+                        (.set_mem_on_heap (first resources))
+                        (.set_mem_off_heap (second resources))
+                        (.set_cpu (last resources)))))
+    assignment))
+
+(defn mk-local-assignment
+  [storm-id executors resources]
+  {:storm-id storm-id :executors executors :resources resources})
+
+(defn ->local-assignment
+  [^LocalAssignment thrift-local-assignment]
+  (mk-local-assignment
+    (.get_topology_id thrift-local-assignment)
+    (->executor-list (.get_executors thrift-local-assignment))
+    (.get_resources thrift-local-assignment)))
+
+(defn ls-local-assignments!
+  [^LocalState local-state assignments]
+  (let [local-assignment-map (map-val ->LocalAssignment assignments)]
+    (.put local-state LS-LOCAL-ASSIGNMENTS
+          (LSSupervisorAssignments. local-assignment-map))))
+
+(defn ls-local-assignments
+  [^LocalState local-state]
+  (when-let [thrift-local-assignments (.get local-state LS-LOCAL-ASSIGNMENTS)]
+    (map-val
+      ->local-assignment
+      (.get_assignments thrift-local-assignments))))
+
+(defn ls-worker-heartbeat!
+  [^LocalState local-state time-secs storm-id executors port]
+  (.put local-state LS-WORKER-HEARTBEAT (LSWorkerHeartbeat. time-secs storm-id (->ExecutorInfo-list executors) port) false))
+
+(defn ls-worker-heartbeat
+  [^LocalState local-state]
+  (when-let [worker-hb (.get local-state LS-WORKER-HEARTBEAT)]
+    {:time-secs (.get_time_secs worker-hb)
+     :storm-id (.get_topology_id worker-hb)
+     :executors (->executor-list (.get_executors worker-hb))
+     :port (.get_port worker-hb)}))
+
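
A round-trip sketch for the assignment converters above (topology id and
numbers are made up):

    (-> (mk-local-assignment "topo-1" [[1 1] [2 3]] [512.0 0.0 50.0])
        ->LocalAssignment
        ->local-assignment)
    ;; => {:storm-id "topo-1", :executors [[1 1] [2 3]], :resources <WorkerResources>}
    ;; note that :resources comes back as the thrift WorkerResources object,
    ;; not as the input [mem-on-heap mem-off-heap cpu] vector.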

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/log.clj b/storm-core/src/clj/org/apache/storm/log.clj
new file mode 100644
index 0000000..96570e3
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/log.clj
@@ -0,0 +1,56 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.log
+  (:require [clojure.tools.logging :as log])
+  (:use [clojure pprint])
+  (:import [java.io StringWriter]))
+
+(defmacro log-message
+  [& args]
+  `(log/info (str ~@args)))
+
+(defmacro log-error
+  [e & args]
+  `(log/log :error ~e (str ~@args)))
+
+(defmacro log-debug
+  [& args]
+  `(log/debug (str ~@args)))
+
+(defmacro log-warn-error
+  [e & args]
+  `(log/warn ~e (str ~@args)))
+
+(defmacro log-warn
+  [& args]
+  `(log/warn (str ~@args)))
+
+(defn log-capture!
+  [& args]
+  (apply log/log-capture! args))
+
+(defn log-stream
+  [& args]
+  (apply log/log-stream args))
+
+(defmacro log-pprint
+  [& args]
+  `(let [^StringWriter writer# (StringWriter.)]
+     (doall
+       (for [object# [~@args]]
+         (pprint object# writer#)))
+     (log-message "\n" writer#)))
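
These macros stringify their arguments before delegating to
clojure.tools.logging, so callers can pass mixed values without format
strings, e.g. (values hypothetical):

    (log-message "worker " 42 " started")  ; expands to (log/info (str "worker " 42 " started"))
    (log-pprint {:a 1} [:b 2])             ; pretty-prints each form, then logs the result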

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/messaging/loader.clj b/storm-core/src/clj/org/apache/storm/messaging/loader.clj
new file mode 100644
index 0000000..b190ab0
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/messaging/loader.clj
@@ -0,0 +1,34 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.messaging.loader
+  (:import [org.apache.storm.messaging IConnection DeserializingConnectionCallback])
+  (:require [org.apache.storm.messaging [local :as local]]))
+
+(defn mk-local-context []
+  (local/mk-context))
+
+(defn- mk-connection-callback
+  "make an IConnectionCallback"
+  [transfer-local-fn storm-conf worker-context]
+  (DeserializingConnectionCallback. storm-conf
+                                    worker-context
+                                    (fn [batch]
+                                      (transfer-local-fn batch))))
+
+(defn register-callback
+  "register the local-transfer-fn with the server"
+  [transfer-local-fn ^IConnection socket storm-conf worker-context]
+  (.registerRecv socket (mk-connection-callback transfer-local-fn storm-conf worker-context)))
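
register-callback is how a worker wires its local transfer fn into the
receiving connection; worker.clj's register-callbacks calls it as:

    (register-callback (:transfer-local-fn worker)
                       (:receiver worker)
                       (:storm-conf worker)
                       (worker-context worker))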

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/messaging/local.clj b/storm-core/src/clj/org/apache/storm/messaging/local.clj
new file mode 100644
index 0000000..32fbb34
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/messaging/local.clj
@@ -0,0 +1,23 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.messaging.local
+  (:import [org.apache.storm.messaging IContext])
+  (:import [org.apache.storm.messaging.local Context]))
+
+(defn mk-context [] 
+  (let [context  (Context.)]
+    (.prepare ^IContext context nil)
+    context))
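
mk-context prepares a local Context with a nil configuration, which is all the
in-process transport needs. A loopback sketch with an illustrative storm id and
port (bind opens the server end, connect the client end of the same channel):

  (let [context (mk-context)
        server  (.bind ^IContext context "storm-id" 6701)
        client  (.connect ^IContext context "storm-id" "localhost" 6701)]
    ;; ... exchange messages in-process ...
    (.close client)
    (.close server)
    (.term ^IContext context))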

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/metric/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/metric/testing.clj b/storm-core/src/clj/org/apache/storm/metric/testing.clj
new file mode 100644
index 0000000..a8ec438
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/metric/testing.clj
@@ -0,0 +1,68 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.metric.testing
+  "This namespace is for AOT dependent metrics testing code."
+  (:gen-class))
+
+(letfn [(for- [threader arg seq-exprs body]
+          `(reduce #(%2 %1)
+                   ~arg
+                   (for ~seq-exprs
+                     (fn [arg#] (~threader arg# ~@body)))))]
+  (defmacro for->
+    "Apply a thread expression to a sequence.
+   e.g.
+      (-> 1
+        (for-> [x [1 2 3]]
+          (+ x)))
+   => 7"
+    {:indent 1}
+    [arg seq-exprs & body]
+    (for- 'clojure.core/-> arg seq-exprs body)))
+
+(gen-class
+ :name clojure.storm.metric.testing.FakeMetricConsumer
+ :implements [org.apache.storm.metric.api.IMetricsConsumer]
+ :prefix "impl-")
+
+(def buffer (atom nil))
+
+(defn impl-prepare [this conf argument ctx error-reporter]
+  (reset! buffer {}))
+
+(defn impl-cleanup [this]
+  (reset! buffer {}))
+
+(defn vec-conj [coll x]
+  (if coll
+    (conj coll x)
+    [x]))
+
+(defn expand-complex-datapoint [dp]
+  (if (or (map? (.value dp))
+          (instance? java.util.AbstractMap (.value dp)))
+    (into [] (for [[k v] (.value dp)]
+               [(str (.name dp) "/" k) v]))
+    [[(.name dp) (.value dp)]]))
+
+(defn impl-handleDataPoints [this task-info data-points]  
+  (swap! buffer
+         (fn [old]
+           (-> old
+            (for-> [dp data-points
+                    [name val] (expand-complex-datapoint dp)]
+                   (update-in [(.srcComponentId task-info) name (.srcTaskId task-info)] vec-conj val))))))
+ 
+
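
impl-handleDataPoints folds each data point into the buffer atom along the path
[component metric-name task-id], after flattening map-valued data points into
metric-name/key entries. A sketch of the shape a test reads back (component,
metric, and task ids are illustrative):

  @buffer
  ;=> {"spout" {"ack-count/default" {1 [5 7]}    ; map-valued point, flattened
  ;             "emit-count"        {1 [12]}}}   ; scalar point

  (get-in @buffer ["spout" "emit-count" 1])
  ;=> [12]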

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj b/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj
index e9d5db5..70313e4 100644
--- a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj
+++ b/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj
@@ -18,11 +18,11 @@
   (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
            [java.util.concurrent ConcurrentHashMap]
            [java.util.concurrent.atomic AtomicInteger]
-           [backtype.storm.generated HBNodes
+           [org.apache.storm.generated HBNodes
                                      HBServerMessageType HBMessage HBMessageData HBPulse]
-           [backtype.storm.utils VersionInfo])
+           [org.apache.storm.utils VersionInfo])
   (:use [clojure.string :only [replace-first split]]
-        [backtype.storm log config util])
+        [org.apache.storm log config util])
   (:require [clojure.java.jmx :as jmx])
   (:gen-class))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj b/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
index d99442b..cede59e 100644
--- a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
+++ b/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
@@ -16,20 +16,20 @@
 
 (ns org.apache.storm.pacemaker.pacemaker-state-factory
   (:require [org.apache.storm.pacemaker pacemaker]
-            [backtype.storm.cluster-state [zookeeper-state-factory :as zk-factory]]
-            [backtype.storm
+            [org.apache.storm.cluster-state [zookeeper-state-factory :as zk-factory]]
+            [org.apache.storm
              [config :refer :all]
              [cluster :refer :all]
              [log :refer :all]
              [util :as util]])
-  (:import [backtype.storm.generated
+  (:import [org.apache.storm.generated
             HBExecutionException HBServerMessageType HBMessage
             HBMessageData HBPulse]
-           [backtype.storm.cluster_state zookeeper_state_factory]
-           [backtype.storm.cluster ClusterState]
+           [org.apache.storm.cluster_state zookeeper_state_factory]
+           [org.apache.storm.cluster ClusterState]
            [org.apache.storm.pacemaker PacemakerClient])
   (:gen-class
-   :implements [backtype.storm.cluster.ClusterStateFactory]))
+   :implements [org.apache.storm.cluster.ClusterStateFactory]))
 
 ;; So we can mock the client for testing
 (defn makeClient [conf]

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/process_simulator.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/process_simulator.clj b/storm-core/src/clj/org/apache/storm/process_simulator.clj
new file mode 100644
index 0000000..03c3dd9
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/process_simulator.clj
@@ -0,0 +1,51 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.process-simulator
+  (:use [org.apache.storm log util]))
+
+(def pid-counter (mk-counter))
+
+(def process-map (atom {}))
+
+(def kill-lock (Object.))
+
+(defn register-process [pid shutdownable]
+  (swap! process-map assoc pid shutdownable))
+
+(defn process-handle
+  [pid]
+  (@process-map pid))
+
+(defn all-processes
+  []
+  (vals @process-map))
+
+(defn kill-process
+  "Uses `locking` in case cluster shuts down while supervisor is
+  killing a task"
+  [pid]
+  (locking kill-lock
+    (log-message "Killing process " pid)
+    (let [shutdownable (process-handle pid)]
+      (swap! process-map dissoc pid)
+      (when shutdownable
+        (.shutdown shutdownable)))))
+
+(defn kill-all-processes
+  []
+  (doseq [pid (keys @process-map)]
+    (kill-process pid)))
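
A sketch of the register/kill lifecycle; the reified shutdown body is illustrative:

  (import '[org.apache.storm.daemon Shutdownable])

  (let [pid  (pid-counter)
        proc (reify Shutdownable
               (shutdown [_]
                 (log-message "simulated process " pid " shut down")))]
    (register-process pid proc)
    ;; later, possibly from another thread; removes the pid from
    ;; process-map and calls .shutdown under kill-lock
    (kill-process pid))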

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/scheduler/DefaultScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/scheduler/DefaultScheduler.clj b/storm-core/src/clj/org/apache/storm/scheduler/DefaultScheduler.clj
new file mode 100644
index 0000000..f6f89f8
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/scheduler/DefaultScheduler.clj
@@ -0,0 +1,77 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.scheduler.DefaultScheduler
+  (:use [org.apache.storm util config])
+  (:require [org.apache.storm.scheduler.EvenScheduler :as EvenScheduler])
+  (:import [org.apache.storm.scheduler IScheduler Topologies
+            Cluster TopologyDetails WorkerSlot SchedulerAssignment
+            EvenScheduler ExecutorDetails])
+  (:gen-class
+    :implements [org.apache.storm.scheduler.IScheduler]))
+
+(defn- bad-slots [existing-slots num-executors num-workers]
+  (if (= 0 num-workers)
+    '()
+    (let [distribution (atom (integer-divided num-executors num-workers))
+          keepers (atom {})]
+      (doseq [[node+port executor-list] existing-slots :let [executor-count (count executor-list)]]
+        (when (pos? (get @distribution executor-count 0))
+          (swap! keepers assoc node+port executor-list)
+          (swap! distribution update-in [executor-count] dec)
+          ))
+      (->> @keepers
+           keys
+           (apply dissoc existing-slots)
+           keys
+           (map (fn [[node port]]
+                  (WorkerSlot. node port)))))))
+
+(defn slots-can-reassign [^Cluster cluster slots]
+  (->> slots
+       (filter
+         (fn [[node port]]
+           (when-not (.isBlackListed cluster node)
+             (when-let [supervisor (.getSupervisorById cluster node)]
+               (.contains (.getAllPorts supervisor)
+                          (int port))))))))
+
+(defn -prepare [this conf]
+  )
+
+(defn default-schedule [^Topologies topologies ^Cluster cluster]
+  (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
+    (doseq [^TopologyDetails topology needs-scheduling-topologies
+            :let [topology-id (.getId topology)
+                  available-slots (->> (.getAvailableSlots cluster)
+                                       (map #(vector (.getNodeId %) (.getPort %))))
+                  all-executors (->> topology
+                                     .getExecutors
+                                     (map #(vector (.getStartTask %) (.getEndTask %)))
+                                     set)
+                  alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
+                  alive-executors (->> alive-assigned vals (apply concat) set)
+                  can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
+                  total-slots-to-use (min (.getNumWorkers topology)
+                                          (+ (count can-reassign-slots) (count available-slots)))
+                  bad-slots (if (or (> total-slots-to-use (count alive-assigned)) 
+                                    (not= alive-executors all-executors))
+                                (bad-slots alive-assigned (count all-executors) total-slots-to-use)
+                                [])]]
+      (.freeSlots cluster bad-slots)
+      (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
+
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+  (default-schedule topologies cluster))
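
bad-slots keeps an existing slot only while the even distribution still calls for
a worker with that slot's executor count, and frees everything else. The
distribution comes from integer-divided, which maps executors-per-worker to the
number of workers that should carry that many; a worked sketch with illustrative
numbers:

  ;; 7 executors spread over 3 workers: two workers carry 2, one carries 3
  (integer-divided 7 3)
  ;=> {2 2, 3 1}

  ;; a slot currently holding 2 executors is a keeper while (get dist 2) is
  ;; positive; once that quota is spent, further 2-executor slots are bad slots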

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/org/apache/storm/scheduler/EvenScheduler.clj
new file mode 100644
index 0000000..783da26
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/scheduler/EvenScheduler.clj
@@ -0,0 +1,81 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.scheduler.EvenScheduler
+  (:use [org.apache.storm util log config])
+  (:require [clojure.set :as set])
+  (:import [org.apache.storm.scheduler IScheduler Topologies
+            Cluster TopologyDetails WorkerSlot ExecutorDetails])
+  (:gen-class
+    :implements [org.apache.storm.scheduler.IScheduler]))
+
+(defn sort-slots [all-slots]
+  (let [split-up (sort-by count > (vals (group-by first all-slots)))]
+    (apply interleave-all split-up)
+    ))
+
+(defn get-alive-assigned-node+port->executors [cluster topology-id]
+  (let [existing-assignment (.getAssignmentById cluster topology-id)
+        executor->slot (if existing-assignment
+                         (.getExecutorToSlot existing-assignment)
+                         {}) 
+        executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot
+                                           :let [executor [(.getStartTask executor) (.getEndTask executor)]
+                                                 node+port [(.getNodeId slot) (.getPort slot)]]]
+                                       {executor node+port}))
+        alive-assigned (reverse-map executor->node+port)]
+    alive-assigned))
+
+(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
+  (let [topology-id (.getId topology)
+        available-slots (->> (.getAvailableSlots cluster)
+                             (map #(vector (.getNodeId %) (.getPort %))))
+        all-executors (->> topology
+                          .getExecutors
+                          (map #(vector (.getStartTask %) (.getEndTask %)))
+                          set)
+        alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
+        total-slots-to-use (min (.getNumWorkers topology)
+                                (+ (count available-slots) (count alive-assigned)))
+        reassign-slots (take (- total-slots-to-use (count alive-assigned))
+                             (sort-slots available-slots))
+        reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
+        reassignment (into {}
+                           (map vector
+                                reassign-executors
+                                ;; bound the repetitions: one-arg repeat-seq never yields an element
+                                ;; when reassign-slots is empty (an endless concat of empty seqs),
+                                ;; so realizing the map would hang
+                                (repeat-seq (count reassign-executors) reassign-slots)))]
+    (when-not (empty? reassignment)
+      (log-message "Available slots: " (pr-str available-slots))
+      )
+    reassignment))
+
+(defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
+  (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
+    (doseq [^TopologyDetails topology needs-scheduling-topologies
+            :let [topology-id (.getId topology)
+                  new-assignment (schedule-topology topology cluster)
+                  node+port->executors (reverse-map new-assignment)]]
+      (doseq [[node+port executors] node+port->executors
+              :let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
+                    executors (for [[start-task end-task] executors]
+                                (ExecutorDetails. start-task end-task))]]
+        (.assign cluster slot topology-id executors)))))
+
+(defn -prepare [this conf]
+  )
+
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+  (schedule-topologies-evenly topologies cluster))
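
sort-slots groups the free slots by node and interleaves the groups largest-first,
so consecutive picks land on different nodes; a worked sketch with illustrative
node ids and ports:

  (sort-slots [["a" 6700] ["a" 6701] ["a" 6702]
               ["b" 6700] ["b" 6701]
               ["c" 6700]])
  ;=> (["a" 6700] ["b" 6700] ["c" 6700] ["a" 6701] ["b" 6701] ["a" 6702])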

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/scheduler/IsolationScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/scheduler/IsolationScheduler.clj b/storm-core/src/clj/org/apache/storm/scheduler/IsolationScheduler.clj
new file mode 100644
index 0000000..2e86748
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/scheduler/IsolationScheduler.clj
@@ -0,0 +1,219 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.scheduler.IsolationScheduler
+  (:use [org.apache.storm util config log])
+  (:require [org.apache.storm.scheduler.DefaultScheduler :as DefaultScheduler])
+  (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
+  (:import [org.apache.storm.scheduler IScheduler Topologies
+            Cluster TopologyDetails WorkerSlot SchedulerAssignment
+            EvenScheduler ExecutorDetails])
+  (:gen-class
+    :init init
+    :constructors {[] []}
+    :state state 
+    :implements [org.apache.storm.scheduler.IScheduler]))
+
+(defn -init []
+  [[] (container)])
+
+(defn -prepare [this conf]
+  (container-set! (.state this) conf))
+
+(defn- compute-worker-specs "Returns mutable set of sets of executors"
+  [^TopologyDetails details]
+  (->> (.getExecutorToComponent details)
+       reverse-map
+       (map second)
+       (apply concat)
+       (map vector (repeat-seq (range (.getNumWorkers details))))
+       (group-by first)
+       (map-val #(map second %))
+       vals
+       (map set)
+       (HashSet.)
+       ))
+
+(defn isolated-topologies [conf topologies]
+  (let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
+    (filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)
+    ))
+
+;; map from topology id -> set of sets of executors
+(defn topology-worker-specs [iso-topologies]
+  (->> iso-topologies
+       (map (fn [t] {(.getId t) (compute-worker-specs t)}))
+       (apply merge)))
+
+(defn machine-distribution [conf ^TopologyDetails topology]
+  (let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
+        machines (get name->machines (.getName topology))
+        workers (.getNumWorkers topology)]
+    (-> (integer-divided workers machines)
+        (dissoc 0)
+        (HashMap.)
+        )))
+
+(defn topology-machine-distribution [conf iso-topologies]
+  (->> iso-topologies
+       (map (fn [t] {(.getId t) (machine-distribution conf t)}))
+       (apply merge)))
+
+(defn host-assignments [^Cluster cluster]
+  (letfn [(to-slot-specs [^SchedulerAssignment ass]
+            (->> ass
+                 .getExecutorToSlot
+                 reverse-map
+                 (map (fn [[slot executors]]
+                        [slot (.getTopologyId ass) (set executors)]))))]
+  (->> cluster
+       .getAssignments
+       vals
+       (mapcat to-slot-specs)
+       (group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot))))
+       )))
+
+(defn- decrement-distribution! [^Map distribution value]
+  (let [v (-> distribution (get value) dec)]
+    (if (zero? v)
+      (.remove distribution value)
+      (.put distribution value v))))
+
+;; returns list of list of slots, reverse sorted by number of slots
+(defn- host-assignable-slots [^Cluster cluster]
+  (-<> cluster
+       .getAssignableSlots
+       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
+       (dissoc <> nil)
+       (sort-by #(-> % second count -) <>)
+       shuffle
+       (LinkedList. <>)
+       ))
+
+(defn- host->used-slots [^Cluster cluster]
+  (->> cluster
+       .getUsedSlots
+       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)))
+       ))
+
+(defn- distribution->sorted-amts [distribution]
+  (->> distribution
+       (mapcat (fn [[val amt]] (repeat amt val)))
+       (sort-by -)
+       ))
+
+(defn- allocated-topologies [topology-worker-specs]
+  (->> topology-worker-specs
+    (filter (fn [[_ worker-specs]] (empty? worker-specs)))
+    (map first)
+    set
+    ))
+
+(defn- leftover-topologies [^Topologies topologies filter-ids-set]
+  (->> topologies
+       .getTopologies
+       (filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
+       (map (fn [^TopologyDetails t] {(.getId t) t}))
+       (apply merge)
+       (Topologies.)
+       ))
+
+;; for each isolated topology:
+;;   compute an even distribution of executors -> workers over the number of workers specified for the topology
+;;   compute the distribution of those workers across machines
+;; determine host -> list of [slot, topology id, executors]
+;; iterate through the hosts; a host is "good" if:
+;;   1. it only runs workers from one isolated topology
+;;   2. every worker running on it matches one of that topology's executor distributions
+;;   3. its worker count matches one of the topology's per-machine worker counts
+;; blacklist the good hosts and remove their workers from the set of workers still needing assignment;
+;; otherwise free any workers on the host that belong to isolated topologies
+
+(defn remove-elem-from-set! [^Set aset]
+  (let [elem (-> aset .iterator .next)]
+    (.remove aset elem)
+    elem
+    ))
+
+;; get host -> all assignable worker slots on non-blacklisted machines (assigned or not)
+;; this leaves the machines that still need assignment (machine -> [topology, list of lists of executors])
+;; match each spec to a machine with the right number of workers, free everything else on that machine, and assign those slots (one topology at a time)
+;; blacklist every machine that had production slots defined
+;; log the isolated topologies that could not get enough slots / machines
+;; run the default scheduler over the remaining machines, on the isolated topologies that lacked slots plus the non-isolated topologies
+;; finally, restore the blacklist to its initial value
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+  (let [conf (container-get (.state this))        
+        orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
+        iso-topologies (isolated-topologies conf (.getTopologies topologies))
+        iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
+        topology-worker-specs (topology-worker-specs iso-topologies)
+        topology-machine-distribution (topology-machine-distribution conf iso-topologies)
+        host-assignments (host-assignments cluster)]
+    (doseq [[host assignments] host-assignments]
+      (let [top-id (-> assignments first second)
+            distribution (get topology-machine-distribution top-id)
+            ^Set worker-specs (get topology-worker-specs top-id)
+            num-workers (count assignments)
+            ]
+        (if (and (contains? iso-ids-set top-id)
+                 (every? #(= (second %) top-id) assignments)
+                 (contains? distribution num-workers)
+                 (every? #(contains? worker-specs (nth % 2)) assignments))
+          (do (decrement-distribution! distribution num-workers)
+              (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
+              (.blacklistHost cluster host))
+          (doseq [[slot top-id _] assignments]
+            (when (contains? iso-ids-set top-id)
+              (.freeSlot cluster slot)
+              ))
+          )))
+    
+    (let [host->used-slots (host->used-slots cluster)
+          ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
+      ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
+      (doseq [[top-id worker-specs] topology-worker-specs
+              :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
+        (doseq [amt amts
+                :let [[host host-slots] (.peek sorted-assignable-hosts)]]
+          (when (and host-slots (>= (count host-slots) amt))
+            (.poll sorted-assignable-hosts)
+            (.freeSlots cluster (get host->used-slots host))
+            (doseq [slot (take amt host-slots)
+                    :let [executors-set (remove-elem-from-set! worker-specs)]]
+              (.assign cluster slot top-id executors-set))
+            (.blacklistHost cluster host))
+          )))
+    
+    (let [failed-iso-topologies (->> topology-worker-specs
+                                     (mapcat (fn [[top-id worker-specs]]
+                                               (when (seq worker-specs)
+                                                 [top-id]))))]
+      (if (empty? failed-iso-topologies)
+        ;; run default scheduler on non-isolated topologies
+        (-<> topology-worker-specs
+             allocated-topologies
+             (leftover-topologies topologies <>)
+             (DefaultScheduler/default-schedule <> cluster))
+        (do
+          (log-warn "Unable to isolate topologies " (pr-str failed-iso-topologies) ". No machine had enough worker slots to run the remaining workers for these topologies. Clearing all other resources and will wait for enough resources for isolated topologies before allocating any other resources.")
+          ;; clear workers off all hosts that are not blacklisted
+          (doseq [[host slots] (host->used-slots cluster)]
+            (when-not (.isBlacklistedHost cluster host)
+              (.freeSlots cluster slots))))
+        ))
+    (.setBlacklistedHosts cluster orig-blacklist)
+    ))
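
The whole pass is driven by the ISOLATION-SCHEDULER-MACHINES entry of the nimbus
configuration, a map from topology name to the number of machines that topology
should own outright; a sketch with illustrative names and counts:

  ;; the conf map this scheduler reads (isolation.scheduler.machines in storm.yaml)
  {"isolation.scheduler.machines" {"query-topology"  3
                                   "ingest-topology" 2}}

machine-distribution then splits each topology's worker count evenly across its
machine allotment via integer-divided, and hosts matching one of those
distributions are blacklisted as "good".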