You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/08 08:17:39 UTC

[3/5] storm git commit: STORM-1277 port backtype.storm.daemon.executor to java

STORM-1277 port backtype.storm.daemon.executor to java

* code rebased by Jungtaek Lim <ka...@gmail.com>
* Closes #1445


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a5e19d9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a5e19d9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a5e19d9b

Branch: refs/heads/master
Commit: a5e19d9b8064f83adf00190ed74518e2156faae2
Parents: 44068c4
Author: \u536b\u4e50 <we...@taobao.com>
Authored: Fri Apr 29 17:58:50 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 8 17:13:48 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  18 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 839 -------------------
 .../org/apache/storm/daemon/local_executor.clj  |  42 +
 .../src/clj/org/apache/storm/daemon/worker.clj  |  70 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  16 +-
 .../src/jvm/org/apache/storm/Constants.java     |  24 +-
 .../org/apache/storm/daemon/StormCommon.java    | 175 ++--
 .../src/jvm/org/apache/storm/daemon/Task.java   |  76 +-
 .../jvm/org/apache/storm/executor/Executor.java | 576 +++++++++++++
 .../apache/storm/executor/ExecutorShutdown.java | 111 +++
 .../apache/storm/executor/ExecutorTransfer.java |  87 ++
 .../apache/storm/executor/IRunningExecutor.java |  31 +
 .../org/apache/storm/executor/TupleInfo.java    |  90 ++
 .../storm/executor/bolt/BoltExecutor.java       | 138 +++
 .../executor/bolt/BoltOutputCollectorImpl.java  | 171 ++++
 .../storm/executor/error/IReportError.java      |  22 +
 .../storm/executor/error/ReportError.java       |  76 ++
 .../storm/executor/error/ReportErrorAndDie.java |  47 ++
 .../storm/executor/spout/SpoutExecutor.java     | 255 ++++++
 .../spout/SpoutOutputCollectorImpl.java         | 147 ++++
 .../apache/storm/stats/BoltExecutorStats.java   |   1 +
 .../jvm/org/apache/storm/stats/CommonStats.java |   5 +-
 .../apache/storm/stats/SpoutExecutorStats.java  |   1 +
 .../org/apache/storm/task/TopologyContext.java  |  32 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  29 +
 .../storm/utils/WorkerBackpressureThread.java   |  11 +-
 .../org/apache/storm/integration_test.clj       |  11 +-
 .../test/clj/org/apache/storm/grouping_test.clj |   2 +-
 28 files changed, 2077 insertions(+), 1026 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index cc5436c..01a49b3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -52,20 +52,4 @@
       (catch Throwable t#
         (log-error t# "Error on initialization of server " ~(str name))
         (Utils/exitProcess 13 "Error on initialization")
-        )))))
-
-(defn worker-context [worker]
-  (WorkerTopologyContext. (:system-topology worker)
-                          (:storm-conf worker)
-                          (:task->component worker)
-                          (:component->sorted-tasks worker)
-                          (:component->stream->fields worker)
-                          (:storm-id worker)
-                          (ConfigUtils/supervisorStormResourcesPath
-                            (ConfigUtils/supervisorStormDistRoot (:conf worker) (:storm-id worker)))
-                          (ConfigUtils/workerPidsRoot (:conf worker) (:worker-id worker))
-                          (:port worker)
-                          (:task-ids worker)
-                          (:default-shared-resources worker)
-                          (:user-shared-resources worker)
-                          ))
+        )))))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
deleted file mode 100644
index 1fdfbf5..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ /dev/null
@@ -1,839 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.executor
-  (:use [org.apache.storm.daemon common])
-  (:use [clojure.walk])
-  (:import [org.apache.storm.generated Grouping Grouping$_Fields]
-           [java.io Serializable]
-           [org.apache.storm.stats BoltExecutorStats SpoutExecutorStats]
-           [org.apache.storm.daemon.metrics BuiltinMetricsUtil SpoutThrottlingMetrics])
-  (:use [org.apache.storm util config log])
-  (:import [java.util List Random HashMap ArrayList LinkedList Map])
-  (:import [org.apache.storm ICredentialsListener Thrift])
-  (:import [org.apache.storm.hooks ITaskHook])
-  (:import [org.apache.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId])
-  (:import [org.apache.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
-  (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
-            EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
-  (:import [org.apache.storm.grouping CustomStreamGrouping])
-  (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
-  (:import [org.apache.storm.generated GlobalStreamId])
-  (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
-  (:import [com.lmax.disruptor InsufficientCapacityException])
-  (:import [org.apache.storm.serialization KryoTupleSerializer])
-  (:import [org.apache.storm.daemon Shutdownable StormCommon Acker Task GrouperFactory])
-  (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
-  (:import [org.apache.storm Config Constants])
-  (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
-  (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-  (:import [java.lang Thread Thread$UncaughtExceptionHandler]
-           [java.util.concurrent ConcurrentLinkedQueue]
-           [org.json.simple JSONValue]
-           [com.lmax.disruptor.dsl ProducerType]
-           [org.apache.storm StormTimer])
-  (:require [clojure.set :as set]))
-
-
-;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
-(defn- outbound-groupings
-  [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping topo-conf]
-  (->> component->grouping
-       (filter-key #(-> worker-context
-                        (.getComponentTasks %)
-                        count
-                        pos?))
-       (map (fn [[component tgrouping]]
-               [component
-                (GrouperFactory/mkGrouper worker-context
-                                this-component-id
-                                stream-id
-                                out-fields
-                                tgrouping
-                                (.getComponentTasks worker-context component)
-                                topo-conf)]))
-       (into {})
-       (HashMap.)))
-
-(defn outbound-components
-  "Returns map of stream id to component id to grouper"
-  [^WorkerTopologyContext worker-context component-id topo-conf]
-  (->> (.getTargets worker-context component-id)
-        clojurify-structure
-        (map (fn [[stream-id component->grouping]]
-               [stream-id
-                (outbound-groupings
-                  worker-context
-                  component-id
-                  stream-id
-                  (.getComponentOutputFields worker-context component-id stream-id)
-                  component->grouping
-                  topo-conf)]))
-         (into (apply merge (map #(hash-map % nil) (.keySet (.get_streams (.getComponentCommon worker-context component-id))))))
-         (HashMap.)))
-
-(defn executor-type [^WorkerTopologyContext context component-id]
-  (let [topology (.getRawTopology context)
-        spouts (.get_spouts topology)
-        bolts (.get_bolts topology)]
-    (cond (contains? spouts component-id) "spout"
-          (contains? bolts component-id) "bolt"
-          :else (throw (RuntimeException. (str "Could not find " component-id " in topology " topology))))))
-
-(defn executor-selector [executor-data & _] (keyword (:type executor-data)))
-
-(defmulti mk-threads executor-selector)
-(defmulti mk-executor-stats executor-selector)
-(defmulti close-component executor-selector)
-
-(defn- normalized-component-conf [storm-conf general-context component-id]
-  (let [to-remove (disj (set ALL-CONFIGS)
-                        TOPOLOGY-DEBUG
-                        TOPOLOGY-MAX-SPOUT-PENDING
-                        TOPOLOGY-MAX-TASK-PARALLELISM
-                        TOPOLOGY-TRANSACTIONAL-ID
-                        TOPOLOGY-TICK-TUPLE-FREQ-SECS
-                        TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
-                        TOPOLOGY-SPOUT-WAIT-STRATEGY
-                        TOPOLOGY-BOLTS-WINDOW-LENGTH-COUNT
-                        TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
-                        TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
-                        TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
-                        TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
-                        TOPOLOGY-BOLTS-LATE-TUPLE-STREAM
-                        TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
-                        TOPOLOGY-BOLTS-MESSAGE-ID-FIELD-NAME
-                        TOPOLOGY-STATE-PROVIDER
-                        TOPOLOGY-STATE-PROVIDER-CONFIG
-                        )
-        spec-conf (-> general-context
-                      (.getComponentCommon component-id)
-                      .get_json_conf
-                      (#(if % (JSONValue/parse %)))
-                      clojurify-structure)]
-    (merge storm-conf (apply dissoc spec-conf to-remove))
-    ))
-
-(defprotocol RunningExecutor
-  (render-stats [this])
-  (get-executor-id [this])
-  (credentials-changed [this creds])
-  (get-backpressure-flag [this]))
-
-(defn throttled-report-error-fn [executor]
-  (let [storm-conf (:storm-conf executor)
-        error-interval-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
-        max-per-interval (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
-        interval-start-time (atom (Time/currentTimeSecs))
-        interval-errors (atom 0)
-        ]
-    (fn [error]
-      (log-error error)
-      (when (> (Time/deltaSecs @interval-start-time)
-               error-interval-secs)
-        (reset! interval-errors 0)
-        (reset! interval-start-time (Time/currentTimeSecs)))
-      (swap! interval-errors inc)
-
-      (when (<= @interval-errors max-per-interval)
-        (.reportError (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
-                              (Utils/hostname storm-conf)
-          (long (.getThisWorkerPort (:worker-context executor))) error)
-        ))))
-
-;; in its own function so that it can be mocked out by tracked topologies
-(defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
-  (fn this
-    [task tuple]
-    (let [val (AddressedTuple. task tuple)]
-      (when (= true (storm-conf TOPOLOGY-DEBUG))
-        (log-message "TRANSFERING tuple " val))
-      (.publish ^DisruptorQueue batch-transfer->worker val))))
-
-(defn mk-executor-data [worker executor-id]
-  (let [worker-context (worker-context worker)
-        task-ids (clojurify-structure (StormCommon/executorIdToTasks executor-id))
-        component-id (.getComponentId worker-context (first task-ids))
-        storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
-        executor-type (executor-type worker-context component-id)
-        batch-transfer->worker (DisruptorQueue.
-                                  (str "executor"  executor-id "-send-queue")
-                                  ProducerType/SINGLE
-                                  (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
-                                  (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                  (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
-                                  (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
-        ]
-    (recursive-map
-     :worker worker
-     :worker-context worker-context
-     :executor-id executor-id
-     :task-ids task-ids
-     :component-id component-id
-     :open-or-prepare-was-called? (atom false)
-     :storm-conf storm-conf
-     :receive-queue ((:executor-receive-queue-map worker) executor-id)
-     :storm-id (:storm-id worker)
-     :conf (:conf worker)
-     :shared-executor-data (HashMap.)
-     :storm-active-atom (:storm-active-atom worker)
-     :storm-component->debug-atom (:storm-component->debug-atom worker)
-     :batch-transfer-queue batch-transfer->worker
-     :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
-     :suicide-fn (:suicide-fn worker)
-     :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf)
-                            (ClusterStateContext. DaemonType/WORKER))
-     :type executor-type
-     ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
-     :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))
-     :interval->task->metric-registry (HashMap.)
-     :task->component (:task->component worker)
-     :stream->component->grouper (outbound-components worker-context component-id storm-conf)
-     :report-error (throttled-report-error-fn <>)
-     :report-error-and-die (reify
-                             Thread$UncaughtExceptionHandler
-                             (uncaughtException [this _ error]
-                               (try
-                                 ((:report-error <>) error)
-                                 (catch Exception e
-                                   (log-error e "Error while reporting error to cluster, proceeding with shutdown")))
-                               (if (or
-                                    (Utils/exceptionCauseIsInstanceOf InterruptedException error)
-                                    (Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error))
-                                 (log-message "Got interrupted excpetion shutting thread down...")
-                                 ((:suicide-fn <>)))))
-     :sampler (ConfigUtils/mkStatsSampler storm-conf)
-     :backpressure (atom false)
-     :spout-throttling-metrics (if (= (keyword executor-type) :spout)
-                                 (SpoutThrottlingMetrics.)
-                                nil)
-     ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
-     )))
-
-(defn- mk-disruptor-backpressure-handler [executor-data]
-  "make a handler for the executor's receive disruptor queue to
-  check highWaterMark and lowWaterMark for backpressure"
-  (reify DisruptorBackpressureCallback
-    (highWaterMark [this]
-      "When receive queue is above highWaterMark"
-      (if (not @(:backpressure executor-data))
-        (do (reset! (:backpressure executor-data) true)
-            (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
-            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
-    (lowWaterMark [this]
-      "When receive queue is below lowWaterMark"
-      (if @(:backpressure executor-data)
-        (do (reset! (:backpressure executor-data) false)
-            (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
-            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))
-
-(defn start-batch-transfer->worker-handler! [worker executor-data]
-  (let [worker-transfer-fn (:transfer-fn worker)
-        cached-emit (MutableObject. (ArrayList.))
-        storm-conf (:storm-conf executor-data)
-        serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
-        ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data)
-        handler (reify com.lmax.disruptor.EventHandler
-                  (onEvent [this o seq-id batch-end?]
-                    (let [^ArrayList alist (.getObject cached-emit)]
-                      (.add alist o)
-                      (when batch-end?
-                        (worker-transfer-fn serializer alist)
-                        (.setObject cached-emit (ArrayList.))))))
-        ]
-    (Utils/asyncLoop
-      (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0)
-      (.getName batch-transfer-queue)
-      (:uncaught-exception-handler (:report-error-and-die executor-data)))))
-
-;; TODO: this is all expensive... should be precomputed
-(defn send-unanchored
-  [^Task task-data stream values transfer-fn]
-  (let [out-tuple (.getTuple task-data stream values)]
-    (fast-list-iter [t (.getOutgoingTasks task-data stream values)]
-                    (transfer-fn t out-tuple))))
-
-(defn setup-metrics! [executor-data]
-  (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
-        distinct-time-bucket-intervals (keys interval->task->metric-registry)]
-    (doseq [interval distinct-time-bucket-intervals]
-      (.scheduleRecurring
-        (:user-timer (:worker executor-data))
-        interval
-        interval
-        (fn []
-          (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
-            (.publish ^DisruptorQueue receive-queue val)))))))
-
-(defn metrics-tick
-  [executor-data task-data ^TupleImpl tuple]
-   (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
-         interval (.getInteger tuple 0)
-         transfer-fn (:transfer-fn executor-data)
-         task-id (.getTaskId task-data)
-         name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
-         task-info (IMetricsConsumer$TaskInfo.
-                     (Utils/hostname (:storm-conf executor-data))
-                     (.getThisWorkerPort worker-context)
-                     (:component-id executor-data)
-                     task-id
-                     (long (Time/currentTimeSecs))
-                     interval)
-         data-points (->> name->imetric
-                          (map (fn [[name imetric]]
-                                 (let [value (.getValueAndReset ^IMetric imetric)]
-                                   (if value
-                                     (IMetricsConsumer$DataPoint. name value)))))
-                          (filter identity)
-                          (into []))]
-     (when (seq data-points)
-       (send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] transfer-fn))))
-
-(defn setup-ticks! [worker executor-data]
-  (let [storm-conf (:storm-conf executor-data)
-        tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
-        receive-queue (:receive-queue executor-data)
-        context (:worker-context executor-data)]
-    (when tick-time-secs
-      (if (or (Utils/isSystemId (:component-id executor-data))
-              (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
-                   (= :spout (keyword (:type executor-data)))))
-        (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-        (.scheduleRecurring
-          (:user-timer worker)
-          tick-time-secs
-          tick-time-secs
-          (fn []
-            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-              (.publish ^DisruptorQueue receive-queue val))))))))
-
-(defn mk-executor [worker executor-id initial-credentials]
-  (let [executor-data (mk-executor-data worker executor-id)
-        transfer-fn (:transfer-fn executor-data)
-        _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
-        task-datas (->> executor-data
-                        :task-ids
-                        (map (fn [t] (let [task (Task. (stringify-keys executor-data)  t)
-                                           stream StormCommon/SYSTEM_STREAM_ID
-                                           values ["startup"]]
-                                       ;; when this is called, the threads for the executor haven't been started yet,
-                                       ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
-                                       (send-unanchored task stream values transfer-fn)
-                                       [t task]
-                                       )))
-                        (into {})
-                        (HashMap.))
-        _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
-        report-error-and-die (:report-error-and-die executor-data)
-        component-id (:component-id executor-data)
-
-
-        disruptor-handler (mk-disruptor-backpressure-handler executor-data)
-        _ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
-        _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
-              (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
-              (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
-
-        ;; starting the batch-transfer->worker ensures that anything publishing to that queue 
-        ;; doesn't block (because it's a single threaded queue and the caching/consumer started
-        ;; trick isn't thread-safe)
-        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
-        handlers (try
-                   (mk-threads executor-data task-datas initial-credentials)
-                   (catch Throwable t (.uncaughtException report-error-and-die nil t)))
-        threads (concat handlers system-threads)]    
-    (setup-ticks! worker executor-data)
-
-    (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
-    ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
-    (reify
-      RunningExecutor
-      (render-stats [this]
-        (.renderStats (:stats executor-data)))
-      (get-executor-id [this]
-        executor-id)
-      (credentials-changed [this creds]
-        (let [receive-queue (:receive-queue executor-data)
-              context (:worker-context executor-data)
-              val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
-          (.publish ^DisruptorQueue receive-queue val)))
-      (get-backpressure-flag [this]
-        @(:backpressure executor-data))
-      Shutdownable
-      (shutdown
-        [this]
-        (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
-        (.haltWithInterrupt ^DisruptorQueue (:receive-queue executor-data))
-        (.haltWithInterrupt ^DisruptorQueue (:batch-transfer-queue executor-data))
-        (doseq [t threads]
-          (.interrupt t)
-          (.join t))
-
-        (.cleanupStats (:stats executor-data))
-        (doseq [user-context (map #(.getUserContext %) (vals task-datas))]
-          (doseq [hook (.getHooks user-context)]
-            (.cleanup hook)))
-        (.disconnect (:storm-cluster-state executor-data))
-        (when @(:open-or-prepare-was-called? executor-data)
-          (doseq [obj (map #(.getTaskObject %) (vals task-datas))]
-            (close-component executor-data obj)))
-        (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
-        )))
-
-(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id]
-  (let [^ISpout spout (.getTaskObject task-data)
-        storm-conf (:storm-conf executor-data)
-        task-id (.getTaskId task-data)]
-    ;;TODO: need to throttle these when there's lots of failures
-    (when (= true (storm-conf TOPOLOGY-DEBUG))
-      (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
-    (.fail spout msg-id)
-    (.applyOn (SpoutFailInfo. msg-id task-id time-delta) (.getUserContext task-data))
-    (when time-delta
-      (.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
-  (let [storm-conf (:storm-conf executor-data)
-        ^ISpout spout (.getTaskObject task-data)
-        task-id (.getTaskId task-data)]
-    (when (= true (storm-conf TOPOLOGY-DEBUG))
-      (log-message "SPOUT Acking message " id " " msg-id))
-    (.ack spout msg-id)
-    (.applyOn (SpoutAckInfo. msg-id task-id time-delta) (.getUserContext task-data))
-    (when time-delta
-      (.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn mk-task-receiver [executor-data tuple-action-fn]
-  (let [task-ids (:task-ids executor-data)
-        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
-        ]
-    (reify com.lmax.disruptor.EventHandler
-      (onEvent [this tuple-batch sequence-id end-of-batch?]
-        (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
-          (let [^TupleImpl tuple (.getTuple addressed-tuple)
-                task-id (.getDest addressed-tuple)]
-            (when debug? (log-message "Processing received message FOR " task-id " TUPLE: " tuple))
-            (if (not= task-id AddressedTuple/BROADCAST_DEST)
-              (tuple-action-fn task-id tuple)
-              ;; null task ids are broadcast tuples
-              (fast-list-iter [task-id task-ids]
-                (tuple-action-fn task-id tuple)
-                ))
-            ))))))
-
-(defn executor-max-spout-pending [storm-conf num-tasks]
-  (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
-    (if p (* p num-tasks))))
-
-(defn init-spout-wait-strategy [storm-conf]
-  (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) Utils/newInstance)]
-    (.prepare ret storm-conf)
-    ret
-    ))
-
-;; Send sampled data to the eventlogger if the global or component level
-;; debug flag is set (via nimbus api).
-(defn send-to-eventlogger [executor-data task-data values component-id message-id random]
-    (let [c->d @(:storm-component->debug-atom executor-data)
-          options (get c->d component-id (get c->d (:storm-id executor-data)))
-          spct    (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)]
-      ;; the thread's initialized random number generator is used to generate
-      ;; uniformily distributed random numbers.
-      (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
-        (send-unanchored
-          task-data
-          StormCommon/EVENTLOGGER_STREAM_ID
-          [component-id message-id (System/currentTimeMillis) values]
-          (:transfer-fn executor-data)))))
-
-(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
-  (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
-        ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
-        max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
-        ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))        
-        last-active (atom false)        
-        spouts (ArrayList. (map #(.getTaskObject %) (vals task-datas)))
-        rand (Random. (Utils/secureRandomLong))
-        ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
-        debug? (= true (storm-conf TOPOLOGY-DEBUG))
-
-        pending (RotatingMap.
-                 2 ;; microoptimize for performance of .size method
-                 (reify RotatingMap$ExpiredCallback
-                   (expire [this id [task-id spout-id tuple-info start-time-ms]]
-                     (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
-                       (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id)
-                       ))))
-        tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          (let [stream-id (.getSourceStreamId tuple)]
-                            (condp = stream-id
-                              Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
-                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
-                                (let [task-data (get task-datas task-id)
-                                      spout-obj (.getTaskObject task-data)]
-                                  (when (instance? ICredentialsListener spout-obj)
-                                    (.setCredentials spout-obj (.getValue tuple 0))))
-                              Acker/ACKER_RESET_TIMEOUT_STREAM_ID 
-                                (let [id (.getValue tuple 0)
-                                      pending-for-id (.get pending id)]
-                                   (when pending-for-id
-                                     (.put pending id pending-for-id))) 
-                              (let [id (.getValue tuple 0)
-                                    time-delta-ms (.getValue tuple 1)
-                                    [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
-                                (when spout-id
-                                  (when-not (= stored-task-id task-id)
-                                    (throw (RuntimeException. (str "Fatal error, mismatched task ids: " task-id " " stored-task-id))))
-                                  (let [time-delta (if start-time-ms time-delta-ms)]
-                                    (condp = stream-id
-                                      Acker/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
-                                                                               spout-id tuple-finished-info time-delta id)
-                                      Acker/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
-                                                                           spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
-                                      )))
-                                ;; TODO: on failure, emit tuple to failure stream
-                                ))))
-        receive-queue (:receive-queue executor-data)
-        event-handler (mk-task-receiver executor-data tuple-action-fn)
-        has-ackers? (StormCommon/hasAckers storm-conf)
-        has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
-        emitted-count (MutableLong. 0)
-        empty-emit-streak (MutableLong. 0)
-        spout-transfer-fn (fn []
-                            ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
-                            (while (not @(:storm-active-atom executor-data))
-                              (Thread/sleep 100))
-                            (log-message "Opening spout " component-id ":" (keys task-datas))
-                            (.registerAll (:spout-throttling-metrics executor-data) storm-conf (.getUserContext (first (vals task-datas))))
-                            (doseq [[task-id task-data] task-datas
-                                    :let [^ISpout spout-obj (.getTaskObject task-data)
-                                          send-spout-msg (fn [out-stream-id values message-id out-task-id]
-                                                           (.increment emitted-count)
-                                                           (let [out-tasks (if out-task-id
-                                                                             (.getOutgoingTasks task-data out-task-id out-stream-id values)
-                                                                             (.getOutgoingTasks task-data out-stream-id values))
-                                                                 rooted? (and message-id has-ackers?)
-                                                                 root-id (if rooted? (MessageId/generateId rand))
-                                                                 ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
-                                                             (fast-list-iter [out-task out-tasks id out-ids]
-                                                                             (let [tuple-id (if rooted?
-                                                                                              (MessageId/makeRootId root-id id)
-                                                                                              (MessageId/makeUnanchored))
-                                                                                   out-tuple (TupleImpl. worker-context
-                                                                                                         values
-                                                                                                         task-id
-                                                                                                         out-stream-id
-                                                                                                         tuple-id)]
-                                                                               (transfer-fn out-task out-tuple)))
-                                                             (if has-eventloggers?
-                                                               (send-to-eventlogger executor-data task-data values component-id message-id rand))
-                                                             (if (and rooted?
-                                                                      (not (.isEmpty out-ids)))
-                                                               (do
-                                                                 (.put pending root-id [task-id
-                                                                                        message-id
-                                                                                        {:stream out-stream-id 
-                                                                                         :values (if debug? values nil)}
-                                                                                        (if (.call ^Callable sampler) (System/currentTimeMillis))])
-                                                                 (send-unanchored task-data
-                                                                                       Acker/ACKER_INIT_STREAM_ID
-                                                                                       [root-id (Utils/bitXorVals out-ids) task-id]
-                                                                                       (:transfer-fn executor-data)))
-                                                               (when message-id
-                                                                 (ack-spout-msg executor-data task-data message-id
-                                                                                {:stream out-stream-id :values values}
-                                                                                (if (.call ^Callable sampler) 0) "0:")))
-                                                             (or out-tasks [])))]]
-
-                              (.registerAll (.getBuiltInMetrics task-data) storm-conf (.getUserContext task-data))
-                              (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
-                                                                       "receive" receive-queue}
-                                                                      storm-conf (.getUserContext task-data))
-
-                              (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
-
-                              (.open spout-obj
-                                     storm-conf
-                                     (.getUserContext task-data)
-                                     (SpoutOutputCollector.
-                                      (reify ISpoutOutputCollector
-                                        (^long getPendingCount[this]
-                                          (.size pending))
-                                        (^List emit [this ^String stream-id ^List tuple ^Object message-id]
-                                          (send-spout-msg stream-id tuple message-id nil))
-                                        (^void emitDirect [this ^int out-task-id ^String stream-id
-                                                           ^List tuple ^Object message-id]
-                                          (send-spout-msg stream-id tuple message-id out-task-id))
-                                        (reportError [this error]
-                                          (report-error error))))))
-
-                            (reset! open-or-prepare-was-called? true) 
-                            (log-message "Opened spout " component-id ":" (keys task-datas))
-                            (setup-metrics! executor-data)
-
-                            (fn []
-                              ;; This design requires that spouts be non-blocking
-                              (.consumeBatch ^DisruptorQueue receive-queue event-handler)
-
-                              (let [active? @(:storm-active-atom executor-data)
-                                    curr-count (.get emitted-count)
-                                    backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
-                                    throttle-on (and backpressure-enabled
-                                                     @(:throttle-on (:worker executor-data)))
-                                    reached-max-spout-pending (and max-spout-pending
-                                                                   (>= (.size pending) max-spout-pending))]
-                                (if active?
-                                        ; activated
-                                  (do
-                                    (when-not @last-active
-                                      (reset! last-active true)
-                                      (log-message "Activating spout " component-id ":" (keys task-datas))
-                                      (fast-list-iter [^ISpout spout spouts] (.activate spout)))
-
-                                    (if (and (not (.isFull transfer-queue))
-                                             (not throttle-on)
-                                             (not reached-max-spout-pending))
-                                      (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
-                                        ; deactivated
-                                  (do
-                                    (when @last-active
-                                      (reset! last-active false)
-                                      (log-message "Deactivating spout " component-id ":" (keys task-datas))
-                                      (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
-                                    ;; TODO: log that it's getting throttled
-                                    (Time/sleep 100)
-                                    (.skippedInactive (:spout-throttling-metrics executor-data) (:stats executor-data))))
-
-                                (if (and (= curr-count (.get emitted-count)) active?)
-                                  (do (.increment empty-emit-streak)
-                                      (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
-                                      ;; update the spout throttling metrics
-                                      (if throttle-on
-                                        (.skippedThrottle (:spout-throttling-metrics executor-data) (:stats executor-data))
-                                        (if reached-max-spout-pending
-                                          (.skippedMaxSpout (:spout-throttling-metrics executor-data) (:stats executor-data)))))
-                                  (.set empty-emit-streak 0)))
-                              0))]
-
-    [(Utils/asyncLoop
-      spout-transfer-fn
-      false ; isDaemon
-      (:report-error-and-die executor-data)
-      Thread/NORM_PRIORITY
-      true ; isFactory
-      true ; startImmediately
-      (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defn- tuple-time-delta! [^TupleImpl tuple]
-  (let [ms (.getProcessSampleStartTime tuple)]
-    (if ms
-      (Time/deltaMs ms))))
-      
-(defn- tuple-execute-time-delta! [^TupleImpl tuple]
-  (let [ms (.getExecuteSampleStartTime tuple)]
-    (if ms
-      (Time/deltaMs ms))))
-
-(defn put-xor! [^Map pending key id]
-  (let [curr (or (.get pending key) (long 0))]
-    (.put pending key (bit-xor curr id))))
-
-(defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
-  (let [storm-conf (:storm-conf executor-data)
-        execute-sampler (ConfigUtils/mkStatsSampler storm-conf)
-        executor-stats (:stats executor-data)
-        {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
-                open-or-prepare-was-called?]} executor-data
-        rand (Random. (Utils/secureRandomLong))
-
-        tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          ;; synchronization needs to be done with a key provided by this bolt, otherwise:
-                          ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
-                          ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
-                          ;; buffer other tuples until fully synchronized, then process all of those tuples
-                          ;; then go into normal loop
-                          ;; spill to disk?
-                          ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
-                          ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
-                          ;; or just timeout the sync messages that are coming in until full sync is hit from that task
-                          ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
-                          ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
-                          ;; TODO: how to handle incremental updates as well as synchronizations at same time
-                          ;; TODO: need to version tuples somehow
-
-                          ;;(log-debug "Received tuple " tuple " at task " task-id)
-                          ;; need to do it this way to avoid reflection
-                          (let [stream-id (.getSourceStreamId tuple)]
-                            (condp = stream-id
-                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
-                                (let [^Task task-data (get task-datas task-id)
-                                      bolt-obj (.getTaskObject task-data)]
-                                  (when (instance? ICredentialsListener bolt-obj)
-                                    (.setCredentials ^ICredentialsListener bolt-obj (.getValue tuple 0))))
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
-                              (let [^Task task-data (get task-datas task-id)
-                                    ^IBolt bolt-obj (.getTaskObject task-data)
-                                    user-context (.getUserContext task-data)
-                                    sampler? (.call ^Callable sampler)
-                                    execute-sampler? (.call ^Callable execute-sampler)
-                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
-                                    receive-queue (:receive-queue executor-data)]
-                                (when sampler?
-                                  (.setProcessSampleStartTime tuple now))
-                                (when execute-sampler?
-                                  (.setExecuteSampleStartTime tuple now))
-                                (.execute bolt-obj tuple)
-                                (let [delta (tuple-execute-time-delta! tuple)]
-                                  (when (= true (storm-conf TOPOLOGY-DEBUG))
-                                    (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
-
-                                  (.applyOn (BoltExecuteInfo. tuple task-id delta) user-context)
-                                  (when delta
-                                    (.boltExecuteTuple executor-stats
-                                                               (.getSourceComponent tuple)
-                                                               (.getSourceStreamId tuple)
-                                                               delta)))))))
-        has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
-        bolt-transfer-fn (fn []
-                           ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
-                           (while (not @(:storm-active-atom executor-data))
-                             (Thread/sleep 100))
-
-                           (log-message "Preparing bolt " component-id ":" (keys task-datas))
-                           (doseq [[task-id task-data] task-datas
-                                   :let [^IBolt bolt-obj (.getTaskObject task-data)
-                                         user-context (.getUserContext task-data)
-                                         transfer-fn (:transfer-fn executor-data)
-                                         bolt-emit (fn [stream anchors values task]
-                                                     (let [out-tasks (if task
-                                                                       (.getOutgoingTasks task-data task stream values)
-                                                                       (.getOutgoingTasks task-data stream values))]
-                                                       (fast-list-iter [t out-tasks]
-                                                                       (let [anchors-to-ids (HashMap.)]
-                                                                         (fast-list-iter [^TupleImpl a anchors]
-                                                                                         (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
-                                                                                           (when (pos? (count root-ids))
-                                                                                             (let [edge-id (MessageId/generateId rand)]
-                                                                                               (.updateAckVal a edge-id)
-                                                                                               (fast-list-iter [root-id root-ids]
-                                                                                                               (put-xor! anchors-to-ids root-id edge-id))))))
-                                                                         (let [tuple (TupleImpl. worker-context
-                                                                                                 values
-                                                                                                 task-id
-                                                                                                 stream
-                                                                                                 (MessageId/makeId anchors-to-ids))]
-                                                                           (transfer-fn t tuple))))
-                                                       (if has-eventloggers?
-                                                         (send-to-eventlogger executor-data task-data values component-id nil rand))
-                                                       (or out-tasks [])))]]
-                             (.registerAll (.getBuiltInMetrics task-data) storm-conf user-context)
-                             (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
-                             (if (= component-id Constants/SYSTEM_COMPONENT_ID)
-                               (do
-                                 (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
-                                                                          "receive" (:receive-queue executor-data)
-                                                                          "transfer" (:transfer-queue (:worker executor-data))}
-                                                                         storm-conf user-context)
-                                 (BuiltinMetricsUtil/registerIconnectionClientMetrics
-                                   (.deref (:cached-node+port->socket (:worker executor-data))) storm-conf user-context)
-                                 (BuiltinMetricsUtil/registerIconnectionServerMetric (:receiver (:worker executor-data)) storm-conf user-context))
-                               (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
-                                                                        "receive" (:receive-queue executor-data)}
-                                                                       storm-conf user-context))
-
-                             (.prepare bolt-obj
-                                       storm-conf
-                                       user-context
-                                       (OutputCollector.
-                                        (reify IOutputCollector
-                                          (emit [this stream anchors values]
-                                            (bolt-emit stream anchors values nil))
-                                          (emitDirect [this task stream anchors values]
-                                            (bolt-emit stream anchors values task))
-                                          (^void ack [this ^Tuple tuple]
-                                            (let [^TupleImpl tuple tuple
-                                                  ack-val (.getAckVal tuple)]
-                                              (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
-                                                             (send-unanchored task-data
-                                                                              Acker/ACKER_ACK_STREAM_ID
-                                                                              [root (bit-xor id ack-val)]
-                                                                              transfer-fn)))
-                                            (let [delta (tuple-time-delta! tuple)
-                                                  debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                                              (when debug?
-                                                (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                                              (.applyOn (BoltAckInfo. tuple task-id delta) user-context)
-                                              (when delta
-                                                (.boltAckedTuple executor-stats
-                                                                         (.getSourceComponent tuple)
-                                                                         (.getSourceStreamId tuple)
-                                                                         delta))))
-                                          (^void fail [this ^Tuple tuple]
-                                            (fast-list-iter [root (.. tuple getMessageId getAnchors)]
-                                                            (send-unanchored task-data
-                                                                                  Acker/ACKER_FAIL_STREAM_ID
-                                                                                  [root]
-                                                                                  transfer-fn))
-                                            (let [delta (tuple-time-delta! tuple)
-                                                  debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                                              (when debug?
-                                                (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                                              (.applyOn (BoltFailInfo. tuple task-id delta) user-context)
-                                              (when delta
-                                                (.boltFailedTuple executor-stats
-                                                                          (.getSourceComponent tuple)
-                                                                          (.getSourceStreamId tuple)
-                                                                          delta))))
-                                          (^void resetTimeout [this ^Tuple tuple]
-                                            (fast-list-iter [root (.. tuple getMessageId getAnchors)]
-                                                            (send-unanchored task-data
-                                                                                  Acker/ACKER_RESET_TIMEOUT_STREAM_ID
-                                                                                  [root]
-                                                                                  transfer-fn)))
-                                          (reportError [this error]
-                                            (report-error error))))))
-                           (reset! open-or-prepare-was-called? true)
-                           (log-message "Prepared bolt " component-id ":" (keys task-datas))
-                           (setup-metrics! executor-data)
-
-                           (let [receive-queue (:receive-queue executor-data)
-                                 event-handler (mk-task-receiver executor-data tuple-action-fn)]
-                             (fn []
-                               (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
-                               0)))]
-    ;; TODO: can get any SubscribedState objects out of the context now
-
-    [(Utils/asyncLoop
-      bolt-transfer-fn
-      false ; isDaemon
-      (:report-error-and-die executor-data)
-      Thread/NORM_PRIORITY
-      true ; isFactory
-      true ; startImmediately
-      (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defmethod close-component :spout [executor-data spout]
-  (.close spout))
-
-(defmethod close-component :bolt [executor-data bolt]
-  (.cleanup bolt))
-
-;; TODO: refactor this to be part of an executor-specific map
-(defmethod mk-executor-stats :spout [_ rate]
-  (SpoutExecutorStats. rate))
-
-(defmethod mk-executor-stats :bolt [_ rate]
-  (BoltExecutorStats. rate))

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
new file mode 100644
index 0000000..1e46e37
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
@@ -0,0 +1,42 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.local-executor
+  (:use [org.apache.storm util config log])
+  (:import [org.apache.storm.tuple AddressedTuple]
+           [org.apache.storm.executor Executor ExecutorTransfer])
+  (:import [org.apache.storm.utils DisruptorQueue])
+  (:import [org.apache.storm Config Constants]))
+
+(defn local-transfer-executor-tuple []
+  (fn [task tuple batch-transfer->worker]
+    (let [val (AddressedTuple. task tuple)]
+      (.publish ^DisruptorQueue batch-transfer->worker val))))
+
+(defn mk-local-executor-transfer [worker-topology-context batch-queue storm-conf transfer-fn]
+  (proxy [ExecutorTransfer] [worker-topology-context batch-queue storm-conf transfer-fn]
+    (transfer [task tuple]
+      (let [batch-transfer->worker (.getBatchTransferQueue this)]
+        ((local-transfer-executor-tuple) task tuple batch-transfer->worker)))))
+
+(defn mk-local-executor [workerData executorId credentials]
+  (let [executor (Executor/mkExecutor workerData executorId credentials)
+        worker-topology-context (.getWorkerTopologyContext executor)
+        batch-transfer-queue (.getTransferWorkerQueue executor)
+        storm-conf (.getStormConf executor)
+        transfer-fn (.getTransferFn executor)
+        local-executor-transfer (mk-local-executor-transfer worker-topology-context batch-transfer-queue storm-conf transfer-fn)]
+    (.setLocalExecutorTransfer executor local-executor-transfer)
+    (.execute executor)))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index dddce68..781558c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -18,11 +18,13 @@
   (:use [org.apache.storm config log util converter local-state-converter])
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
-  (:require [org.apache.storm.daemon [executor :as executor]])
 
   (:require [clojure.set :as set])
+  (:require [org.apache.storm.daemon
+               [local-executor :as local-executor]])
   (:import [java.io File]
-           [org.apache.storm.stats StatsUtil])
+           [org.apache.storm.stats StatsUtil]
+           [java.util.concurrent.atomic AtomicBoolean AtomicReference])
   (:import [java.util.concurrent Executors]
            [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
            [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
@@ -48,10 +50,9 @@
   (:import [org.apache.logging.log4j.core.config LoggerConfig])
   (:import [org.apache.storm.generated LogConfig LogLevelAction])
   (:import [org.apache.storm StormTimer])
+  (:import [org.apache.storm.executor Executor])
   (:gen-class))
 
-(defmulti mk-suicide-fn cluster-mode)
-
 (defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
   (log-message "Reading Assignments.")
   (let [assignment (:executor->node+port (clojurify-assignment (.assignmentInfo storm-cluster-state storm-id nil)))]
@@ -69,7 +70,7 @@
   (let [stats (if-not executors
                   (StatsUtil/mkEmptyExecutorZkHbs (:executors worker))
                   (StatsUtil/convertExecutorZkHbs (->> executors
-                    (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
+                    (map (fn [e] {(.getExecutorId e) (.renderStats e)}))
                     (apply merge))))
         zk-hb (StatsUtil/mkZkWorkerHb (:storm-id worker) stats (. (:uptime worker) upTime))]
     ;; do the zookeeper heartbeat
@@ -96,7 +97,7 @@
 (defn worker-outbound-tasks
   "Returns seq of task-ids that receive messages from this worker"
   [worker]
-  (let [context (worker-context worker)
+  (let [context (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
         components (mapcat
                      (fn [task-id]
                        (->> (.getComponentId context (int task-id))
@@ -145,11 +146,11 @@
             assignment-id (:assignment-id worker)
             port (:port worker)
             storm-cluster-state (:storm-cluster-state worker)
-            prev-backpressure-flag @(:backpressure worker)
+            prev-backpressure-flag (.get (:backpressure worker))
             ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
             curr-backpressure-flag (if executors
                                      (or (.getThrottleOn (:transfer-queue worker))
-                                       (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))
+                                       (reduce #(or %1 %2) (map #(.getBackPressureFlag %1) executors)))
                                      prev-backpressure-flag)]
         ;; update the worker's backpressure flag to zookeeper only when it has changed
         (when (not= prev-backpressure-flag curr-backpressure-flag)
@@ -157,7 +158,7 @@
             (log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag)
             (.workerBackpressure storm-cluster-state storm-id assignment-id (long port) curr-backpressure-flag)
             ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
-            (reset! (:backpressure worker) curr-backpressure-flag)
+            (.set (:backpressure worker) curr-backpressure-flag)
             (catch Exception exc
               (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
         ))))
@@ -276,7 +277,7 @@
         mq-context  (if mq-context
                       mq-context
                       (TransportFactory/makeContext storm-conf))]
-
+    ;; TODO: when translating this function, use constants defined in Constants.java
     (recursive-map
       :conf conf
       :mq-context mq-context
@@ -290,9 +291,9 @@
       ;; when worker bootup, worker will start to setup initial connections to
       ;; other workers. When all connection is ready, we will enable this flag
       ;; and spout and bolt will be activated.
-      :worker-active-flag (atom false)
-      :storm-active-atom (atom false)
-      :storm-component->debug-atom (atom {})
+      :worker-active-flag (atom false) ;; used in worker only, keep it as atom
+      :storm-active-atom (AtomicBoolean. false)
+      :storm-component->debug-atom (AtomicReference.)
       :executors executors
       :task-ids (->> receive-queue-map keys (map int) sort)
       :storm-conf storm-conf
@@ -321,7 +322,7 @@
                                  (mapcat (fn [e] (for [t (executor->tasks e)] [t (first e)])))
                                  (into {})
                                  (HashMap.))
-      :suicide-fn (mk-suicide-fn conf)
+      :suicide-fn (Utils/mkSuicideFn)
       :uptime (Utils/makeUptimeComputer)
       :default-shared-resources (mk-default-resources <>)
       :user-shared-resources (mk-user-resources <>)
@@ -329,10 +330,10 @@
       :transfer-fn (mk-transfer-fn <>)
       :load-mapping (LoadMapping.)
       :assignment-versions assignment-versions
-      :backpressure (atom false) ;; whether this worker is going slow
-      :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
-      :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
-      :throttle-on (atom false) ;; whether throttle is activated for spouts
+      :backpressure (AtomicBoolean. false) ;; whether this worker is going slow
+      :transfer-backpressure (AtomicBoolean. false) ;; if the transfer queue is backed-up
+      :backpressure-trigger (AtomicBoolean. false) ;; a trigger for synchronization with executors
+      :throttle-on (AtomicBoolean. false) ;; whether throttle is activated for spouts
       )))
 
 (defn- endpoint->string [[node port]]
@@ -422,6 +423,7 @@
               current-connections (set (keys @(:cached-node+port->socket worker)))
               new-connections (set/difference needed-connections current-connections)
               remove-connections (set/difference current-connections needed-connections)]
+
               (swap! (:cached-node+port->socket worker)
                      #(HashMap. (merge (into {} %1) %2))
                      (into {}
@@ -455,11 +457,9 @@
                  (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
   ([worker callback]
     (let [base (clojurify-storm-base (.stormBase (:storm-cluster-state worker) (:storm-id worker) callback))]
-      (reset!
-        (:storm-active-atom worker)
-        (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
-      (reset! (:storm-component->debug-atom worker) (-> base :component->debug))
-      (log-debug "Event debug options " @(:storm-component->debug-atom worker)))))
+      (.set (:storm-active-atom worker) (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
+      (.set (:storm-component->debug-atom worker) (map-val thriftify-debugoptions (-> base :component->debug)))
+      (log-debug "Event debug options " (.get (:storm-component->debug-atom worker))))))
 
 ;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
 (defn mk-transfer-tuples-handler [worker]
@@ -514,7 +514,7 @@
         ^IConnection socket (:receiver worker)]
     (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
     (.registerRecv socket (DeserializingConnectionCallback. (:storm-conf worker)
-                                                            (worker-context worker)
+                                                            (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
                                                             transfer-local-fn))))
 
 (defn- close-resources [worker]
@@ -605,7 +605,7 @@
 (defn run-worker-start-hooks [worker]
   (let [topology (:topology worker)
         topo-conf (:storm-conf worker)
-        worker-topology-context (worker-context worker)
+        worker-topology-context (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
         hooks (.get_worker_hooks topology)]
     (dofor [hook hooks]
       (let [hook-bytes (Utils/toByteArray hook)
@@ -680,7 +680,9 @@
 
         _ (run-worker-start-hooks worker)
 
-        _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials)))
+        _ (if (ConfigUtils/isLocalMode storm-conf)
+            (reset! executors (dofor [e (:executors worker)] (local-executor/mk-local-executor worker e initial-credentials)))
+            (reset! executors (dofor [e (:executors worker)] (.execute (Executor/mkExecutor worker e initial-credentials)))))
 
         transfer-tuples (mk-transfer-tuples-handler worker)
         
@@ -699,7 +701,7 @@
             (.start backpressure-thread))
         callback (fn cb []
                    (let [throttle-on (.topologyBackpressure storm-cluster-state storm-id cb)]
-                     (reset! (:throttle-on worker) throttle-on)))
+                     (.set (:throttle-on worker) throttle-on)))
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
             (.topologyBackpressure storm-cluster-state storm-id callback))
 
@@ -766,14 +768,14 @@
                                     (let [new-creds (clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id nil))]
                                       (when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed
                                         (AuthUtils/updateSubject subject auto-creds new-creds)
-                                        (dofor [e @executors] (.credentials-changed e new-creds))
+                                        (dofor [e @executors] (.credenetialsChanged e new-creds))
                                         (reset! credentials new-creds))))
        check-throttle-changed (fn []
                                 (let [callback (fn cb []
                                                  (let [throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id cb)]
-                                                   (reset! (:throttle-on worker) throttle-on)))
+                                                   (.set (:throttle-on worker) throttle-on)))
                                       new-throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id callback)]
-                                    (reset! (:throttle-on worker) new-throttle-on)))
+                                    (.set (:throttle-on worker) new-throttle-on)))
         check-log-config-changed (fn []
                                   (let [log-config (.topologyLogConfig (:storm-cluster-state worker) storm-id nil)]
                                     (process-log-config-change latest-log-config original-log-levels log-config)
@@ -809,14 +811,6 @@
     ret
     ))))))
 
-(defmethod mk-suicide-fn
-  :local [conf]
-  (fn [] (Utils/exitProcess 1 "Worker died")))
-
-(defmethod mk-suicide-fn
-  :distributed [conf]
-  (fn [] (Utils/exitProcess 1 "Worker died")))
-
 (defn -main [storm-id assignment-id port-str worker-id]
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
     (Utils/setupDefaultUncaughtExceptionHandler)

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index d67b48d..dc676d6 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -17,15 +17,17 @@
 (ns org.apache.storm.testing
   (:require [org.apache.storm.daemon
              [nimbus :as nimbus]
+             [local-executor :as local-executor]
              [local-supervisor :as local-supervisor]
              [common :as common]
-             [worker :as worker]
-             [executor :as executor]])
+             [worker :as worker]])
   (:import [org.apache.commons.io FileUtils]
            [org.apache.storm.utils]
            [org.apache.storm.zookeeper Zookeeper]
            [org.apache.storm ProcessSimulator]
-           [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager])
+           [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager]
+           [org.apache.storm.executor Executor]
+           [java.util.concurrent.atomic AtomicBoolean])
   (:import [java.io File])
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
@@ -697,12 +699,12 @@
          ;; of tuple emission (and not on a separate thread later) for
          ;; topologies to be tracked correctly. This is because "transferred" *must*
          ;; be incremented before "processing".
-         executor/mk-executor-transfer-fn
-         (let [old# executor/mk-executor-transfer-fn]
+         local-executor/local-transfer-executor-tuple
+         (let [old# local-executor/local-transfer-executor-tuple]
            (fn [& args#]
              (let [transferrer# (apply old# args#)]
                (fn [& args2#]
-                 ;; (log-message "Transferring: " transfer-args#)
+                 ;; (log-message "Transferring: " args2#)
                  (increment-global! id# "transferred" 1)
                  (apply transferrer# args2#)))))]
           (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
@@ -759,7 +761,7 @@
                   {}
                   (HashMap.)
                   (HashMap.)
-                  (atom false))]
+                  (AtomicBoolean. false))]
     (TupleImpl. context values 1 stream)))
 
 (defmacro with-timeout

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Constants.java b/storm-core/src/jvm/org/apache/storm/Constants.java
index b2c642e..9436db1 100644
--- a/storm-core/src/jvm/org/apache/storm/Constants.java
+++ b/storm-core/src/jvm/org/apache/storm/Constants.java
@@ -22,7 +22,7 @@ import clojure.lang.RT;
 
 
 public class Constants {
-    public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; 
+    public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
 
     public static final long SYSTEM_TASK_ID = -1;
     public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
@@ -32,5 +32,27 @@ public class Constants {
     public static final String METRICS_STREAM_ID = "__metrics";
     public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
     public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
+
+    public static final Object TOPOLOGY = "topology";
+    public static final String SYSTEM_TOPOLOGY = "system-topology";
+    public static final String STORM_CONF = "storm-conf";
+    public static final String STORM_ID = "storm-id";
+    public static final String WORKER_ID = "worker-id";
+    public static final String CONF = "conf";
+    public static final String PORT = "port";
+    public static final String TASK_TO_COMPONENT = "task->component";
+    public static final String COMPONENT_TO_SORTED_TASKS = "component->sorted-tasks";
+    public static final String COMPONENT_TO_STREAM_TO_FIELDS = "component->stream->fields";
+    public static final String TASK_IDS = "task-ids";
+    public static final String DEFAULT_SHARED_RESOURCES = "default-shared-resources";
+    public static final String USER_SHARED_RESOURCES = "user-shared-resources";
+    public static final String USER_TIMER = "user-timer";
+    public static final String TRANSFER_FN = "transfer-fn";
+    public static final String SUICIDE_FN = "suicide-fn";
+    public static final String THROTTLE_ON = "throttle-on";
+    public static final String EXECUTOR_RECEIVE_QUEUE_MAP = "executor-receive-queue-map";
+    public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
+    public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";
+    public static final Object LOAD_MAPPING = "load-mapping";
 }