You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/16 16:12:50 UTC

[3/5] storm git commit: Merge apache master branch into STORM-1272

Merge apache master branch into STORM-1272


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6696e3f1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6696e3f1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6696e3f1

Branch: refs/heads/master
Commit: 6696e3f1d8309f85d487a3361306f67c5608a0ff
Parents: 55b26dd d041183
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Fri Feb 12 02:14:42 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Fri Feb 12 02:14:42 2016 +0530

----------------------------------------------------------------------
 CHANGELOG.md                                    |   4 +-
 README.markdown                                 |   1 +
 dev-tools/travis/travis-script.sh               |   4 +-
 external/sql/storm-sql-core/pom.xml             |   9 +
 .../storm/hbase/security/HBaseSecurityUtil.java |  36 +-
 external/storm-mqtt/core/pom.xml                |   4 +-
 pom.xml                                         |   9 +-
 storm-core/pom.xml                              |  11 +-
 .../src/clj/org/apache/storm/LocalCluster.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/clojure.clj |   8 +-
 storm-core/src/clj/org/apache/storm/cluster.clj |  27 +-
 .../cluster_state/zookeeper_state_factory.clj   |  11 +-
 .../clj/org/apache/storm/command/blobstore.clj  |  11 +-
 .../org/apache/storm/command/dev_zookeeper.clj  |   6 +-
 .../clj/org/apache/storm/command/get_errors.clj |  12 +-
 .../apache/storm/command/shell_submission.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/config.clj  |  18 +-
 .../src/clj/org/apache/storm/converter.clj      |  14 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   |  21 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  29 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  23 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 552 ++++++-----
 .../clj/org/apache/storm/daemon/logviewer.clj   |  68 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 170 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  | 200 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |   2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  70 +-
 .../src/clj/org/apache/storm/disruptor.clj      |  36 -
 storm-core/src/clj/org/apache/storm/event.clj   |   2 +-
 .../src/clj/org/apache/storm/local_state.clj    |   9 +-
 .../org/apache/storm/pacemaker/pacemaker.clj    |   7 +-
 .../storm/pacemaker/pacemaker_state_factory.clj |  24 +-
 .../clj/org/apache/storm/process_simulator.clj  |   4 +-
 .../apache/storm/scheduler/DefaultScheduler.clj |   7 +-
 .../apache/storm/scheduler/EvenScheduler.clj    |  23 +-
 .../storm/scheduler/IsolationScheduler.clj      |  29 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |  82 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  83 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |   6 +-
 storm-core/src/clj/org/apache/storm/timer.clj   |  12 +-
 .../clj/org/apache/storm/trident/testing.clj    |   9 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  97 +-
 .../src/clj/org/apache/storm/ui/helpers.clj     |  14 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 923 +----------------
 .../src/clj/org/apache/storm/zookeeper.clj      |   1 -
 .../storm/logging/ThriftAccessLogger.java       |  13 +-
 .../serialization/SerializationFactory.java     |  17 +-
 .../staticmocking/MockedConfigUtils.java        |  31 -
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../jvm/org/apache/storm/utils/Container.java   |  11 +-
 .../jvm/org/apache/storm/utils/IPredicate.java  |  27 +
 .../org/apache/storm/utils/NimbusClient.java    |   2 +-
 .../jvm/org/apache/storm/utils/TestUtils.java   |  34 -
 .../src/jvm/org/apache/storm/utils/Time.java    |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 989 +++++++++++++++++--
 .../storm/validation/ConfigValidation.java      |   2 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |   7 +
 .../org/apache/storm/integration_test.clj       | 100 +-
 .../org/apache/storm/testing4j_test.clj         |  37 +-
 .../apache/storm/trident/integration_test.clj   |  15 +-
 .../test/clj/org/apache/storm/cluster_test.clj  |  20 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../clj/org/apache/storm/logviewer_test.clj     | 267 ++---
 .../storm/messaging/netty_integration_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 131 ++-
 .../scheduler/resource_aware_scheduler_test.clj |  21 +-
 .../apache/storm/security/auth/auth_test.clj    |  11 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |   2 +-
 .../BlowfishTupleSerializer_test.clj            |   1 -
 .../clj/org/apache/storm/serialization_test.clj |  23 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 645 ++++++------
 .../clj/org/apache/storm/transactional_test.clj |  18 +
 .../clj/org/apache/storm/trident/state_test.clj |   5 +-
 .../clj/org/apache/storm/trident/tuple_test.clj |  15 +-
 .../test/clj/org/apache/storm/utils_test.clj    |  16 +-
 .../test/clj/org/apache/storm/worker_test.clj   |   1 -
 .../staticmocking/ConfigUtilsInstaller.java     |  38 +
 .../utils/staticmocking/UtilsInstaller.java     |  38 +
 .../storm/utils/staticmocking/package-info.java |  95 ++
 79 files changed, 3042 insertions(+), 2357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 619a885,e2380b7..03db855
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -36,10 -36,11 +36,12 @@@
    (:import [org.apache.storm Config Constants])
    (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
    (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-   (:import [java.util.concurrent ConcurrentLinkedQueue]
-            (com.lmax.disruptor.dsl ProducerType))
+   (:import [java.lang Thread Thread$UncaughtExceptionHandler]
+            [java.util.concurrent ConcurrentLinkedQueue]
 -           [org.json.simple JSONValue])
++           [org.json.simple JSONValue]
++           [com.lmax.disruptor.dsl ProducerType])
    (:require [org.apache.storm [thrift :as thrift]
--             [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]])
++             [cluster :as cluster] [stats :as stats]])
    (:require [org.apache.storm.daemon [task :as task]])
    (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
    (:require [clojure.set :as set]))
@@@ -300,17 -305,16 +306,19 @@@
          cached-emit (MutableObject. (ArrayList.))
          storm-conf (:storm-conf executor-data)
          serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
++        ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data)
++        handler (reify com.lmax.disruptor.EventHandler
++                  (onEvent [this o seq-id batch-end?]
++                    (let [^ArrayList alist (.getObject cached-emit)]
++                      (.add alist o)
++                      (when batch-end?
++                        (worker-transfer-fn serializer alist)
++                        (.setObject cached-emit (ArrayList.))))))
          ]
--    (disruptor/consume-loop*
--      (:batch-transfer-queue executor-data)
-       (reify com.lmax.disruptor.EventHandler
-         (onEvent [this o seq-id batch-end?]
-           (let [^ArrayList alist (.getObject cached-emit)]
-             (.add alist o)
-             (when batch-end?
-               (worker-transfer-fn serializer alist)
-               (.setObject cached-emit (ArrayList.))))))
-       :kill-fn (:report-error-and-die executor-data))))
 -      (disruptor/handler [o seq-id batch-end?]
 -        (let [^ArrayList alist (.getObject cached-emit)]
 -          (.add alist o)
 -          (when batch-end?
 -            (worker-transfer-fn serializer alist)
 -            (.setObject cached-emit (ArrayList.)))))
 -      :uncaught-exception-handler (:report-error-and-die executor-data))))
++    (Utils/asyncLoop
++      (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0)
++      (.getName batch-transfer-queue)
++      (:uncaught-exception-handler (:report-error-and-die executor-data)))))
  
  (defn setup-metrics! [executor-data]
    (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
@@@ -540,132 -549,130 +553,130 @@@
          has-ackers? (has-ackers? storm-conf)
          has-eventloggers? (has-eventloggers? storm-conf)
          emitted-count (MutableLong. 0)
-         empty-emit-streak (MutableLong. 0)]
-    
-     [(async-loop
-       (fn []
-         ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
-         (while (not @(:storm-active-atom executor-data))
-           (Thread/sleep 100))
-         
-         (log-message "Opening spout " component-id ":" (keys task-datas))
-         (builtin-metrics/register-spout-throttling-metrics (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas))))
-         (doseq [[task-id task-data] task-datas
-                 :let [^ISpout spout-obj (:object task-data)
-                      tasks-fn (:tasks-fn task-data)
-                      send-spout-msg (fn [out-stream-id values message-id out-task-id]
-                                        (.increment emitted-count)
-                                        (let [out-tasks (if out-task-id
-                                                          (tasks-fn out-task-id out-stream-id values)
-                                                          (tasks-fn out-stream-id values))
-                                              rooted? (and message-id has-ackers?)
-                                              root-id (if rooted? (MessageId/generateId rand))
-                                              ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
-                                          (fast-list-iter [out-task out-tasks id out-ids]
-                                                          (let [tuple-id (if rooted?
-                                                                           (MessageId/makeRootId root-id id)
-                                                                           (MessageId/makeUnanchored))
-                                                                out-tuple (TupleImpl. worker-context
-                                                                                      values
-                                                                                      task-id
-                                                                                      out-stream-id
-                                                                                      tuple-id)]
-                                                            (transfer-fn out-task out-tuple)))
-                                          (if has-eventloggers?
-                                            (send-to-eventlogger executor-data task-data values component-id message-id rand))
-                                          (if (and rooted?
-                                                   (not (.isEmpty out-ids)))
-                                            (do
-                                              (.put pending root-id [task-id
-                                                                     message-id
-                                                                     {:stream out-stream-id 
-                                                                      :values (if debug? values nil)}
-                                                                     (if (sampler) (System/currentTimeMillis))])
-                                              (task/send-unanchored task-data
-                                                                    ACKER-INIT-STREAM-ID
-                                                                    [root-id (bit-xor-vals out-ids) task-id]))
-                                            (when message-id
-                                              (ack-spout-msg executor-data task-data message-id
-                                                             {:stream out-stream-id :values values}
-                                                             (if (sampler) 0) "0:")))
-                                          (or out-tasks [])
-                                          ))]]
-           (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
-           (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
-                                                    :receive receive-queue}
-                                                   storm-conf (:user-context task-data))
-           (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
- 
-           (.open spout-obj
-                  storm-conf
-                  (:user-context task-data)
-                  (SpoutOutputCollector.
-                   (reify ISpoutOutputCollector
-                     (^long getPendingCount[this]
-                       (.size pending)
-                       )
-                     (^List emit [this ^String stream-id ^List tuple ^Object message-id]
-                       (send-spout-msg stream-id tuple message-id nil)
-                       )
-                     (^void emitDirect [this ^int out-task-id ^String stream-id
-                                        ^List tuple ^Object message-id]
-                       (send-spout-msg stream-id tuple message-id out-task-id)
-                       )
-                     (reportError [this error]
-                       (report-error error)
-                       )))))
-         (reset! open-or-prepare-was-called? true) 
-         (log-message "Opened spout " component-id ":" (keys task-datas))
-         (setup-metrics! executor-data)
-         
-         (fn []
-           ;; This design requires that spouts be non-blocking
-           (.consumeBatch ^DisruptorQueue receive-queue event-handler)
-           
-           (let [active? @(:storm-active-atom executor-data)
-                 curr-count (.get emitted-count)
-                 backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
-                 throttle-on (and backpressure-enabled
-                               @(:throttle-on (:worker executor-data)))
-                 reached-max-spout-pending (and max-spout-pending
-                                                (>= (.size pending) max-spout-pending))
-                 ]
-             (if active?
-               ; activated
-               (do
-                 (when-not @last-active
-                   (reset! last-active true)
-                   (log-message "Activating spout " component-id ":" (keys task-datas))
-                   (fast-list-iter [^ISpout spout spouts] (.activate spout)))
- 
-                 (if (and (not (.isFull transfer-queue))
-                       (not throttle-on)
-                       (not reached-max-spout-pending))
-                   (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
-               ; deactivated
-               (do
-                 (when @last-active
-                   (reset! last-active false)
-                   (log-message "Deactivating spout " component-id ":" (keys task-datas))
-                   (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
-                 ;; TODO: log that it's getting throttled
-                 (Time/sleep 100)
-                 (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data) (:stats executor-data))))
- 
-             (if (and (= curr-count (.get emitted-count)) active?)
-               (do (.increment empty-emit-streak)
-                   (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
-                   ;; update the spout throttling metrics
-                   (if throttle-on
-                     (builtin-metrics/skipped-throttle! (:spout-throttling-metrics executor-data) (:stats executor-data))
-                     (if reached-max-spout-pending
-                       (builtin-metrics/skipped-max-spout! (:spout-throttling-metrics executor-data) (:stats executor-data)))))
-               (.set empty-emit-streak 0)
-               ))
-           0))
-       :kill-fn (:report-error-and-die executor-data)
-       :factory? true
-       :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
+         empty-emit-streak (MutableLong. 0)
+         spout-transfer-fn (fn []
+                             ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
+                             (while (not @(:storm-active-atom executor-data))
+                               (Thread/sleep 100))
+                             (log-message "Opening spout " component-id ":" (keys task-datas))
+                             (builtin-metrics/register-spout-throttling-metrics (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas))))
+                             (doseq [[task-id task-data] task-datas
+                                     :let [^ISpout spout-obj (:object task-data)
+                                           tasks-fn (:tasks-fn task-data)
+                                           send-spout-msg (fn [out-stream-id values message-id out-task-id]
+                                                            (.increment emitted-count)
+                                                            (let [out-tasks (if out-task-id
+                                                                              (tasks-fn out-task-id out-stream-id values)
+                                                                              (tasks-fn out-stream-id values))
+                                                                  rooted? (and message-id has-ackers?)
+                                                                  root-id (if rooted? (MessageId/generateId rand))
+                                                                  ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
+                                                              (fast-list-iter [out-task out-tasks id out-ids]
+                                                                              (let [tuple-id (if rooted?
+                                                                                               (MessageId/makeRootId root-id id)
+                                                                                               (MessageId/makeUnanchored))
+                                                                                    out-tuple (TupleImpl. worker-context
+                                                                                                          values
+                                                                                                          task-id
+                                                                                                          out-stream-id
+                                                                                                          tuple-id)]
+                                                                                (transfer-fn out-task out-tuple)))
+                                                              (if has-eventloggers?
+                                                                (send-to-eventlogger executor-data task-data values component-id message-id rand))
+                                                              (if (and rooted?
+                                                                       (not (.isEmpty out-ids)))
+                                                                (do
+                                                                  (.put pending root-id [task-id
+                                                                                         message-id
+                                                                                         {:stream out-stream-id 
+                                                                                          :values (if debug? values nil)}
+                                                                                         (if (sampler) (System/currentTimeMillis))])
+                                                                  (task/send-unanchored task-data
+                                                                                        ACKER-INIT-STREAM-ID
+                                                                                        [root-id (bit-xor-vals out-ids) task-id]))
+                                                                (when message-id
+                                                                  (ack-spout-msg executor-data task-data message-id
+                                                                                 {:stream out-stream-id :values values}
+                                                                                 (if (sampler) 0) "0:")))
+                                                              (or out-tasks [])))]]
+ 
+                               (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
+                               (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
+                                                                        :receive receive-queue}
+                                                                       storm-conf (:user-context task-data))
+                               (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
+ 
+                               (.open spout-obj
+                                      storm-conf
+                                      (:user-context task-data)
+                                      (SpoutOutputCollector.
+                                       (reify ISpoutOutputCollector
+                                         (^long getPendingCount[this]
+                                           (.size pending))
+                                         (^List emit [this ^String stream-id ^List tuple ^Object message-id]
+                                           (send-spout-msg stream-id tuple message-id nil))
+                                         (^void emitDirect [this ^int out-task-id ^String stream-id
+                                                            ^List tuple ^Object message-id]
+                                           (send-spout-msg stream-id tuple message-id out-task-id))
+                                         (reportError [this error]
+                                           (report-error error))))))
+ 
+                             (reset! open-or-prepare-was-called? true) 
+                             (log-message "Opened spout " component-id ":" (keys task-datas))
+                             (setup-metrics! executor-data)
+ 
+                             (fn []
+                               ;; This design requires that spouts be non-blocking
 -                              (disruptor/consume-batch receive-queue event-handler)
++                              (.consumeBatch ^DisruptorQueue receive-queue event-handler)
+ 
+                               (let [active? @(:storm-active-atom executor-data)
+                                     curr-count (.get emitted-count)
+                                     backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
+                                     throttle-on (and backpressure-enabled
+                                                      @(:throttle-on (:worker executor-data)))
+                                     reached-max-spout-pending (and max-spout-pending
+                                                                    (>= (.size pending) max-spout-pending))]
+                                 (if active?
+                                         ; activated
+                                   (do
+                                     (when-not @last-active
+                                       (reset! last-active true)
+                                       (log-message "Activating spout " component-id ":" (keys task-datas))
+                                       (fast-list-iter [^ISpout spout spouts] (.activate spout)))
+ 
+                                     (if (and (not (.isFull transfer-queue))
+                                              (not throttle-on)
+                                              (not reached-max-spout-pending))
+                                       (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
+                                         ; deactivated
+                                   (do
+                                     (when @last-active
+                                       (reset! last-active false)
+                                       (log-message "Deactivating spout " component-id ":" (keys task-datas))
+                                       (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
+                                     ;; TODO: log that it's getting throttled
+                                     (Time/sleep 100)
+                                     (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data) (:stats executor-data))))
+ 
+                                 (if (and (= curr-count (.get emitted-count)) active?)
+                                   (do (.increment empty-emit-streak)
+                                       (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
+                                       ;; update the spout throttling metrics
+                                       (if throttle-on
+                                         (builtin-metrics/skipped-throttle! (:spout-throttling-metrics executor-data) (:stats executor-data))
+                                         (if reached-max-spout-pending
+                                           (builtin-metrics/skipped-max-spout! (:spout-throttling-metrics executor-data) (:stats executor-data)))))
+                                   (.set empty-emit-streak 0)))
+                               0))]
+ 
+     [(Utils/asyncLoop
+       spout-transfer-fn
+       false ; isDaemon
+       (:report-error-and-die executor-data)
+       Thread/NORM_PRIORITY
+       true ; isFactory
+       true ; startImmediately
+       (str component-id "-executor" (:executor-id executor-data)))]))
  
  (defn- tuple-time-delta! [^TupleImpl tuple]
    (let [ms (.getProcessSampleStartTime tuple)]
@@@ -736,115 -743,116 +747,116 @@@
                                                                 (.getSourceComponent tuple)
                                                                 (.getSourceStreamId tuple)
                                                                 delta)))))))
-         has-eventloggers? (has-eventloggers? storm-conf)]
-     
+         has-eventloggers? (has-eventloggers? storm-conf)
+         bolt-transfer-fn (fn []
+                            ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
+                            (while (not @(:storm-active-atom executor-data))
+                              (Thread/sleep 100))
+ 
+                            (log-message "Preparing bolt " component-id ":" (keys task-datas))
+                            (doseq [[task-id task-data] task-datas
+                                    :let [^IBolt bolt-obj (:object task-data)
+                                          tasks-fn (:tasks-fn task-data)
+                                          user-context (:user-context task-data)
+                                          bolt-emit (fn [stream anchors values task]
+                                                      (let [out-tasks (if task
+                                                                        (tasks-fn task stream values)
+                                                                        (tasks-fn stream values))]
+                                                        (fast-list-iter [t out-tasks]
+                                                                        (let [anchors-to-ids (HashMap.)]
+                                                                          (fast-list-iter [^TupleImpl a anchors]
+                                                                                          (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
+                                                                                            (when (pos? (count root-ids))
+                                                                                              (let [edge-id (MessageId/generateId rand)]
+                                                                                                (.updateAckVal a edge-id)
+                                                                                                (fast-list-iter [root-id root-ids]
+                                                                                                                (put-xor! anchors-to-ids root-id edge-id))))))
+                                                                          (let [tuple (TupleImpl. worker-context
+                                                                                                  values
+                                                                                                  task-id
+                                                                                                  stream
+                                                                                                  (MessageId/makeId anchors-to-ids))]
+                                                                            (transfer-fn t tuple))))
+                                                        (if has-eventloggers?
+                                                          (send-to-eventlogger executor-data task-data values component-id nil rand))
+                                                        (or out-tasks [])))]]
+                              (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
+                              (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials)) 
+                              (if (= component-id Constants/SYSTEM_COMPONENT_ID)
+                                (do
+                                  (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
+                                                                           :receive (:receive-queue executor-data)
+                                                                           :transfer (:transfer-queue (:worker executor-data))}
+                                                                          storm-conf user-context)
+                                  (builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket (:worker executor-data)) storm-conf user-context)
+                                  (builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data)) storm-conf user-context))
+                                (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
+                                                                         :receive (:receive-queue executor-data)}
+                                                                        storm-conf user-context))
+ 
+                              (.prepare bolt-obj
+                                        storm-conf
+                                        user-context
+                                        (OutputCollector.
+                                         (reify IOutputCollector
+                                           (emit [this stream anchors values]
+                                             (bolt-emit stream anchors values nil))
+                                           (emitDirect [this task stream anchors values]
+                                             (bolt-emit stream anchors values task))
+                                           (^void ack [this ^Tuple tuple]
+                                             (let [^TupleImpl tuple tuple
+                                                   ack-val (.getAckVal tuple)]
+                                               (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
+                                                              (task/send-unanchored task-data
+                                                                                    ACKER-ACK-STREAM-ID
+                                                                                    [root (bit-xor id ack-val)])))
+                                             (let [delta (tuple-time-delta! tuple)
+                                                   debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+                                               (when debug?
+                                                 (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
+                                               (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
+                                               (when delta
+                                                 (stats/bolt-acked-tuple! executor-stats
+                                                                          (.getSourceComponent tuple)
+                                                                          (.getSourceStreamId tuple)
+                                                                          delta))))
+                                           (^void fail [this ^Tuple tuple]
+                                             (fast-list-iter [root (.. tuple getMessageId getAnchors)]
+                                                             (task/send-unanchored task-data
+                                                                                   ACKER-FAIL-STREAM-ID
+                                                                                   [root]))
+                                             (let [delta (tuple-time-delta! tuple)
+                                                   debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+                                               (when debug?
+                                                 (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
+                                               (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
+                                               (when delta
+                                                 (stats/bolt-failed-tuple! executor-stats
+                                                                           (.getSourceComponent tuple)
+                                                                           (.getSourceStreamId tuple)
+                                                                           delta))))
+                                           (reportError [this error]
+                                             (report-error error))))))
+ 
+                            (reset! open-or-prepare-was-called? true)
+                            (log-message "Prepared bolt " component-id ":" (keys task-datas))
+                            (setup-metrics! executor-data)
+ 
+                            (let [receive-queue (:receive-queue executor-data)
+                                  event-handler (mk-task-receiver executor-data tuple-action-fn)]
+                              (fn []
 -                               (disruptor/consume-batch-when-available receive-queue event-handler)
++                               (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
+                                0)))]
      ;; TODO: can get any SubscribedState objects out of the context now
  
-     [(async-loop
-       (fn []
-         ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
-         (while (not @(:storm-active-atom executor-data))          
-           (Thread/sleep 100))
-         
-         (log-message "Preparing bolt " component-id ":" (keys task-datas))
-         (doseq [[task-id task-data] task-datas
-                 :let [^IBolt bolt-obj (:object task-data)
-                       tasks-fn (:tasks-fn task-data)
-                       user-context (:user-context task-data)
-                       bolt-emit (fn [stream anchors values task]
-                                   (let [out-tasks (if task
-                                                     (tasks-fn task stream values)
-                                                     (tasks-fn stream values))]
-                                     (fast-list-iter [t out-tasks]
-                                                     (let [anchors-to-ids (HashMap.)]
-                                                       (fast-list-iter [^TupleImpl a anchors]
-                                                                       (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
-                                                                         (when (pos? (count root-ids))
-                                                                           (let [edge-id (MessageId/generateId rand)]
-                                                                             (.updateAckVal a edge-id)
-                                                                             (fast-list-iter [root-id root-ids]
-                                                                                             (put-xor! anchors-to-ids root-id edge-id))
-                                                                             ))))
-                                                         (let [tuple (TupleImpl. worker-context
-                                                                                values
-                                                                                task-id
-                                                                                stream
-                                                                                (MessageId/makeId anchors-to-ids))]
-                                                           (transfer-fn t tuple))))
-                                     (if has-eventloggers?
-                                       (send-to-eventlogger executor-data task-data values component-id nil rand))
-                                     (or out-tasks [])))]]
-           (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
-           (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials)) 
-           (if (= component-id Constants/SYSTEM_COMPONENT_ID)
-             (do
-               (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
-                                                        :receive (:receive-queue executor-data)
-                                                        :transfer (:transfer-queue (:worker executor-data))}
-                                                       storm-conf user-context)
-               (builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket (:worker executor-data)) storm-conf user-context)
-               (builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data)) storm-conf user-context))
-             (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
-                                                      :receive (:receive-queue executor-data)}
-                                                     storm-conf user-context)
-             )
- 
-           (.prepare bolt-obj
-                     storm-conf
-                     user-context
-                     (OutputCollector.
-                      (reify IOutputCollector
-                        (emit [this stream anchors values]
-                          (bolt-emit stream anchors values nil))
-                        (emitDirect [this task stream anchors values]
-                          (bolt-emit stream anchors values task))
-                        (^void ack [this ^Tuple tuple]
-                          (let [^TupleImpl tuple tuple
-                                ack-val (.getAckVal tuple)]
-                            (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
-                                           (task/send-unanchored task-data
-                                                                 ACKER-ACK-STREAM-ID
-                                                                 [root (bit-xor id ack-val)])))
-                          (let [delta (tuple-time-delta! tuple)
-                                debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                            (when debug? 
-                              (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
-                            (when delta
-                              (stats/bolt-acked-tuple! executor-stats
-                                                       (.getSourceComponent tuple)
-                                                       (.getSourceStreamId tuple)
-                                                       delta))))
-                        (^void fail [this ^Tuple tuple]
-                          (fast-list-iter [root (.. tuple getMessageId getAnchors)]
-                                          (task/send-unanchored task-data
-                                                                ACKER-FAIL-STREAM-ID
-                                                                [root]))
-                          (let [delta (tuple-time-delta! tuple)
-                                debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                            (when debug? 
-                              (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                            (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
-                            (when delta
-                              (stats/bolt-failed-tuple! executor-stats
-                                                        (.getSourceComponent tuple)
-                                                        (.getSourceStreamId tuple)
-                                                        delta))))
-                        (reportError [this error]
-                          (report-error error)
-                          )))))
-         (reset! open-or-prepare-was-called? true)        
-         (log-message "Prepared bolt " component-id ":" (keys task-datas))
-         (setup-metrics! executor-data)
- 
-         (let [receive-queue (:receive-queue executor-data)
-               event-handler (mk-task-receiver executor-data tuple-action-fn)]
-           (fn []            
-             (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
-             0)))
-       :kill-fn (:report-error-and-die executor-data)
-       :factory? true
-       :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
+     [(Utils/asyncLoop
+       bolt-transfer-fn
+       false ; isDaemon
+       (:report-error-and-die executor-data)
+       Thread/NORM_PRIORITY
+       true ; isFactory
+       true ; startImmediately
+       (str component-id "-executor" (:executor-id executor-data)))]))
  
  (defmethod close-component :spout [executor-data spout]
    (.close spout))

http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj
index bfece6a,b2bdcdb..1f530ac
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@@ -19,13 -19,16 +19,16 @@@
    (:require [clj-time.core :as time])
    (:require [clj-time.coerce :as coerce])
    (:require [org.apache.storm.daemon [executor :as executor]])
--  (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]])
++  (:require [org.apache.storm [cluster :as cluster]])
    (:require [clojure.set :as set])
    (:require [org.apache.storm.messaging.loader :as msg-loader])
    (:import [java.util.concurrent Executors]
-            [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
-   (:import [java.util ArrayList HashMap])
-   (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
+            [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
+            [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
++  (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time WorkerBackpressureCallback DisruptorBackpressureCallback])
+   (:import [java.util ArrayList HashMap]
+            [java.util.concurrent.locks ReentrantReadWriteLock])
+   (:import [org.apache.commons.io FileUtils])
 -  (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time])
    (:import [org.apache.storm.grouping LoadMapping])
    (:import [org.apache.storm.messaging TransportFactory])
    (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
@@@ -632,7 -658,7 +658,9 @@@
  
          transfer-tuples (mk-transfer-tuples-handler worker)
          
--        transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)               
++        transfer-thread (Utils/asyncLoop
++                          (fn []
++                            (.consumeBatchWhenAvailable ^DisruptorQueue (:transfer-queue worker) transfer-tuples) 0))
  
          disruptor-handler (mk-disruptor-backpressure-handler worker)
          _ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler)

http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/disruptor.clj
index 78b16dc,e2211c0..0000000
deleted file mode 100644,100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ /dev/null
@@@ -1,36 -1,89 +1,0 @@@
--;; Licensed to the Apache Software Foundation (ASF) under one
--;; or more contributor license agreements.  See the NOTICE file
--;; distributed with this work for additional information
--;; regarding copyright ownership.  The ASF licenses this file
--;; to you under the Apache License, Version 2.0 (the
--;; "License"); you may not use this file except in compliance
--;; with the License.  You may obtain a copy of the License at
--;;
--;; http://www.apache.org/licenses/LICENSE-2.0
--;;
--;; Unless required by applicable law or agreed to in writing, software
--;; distributed under the License is distributed on an "AS IS" BASIS,
--;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--;; See the License for the specific language governing permissions and
--;; limitations under the License.
--
--(ns org.apache.storm.disruptor
-   (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
 -  (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback Utils])
--  (:import [com.lmax.disruptor.dsl ProducerType])
--  (:require [clojure [string :as str]])
--  (:require [clojure [set :as set]])
--  (:use [clojure walk])
--  (:use [org.apache.storm util log]))
 -
 -(def PRODUCER-TYPE
 -  {:multi-threaded ProducerType/MULTI
 -   :single-threaded ProducerType/SINGLE})
 -
 -(defnk disruptor-queue
 -  [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
 -  (DisruptorQueue. queue-name
 -                   (PRODUCER-TYPE producer-type) buffer-size
 -                   timeout batch-size batch-timeout))
 -
 -(defn clojure-handler
 -  [afn]
 -  (reify com.lmax.disruptor.EventHandler
 -    (onEvent
 -      [this o seq-id batchEnd?]
 -      (afn o seq-id batchEnd?))))
 -
 -(defn disruptor-backpressure-handler
 -  [afn-high-wm afn-low-wm]
 -  (reify DisruptorBackpressureCallback
 -    (highWaterMark
 -      [this]
 -      (afn-high-wm))
 -    (lowWaterMark
 -      [this]
 -      (afn-low-wm))))
 -
 -(defn worker-backpressure-handler
 -  [afn]
 -  (reify WorkerBackpressureCallback
 -    (onEvent
 -      [this o]
 -      (afn o))))
 -
 -(defmacro handler
 -  [& args]
 -  `(clojure-handler (fn ~@args)))
--
 -(defn publish
 -  [^DisruptorQueue q o]
 -  (.publish q o))
--
 -(defn consume-batch
 -  [^DisruptorQueue queue handler]
 -  (.consumeBatch queue handler))
--
 -(defn consume-batch-when-available
 -  [^DisruptorQueue queue handler]
 -  (.consumeBatchWhenAvailable queue handler))
--
 -(defn halt-with-interrupt!
 -  [^DisruptorQueue queue]
 -  (.haltWithInterrupt queue))
--
--(defnk consume-loop*
--  [^DisruptorQueue queue handler
-    :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
-   (async-loop
-           (fn [] (.consumeBatchWhenAvailable ^DisruptorQueue queue handler) 0)
-           :kill-fn kill-fn
-           :thread-name (.getName queue)))
 -   :uncaught-exception-handler nil]
 -  (Utils/asyncLoop
 -          (fn [] (consume-batch-when-available queue handler) 0)
 -          (.getName queue)
 -          uncaught-exception-handler))
--
 -(defmacro consume-loop [queue & handler-args]
 -  `(let [handler# (handler ~@handler-args)]
 -     (consume-loop* ~queue handler#)))