You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/16 16:12:50 UTC
[3/5] storm git commit: Merge apache master branch into STORM-1272
Merge apache master branch into STORM-1272
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6696e3f1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6696e3f1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6696e3f1
Branch: refs/heads/master
Commit: 6696e3f1d8309f85d487a3361306f67c5608a0ff
Parents: 55b26dd d041183
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Fri Feb 12 02:14:42 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Fri Feb 12 02:14:42 2016 +0530
----------------------------------------------------------------------
CHANGELOG.md | 4 +-
README.markdown | 1 +
dev-tools/travis/travis-script.sh | 4 +-
external/sql/storm-sql-core/pom.xml | 9 +
.../storm/hbase/security/HBaseSecurityUtil.java | 36 +-
external/storm-mqtt/core/pom.xml | 4 +-
pom.xml | 9 +-
storm-core/pom.xml | 11 +-
.../src/clj/org/apache/storm/LocalCluster.clj | 4 +-
storm-core/src/clj/org/apache/storm/clojure.clj | 8 +-
storm-core/src/clj/org/apache/storm/cluster.clj | 27 +-
.../cluster_state/zookeeper_state_factory.clj | 11 +-
.../clj/org/apache/storm/command/blobstore.clj | 11 +-
.../org/apache/storm/command/dev_zookeeper.clj | 6 +-
.../clj/org/apache/storm/command/get_errors.clj | 12 +-
.../apache/storm/command/shell_submission.clj | 4 +-
storm-core/src/clj/org/apache/storm/config.clj | 18 +-
.../src/clj/org/apache/storm/converter.clj | 14 +-
.../src/clj/org/apache/storm/daemon/acker.clj | 21 +-
.../src/clj/org/apache/storm/daemon/common.clj | 29 +-
.../src/clj/org/apache/storm/daemon/drpc.clj | 23 +-
.../clj/org/apache/storm/daemon/executor.clj | 552 ++++++-----
.../clj/org/apache/storm/daemon/logviewer.clj | 68 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 170 ++--
.../clj/org/apache/storm/daemon/supervisor.clj | 200 ++--
.../src/clj/org/apache/storm/daemon/task.clj | 2 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 70 +-
.../src/clj/org/apache/storm/disruptor.clj | 36 -
storm-core/src/clj/org/apache/storm/event.clj | 2 +-
.../src/clj/org/apache/storm/local_state.clj | 9 +-
.../org/apache/storm/pacemaker/pacemaker.clj | 7 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 24 +-
.../clj/org/apache/storm/process_simulator.clj | 4 +-
.../apache/storm/scheduler/DefaultScheduler.clj | 7 +-
.../apache/storm/scheduler/EvenScheduler.clj | 23 +-
.../storm/scheduler/IsolationScheduler.clj | 29 +-
storm-core/src/clj/org/apache/storm/stats.clj | 82 +-
storm-core/src/clj/org/apache/storm/testing.clj | 83 +-
storm-core/src/clj/org/apache/storm/thrift.clj | 6 +-
storm-core/src/clj/org/apache/storm/timer.clj | 12 +-
.../clj/org/apache/storm/trident/testing.clj | 9 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 97 +-
.../src/clj/org/apache/storm/ui/helpers.clj | 14 +-
storm-core/src/clj/org/apache/storm/util.clj | 923 +----------------
.../src/clj/org/apache/storm/zookeeper.clj | 1 -
.../storm/logging/ThriftAccessLogger.java | 13 +-
.../serialization/SerializationFactory.java | 17 +-
.../staticmocking/MockedConfigUtils.java | 31 -
.../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +-
.../jvm/org/apache/storm/utils/Container.java | 11 +-
.../jvm/org/apache/storm/utils/IPredicate.java | 27 +
.../org/apache/storm/utils/NimbusClient.java | 2 +-
.../jvm/org/apache/storm/utils/TestUtils.java | 34 -
.../src/jvm/org/apache/storm/utils/Time.java | 26 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 989 +++++++++++++++++--
.../storm/validation/ConfigValidation.java | 2 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 7 +
.../org/apache/storm/integration_test.clj | 100 +-
.../org/apache/storm/testing4j_test.clj | 37 +-
.../apache/storm/trident/integration_test.clj | 15 +-
.../test/clj/org/apache/storm/cluster_test.clj | 20 +-
.../test/clj/org/apache/storm/drpc_test.clj | 23 +-
.../clj/org/apache/storm/logviewer_test.clj | 267 ++---
.../storm/messaging/netty_integration_test.clj | 2 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 131 ++-
.../scheduler/resource_aware_scheduler_test.clj | 21 +-
.../apache/storm/security/auth/auth_test.clj | 11 +-
.../authorizer/DRPCSimpleACLAuthorizer_test.clj | 2 +-
.../BlowfishTupleSerializer_test.clj | 1 -
.../clj/org/apache/storm/serialization_test.clj | 23 +-
.../clj/org/apache/storm/supervisor_test.clj | 645 ++++++------
.../clj/org/apache/storm/transactional_test.clj | 18 +
.../clj/org/apache/storm/trident/state_test.clj | 5 +-
.../clj/org/apache/storm/trident/tuple_test.clj | 15 +-
.../test/clj/org/apache/storm/utils_test.clj | 16 +-
.../test/clj/org/apache/storm/worker_test.clj | 1 -
.../staticmocking/ConfigUtilsInstaller.java | 38 +
.../utils/staticmocking/UtilsInstaller.java | 38 +
.../storm/utils/staticmocking/package-info.java | 95 ++
79 files changed, 3042 insertions(+), 2357 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 619a885,e2380b7..03db855
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -36,10 -36,11 +36,12 @@@
(:import [org.apache.storm Config Constants])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
(:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
- (:import [java.util.concurrent ConcurrentLinkedQueue]
- (com.lmax.disruptor.dsl ProducerType))
+ (:import [java.lang Thread Thread$UncaughtExceptionHandler]
+ [java.util.concurrent ConcurrentLinkedQueue]
- [org.json.simple JSONValue])
++ [org.json.simple JSONValue]
++ [com.lmax.disruptor.dsl ProducerType])
(:require [org.apache.storm [thrift :as thrift]
-- [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]])
++ [cluster :as cluster] [stats :as stats]])
(:require [org.apache.storm.daemon [task :as task]])
(:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
(:require [clojure.set :as set]))
@@@ -300,17 -305,16 +306,19 @@@
cached-emit (MutableObject. (ArrayList.))
storm-conf (:storm-conf executor-data)
serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
++ ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data)
++ handler (reify com.lmax.disruptor.EventHandler
++ (onEvent [this o seq-id batch-end?]
++ (let [^ArrayList alist (.getObject cached-emit)]
++ (.add alist o)
++ (when batch-end?
++ (worker-transfer-fn serializer alist)
++ (.setObject cached-emit (ArrayList.))))))
]
-- (disruptor/consume-loop*
-- (:batch-transfer-queue executor-data)
- (reify com.lmax.disruptor.EventHandler
- (onEvent [this o seq-id batch-end?]
- (let [^ArrayList alist (.getObject cached-emit)]
- (.add alist o)
- (when batch-end?
- (worker-transfer-fn serializer alist)
- (.setObject cached-emit (ArrayList.))))))
- :kill-fn (:report-error-and-die executor-data))))
- (disruptor/handler [o seq-id batch-end?]
- (let [^ArrayList alist (.getObject cached-emit)]
- (.add alist o)
- (when batch-end?
- (worker-transfer-fn serializer alist)
- (.setObject cached-emit (ArrayList.)))))
- :uncaught-exception-handler (:report-error-and-die executor-data))))
++ (Utils/asyncLoop
++ (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0)
++ (.getName batch-transfer-queue)
++ (:uncaught-exception-handler (:report-error-and-die executor-data)))))
(defn setup-metrics! [executor-data]
(let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
@@@ -540,132 -549,130 +553,130 @@@
has-ackers? (has-ackers? storm-conf)
has-eventloggers? (has-eventloggers? storm-conf)
emitted-count (MutableLong. 0)
- empty-emit-streak (MutableLong. 0)]
-
- [(async-loop
- (fn []
- ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
- (while (not @(:storm-active-atom executor-data))
- (Thread/sleep 100))
-
- (log-message "Opening spout " component-id ":" (keys task-datas))
- (builtin-metrics/register-spout-throttling-metrics (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas))))
- (doseq [[task-id task-data] task-datas
- :let [^ISpout spout-obj (:object task-data)
- tasks-fn (:tasks-fn task-data)
- send-spout-msg (fn [out-stream-id values message-id out-task-id]
- (.increment emitted-count)
- (let [out-tasks (if out-task-id
- (tasks-fn out-task-id out-stream-id values)
- (tasks-fn out-stream-id values))
- rooted? (and message-id has-ackers?)
- root-id (if rooted? (MessageId/generateId rand))
- ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
- (fast-list-iter [out-task out-tasks id out-ids]
- (let [tuple-id (if rooted?
- (MessageId/makeRootId root-id id)
- (MessageId/makeUnanchored))
- out-tuple (TupleImpl. worker-context
- values
- task-id
- out-stream-id
- tuple-id)]
- (transfer-fn out-task out-tuple)))
- (if has-eventloggers?
- (send-to-eventlogger executor-data task-data values component-id message-id rand))
- (if (and rooted?
- (not (.isEmpty out-ids)))
- (do
- (.put pending root-id [task-id
- message-id
- {:stream out-stream-id
- :values (if debug? values nil)}
- (if (sampler) (System/currentTimeMillis))])
- (task/send-unanchored task-data
- ACKER-INIT-STREAM-ID
- [root-id (bit-xor-vals out-ids) task-id]))
- (when message-id
- (ack-spout-msg executor-data task-data message-id
- {:stream out-stream-id :values values}
- (if (sampler) 0) "0:")))
- (or out-tasks [])
- ))]]
- (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
- (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
- :receive receive-queue}
- storm-conf (:user-context task-data))
- (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
-
- (.open spout-obj
- storm-conf
- (:user-context task-data)
- (SpoutOutputCollector.
- (reify ISpoutOutputCollector
- (^long getPendingCount[this]
- (.size pending)
- )
- (^List emit [this ^String stream-id ^List tuple ^Object message-id]
- (send-spout-msg stream-id tuple message-id nil)
- )
- (^void emitDirect [this ^int out-task-id ^String stream-id
- ^List tuple ^Object message-id]
- (send-spout-msg stream-id tuple message-id out-task-id)
- )
- (reportError [this error]
- (report-error error)
- )))))
- (reset! open-or-prepare-was-called? true)
- (log-message "Opened spout " component-id ":" (keys task-datas))
- (setup-metrics! executor-data)
-
- (fn []
- ;; This design requires that spouts be non-blocking
- (.consumeBatch ^DisruptorQueue receive-queue event-handler)
-
- (let [active? @(:storm-active-atom executor-data)
- curr-count (.get emitted-count)
- backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
- throttle-on (and backpressure-enabled
- @(:throttle-on (:worker executor-data)))
- reached-max-spout-pending (and max-spout-pending
- (>= (.size pending) max-spout-pending))
- ]
- (if active?
- ; activated
- (do
- (when-not @last-active
- (reset! last-active true)
- (log-message "Activating spout " component-id ":" (keys task-datas))
- (fast-list-iter [^ISpout spout spouts] (.activate spout)))
-
- (if (and (not (.isFull transfer-queue))
- (not throttle-on)
- (not reached-max-spout-pending))
- (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
- ; deactivated
- (do
- (when @last-active
- (reset! last-active false)
- (log-message "Deactivating spout " component-id ":" (keys task-datas))
- (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
- ;; TODO: log that it's getting throttled
- (Time/sleep 100)
- (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data) (:stats executor-data))))
-
- (if (and (= curr-count (.get emitted-count)) active?)
- (do (.increment empty-emit-streak)
- (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
- ;; update the spout throttling metrics
- (if throttle-on
- (builtin-metrics/skipped-throttle! (:spout-throttling-metrics executor-data) (:stats executor-data))
- (if reached-max-spout-pending
- (builtin-metrics/skipped-max-spout! (:spout-throttling-metrics executor-data) (:stats executor-data)))))
- (.set empty-emit-streak 0)
- ))
- 0))
- :kill-fn (:report-error-and-die executor-data)
- :factory? true
- :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
+ empty-emit-streak (MutableLong. 0)
+ spout-transfer-fn (fn []
+ ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
+ (while (not @(:storm-active-atom executor-data))
+ (Thread/sleep 100))
+ (log-message "Opening spout " component-id ":" (keys task-datas))
+ (builtin-metrics/register-spout-throttling-metrics (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas))))
+ (doseq [[task-id task-data] task-datas
+ :let [^ISpout spout-obj (:object task-data)
+ tasks-fn (:tasks-fn task-data)
+ send-spout-msg (fn [out-stream-id values message-id out-task-id]
+ (.increment emitted-count)
+ (let [out-tasks (if out-task-id
+ (tasks-fn out-task-id out-stream-id values)
+ (tasks-fn out-stream-id values))
+ rooted? (and message-id has-ackers?)
+ root-id (if rooted? (MessageId/generateId rand))
+ ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
+ (fast-list-iter [out-task out-tasks id out-ids]
+ (let [tuple-id (if rooted?
+ (MessageId/makeRootId root-id id)
+ (MessageId/makeUnanchored))
+ out-tuple (TupleImpl. worker-context
+ values
+ task-id
+ out-stream-id
+ tuple-id)]
+ (transfer-fn out-task out-tuple)))
+ (if has-eventloggers?
+ (send-to-eventlogger executor-data task-data values component-id message-id rand))
+ (if (and rooted?
+ (not (.isEmpty out-ids)))
+ (do
+ (.put pending root-id [task-id
+ message-id
+ {:stream out-stream-id
+ :values (if debug? values nil)}
+ (if (sampler) (System/currentTimeMillis))])
+ (task/send-unanchored task-data
+ ACKER-INIT-STREAM-ID
+ [root-id (bit-xor-vals out-ids) task-id]))
+ (when message-id
+ (ack-spout-msg executor-data task-data message-id
+ {:stream out-stream-id :values values}
+ (if (sampler) 0) "0:")))
+ (or out-tasks [])))]]
+
+ (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
+ (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
+ :receive receive-queue}
+ storm-conf (:user-context task-data))
+ (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
+
+ (.open spout-obj
+ storm-conf
+ (:user-context task-data)
+ (SpoutOutputCollector.
+ (reify ISpoutOutputCollector
+ (^long getPendingCount[this]
+ (.size pending))
+ (^List emit [this ^String stream-id ^List tuple ^Object message-id]
+ (send-spout-msg stream-id tuple message-id nil))
+ (^void emitDirect [this ^int out-task-id ^String stream-id
+ ^List tuple ^Object message-id]
+ (send-spout-msg stream-id tuple message-id out-task-id))
+ (reportError [this error]
+ (report-error error))))))
+
+ (reset! open-or-prepare-was-called? true)
+ (log-message "Opened spout " component-id ":" (keys task-datas))
+ (setup-metrics! executor-data)
+
+ (fn []
+ ;; This design requires that spouts be non-blocking
- (disruptor/consume-batch receive-queue event-handler)
++ (.consumeBatch ^DisruptorQueue receive-queue event-handler)
+
+ (let [active? @(:storm-active-atom executor-data)
+ curr-count (.get emitted-count)
+ backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
+ throttle-on (and backpressure-enabled
+ @(:throttle-on (:worker executor-data)))
+ reached-max-spout-pending (and max-spout-pending
+ (>= (.size pending) max-spout-pending))]
+ (if active?
+ ; activated
+ (do
+ (when-not @last-active
+ (reset! last-active true)
+ (log-message "Activating spout " component-id ":" (keys task-datas))
+ (fast-list-iter [^ISpout spout spouts] (.activate spout)))
+
+ (if (and (not (.isFull transfer-queue))
+ (not throttle-on)
+ (not reached-max-spout-pending))
+ (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
+ ; deactivated
+ (do
+ (when @last-active
+ (reset! last-active false)
+ (log-message "Deactivating spout " component-id ":" (keys task-datas))
+ (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
+ ;; TODO: log that it's getting throttled
+ (Time/sleep 100)
+ (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data) (:stats executor-data))))
+
+ (if (and (= curr-count (.get emitted-count)) active?)
+ (do (.increment empty-emit-streak)
+ (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
+ ;; update the spout throttling metrics
+ (if throttle-on
+ (builtin-metrics/skipped-throttle! (:spout-throttling-metrics executor-data) (:stats executor-data))
+ (if reached-max-spout-pending
+ (builtin-metrics/skipped-max-spout! (:spout-throttling-metrics executor-data) (:stats executor-data)))))
+ (.set empty-emit-streak 0)))
+ 0))]
+
+ [(Utils/asyncLoop
+ spout-transfer-fn
+ false ; isDaemon
+ (:report-error-and-die executor-data)
+ Thread/NORM_PRIORITY
+ true ; isFactory
+ true ; startImmediately
+ (str component-id "-executor" (:executor-id executor-data)))]))
(defn- tuple-time-delta! [^TupleImpl tuple]
(let [ms (.getProcessSampleStartTime tuple)]
@@@ -736,115 -743,116 +747,116 @@@
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)))))))
- has-eventloggers? (has-eventloggers? storm-conf)]
-
+ has-eventloggers? (has-eventloggers? storm-conf)
+ bolt-transfer-fn (fn []
+ ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
+ (while (not @(:storm-active-atom executor-data))
+ (Thread/sleep 100))
+
+ (log-message "Preparing bolt " component-id ":" (keys task-datas))
+ (doseq [[task-id task-data] task-datas
+ :let [^IBolt bolt-obj (:object task-data)
+ tasks-fn (:tasks-fn task-data)
+ user-context (:user-context task-data)
+ bolt-emit (fn [stream anchors values task]
+ (let [out-tasks (if task
+ (tasks-fn task stream values)
+ (tasks-fn stream values))]
+ (fast-list-iter [t out-tasks]
+ (let [anchors-to-ids (HashMap.)]
+ (fast-list-iter [^TupleImpl a anchors]
+ (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
+ (when (pos? (count root-ids))
+ (let [edge-id (MessageId/generateId rand)]
+ (.updateAckVal a edge-id)
+ (fast-list-iter [root-id root-ids]
+ (put-xor! anchors-to-ids root-id edge-id))))))
+ (let [tuple (TupleImpl. worker-context
+ values
+ task-id
+ stream
+ (MessageId/makeId anchors-to-ids))]
+ (transfer-fn t tuple))))
+ (if has-eventloggers?
+ (send-to-eventlogger executor-data task-data values component-id nil rand))
+ (or out-tasks [])))]]
+ (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
+ (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
+ (if (= component-id Constants/SYSTEM_COMPONENT_ID)
+ (do
+ (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
+ :receive (:receive-queue executor-data)
+ :transfer (:transfer-queue (:worker executor-data))}
+ storm-conf user-context)
+ (builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket (:worker executor-data)) storm-conf user-context)
+ (builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data)) storm-conf user-context))
+ (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
+ :receive (:receive-queue executor-data)}
+ storm-conf user-context))
+
+ (.prepare bolt-obj
+ storm-conf
+ user-context
+ (OutputCollector.
+ (reify IOutputCollector
+ (emit [this stream anchors values]
+ (bolt-emit stream anchors values nil))
+ (emitDirect [this task stream anchors values]
+ (bolt-emit stream anchors values task))
+ (^void ack [this ^Tuple tuple]
+ (let [^TupleImpl tuple tuple
+ ack-val (.getAckVal tuple)]
+ (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
+ (task/send-unanchored task-data
+ ACKER-ACK-STREAM-ID
+ [root (bit-xor id ack-val)])))
+ (let [delta (tuple-time-delta! tuple)
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+ (when debug?
+ (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
+ (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
+ (when delta
+ (stats/bolt-acked-tuple! executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta))))
+ (^void fail [this ^Tuple tuple]
+ (fast-list-iter [root (.. tuple getMessageId getAnchors)]
+ (task/send-unanchored task-data
+ ACKER-FAIL-STREAM-ID
+ [root]))
+ (let [delta (tuple-time-delta! tuple)
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+ (when debug?
+ (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
+ (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
+ (when delta
+ (stats/bolt-failed-tuple! executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta))))
+ (reportError [this error]
+ (report-error error))))))
+
+ (reset! open-or-prepare-was-called? true)
+ (log-message "Prepared bolt " component-id ":" (keys task-datas))
+ (setup-metrics! executor-data)
+
+ (let [receive-queue (:receive-queue executor-data)
+ event-handler (mk-task-receiver executor-data tuple-action-fn)]
+ (fn []
- (disruptor/consume-batch-when-available receive-queue event-handler)
++ (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
+ 0)))]
;; TODO: can get any SubscribedState objects out of the context now
- [(async-loop
- (fn []
- ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
- (while (not @(:storm-active-atom executor-data))
- (Thread/sleep 100))
-
- (log-message "Preparing bolt " component-id ":" (keys task-datas))
- (doseq [[task-id task-data] task-datas
- :let [^IBolt bolt-obj (:object task-data)
- tasks-fn (:tasks-fn task-data)
- user-context (:user-context task-data)
- bolt-emit (fn [stream anchors values task]
- (let [out-tasks (if task
- (tasks-fn task stream values)
- (tasks-fn stream values))]
- (fast-list-iter [t out-tasks]
- (let [anchors-to-ids (HashMap.)]
- (fast-list-iter [^TupleImpl a anchors]
- (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
- (when (pos? (count root-ids))
- (let [edge-id (MessageId/generateId rand)]
- (.updateAckVal a edge-id)
- (fast-list-iter [root-id root-ids]
- (put-xor! anchors-to-ids root-id edge-id))
- ))))
- (let [tuple (TupleImpl. worker-context
- values
- task-id
- stream
- (MessageId/makeId anchors-to-ids))]
- (transfer-fn t tuple))))
- (if has-eventloggers?
- (send-to-eventlogger executor-data task-data values component-id nil rand))
- (or out-tasks [])))]]
- (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
- (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
- (if (= component-id Constants/SYSTEM_COMPONENT_ID)
- (do
- (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
- :receive (:receive-queue executor-data)
- :transfer (:transfer-queue (:worker executor-data))}
- storm-conf user-context)
- (builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket (:worker executor-data)) storm-conf user-context)
- (builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data)) storm-conf user-context))
- (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
- :receive (:receive-queue executor-data)}
- storm-conf user-context)
- )
-
- (.prepare bolt-obj
- storm-conf
- user-context
- (OutputCollector.
- (reify IOutputCollector
- (emit [this stream anchors values]
- (bolt-emit stream anchors values nil))
- (emitDirect [this task stream anchors values]
- (bolt-emit stream anchors values task))
- (^void ack [this ^Tuple tuple]
- (let [^TupleImpl tuple tuple
- ack-val (.getAckVal tuple)]
- (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
- (task/send-unanchored task-data
- ACKER-ACK-STREAM-ID
- [root (bit-xor id ack-val)])))
- (let [delta (tuple-time-delta! tuple)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
- (when debug?
- (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
- (when delta
- (stats/bolt-acked-tuple! executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta))))
- (^void fail [this ^Tuple tuple]
- (fast-list-iter [root (.. tuple getMessageId getAnchors)]
- (task/send-unanchored task-data
- ACKER-FAIL-STREAM-ID
- [root]))
- (let [delta (tuple-time-delta! tuple)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
- (when debug?
- (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
- (when delta
- (stats/bolt-failed-tuple! executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta))))
- (reportError [this error]
- (report-error error)
- )))))
- (reset! open-or-prepare-was-called? true)
- (log-message "Prepared bolt " component-id ":" (keys task-datas))
- (setup-metrics! executor-data)
-
- (let [receive-queue (:receive-queue executor-data)
- event-handler (mk-task-receiver executor-data tuple-action-fn)]
- (fn []
- (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
- 0)))
- :kill-fn (:report-error-and-die executor-data)
- :factory? true
- :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
+ [(Utils/asyncLoop
+ bolt-transfer-fn
+ false ; isDaemon
+ (:report-error-and-die executor-data)
+ Thread/NORM_PRIORITY
+ true ; isFactory
+ true ; startImmediately
+ (str component-id "-executor" (:executor-id executor-data)))]))
(defmethod close-component :spout [executor-data spout]
(.close spout))
http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj
index bfece6a,b2bdcdb..1f530ac
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@@ -19,13 -19,16 +19,16 @@@
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
(:require [org.apache.storm.daemon [executor :as executor]])
-- (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]])
++ (:require [org.apache.storm [cluster :as cluster]])
(:require [clojure.set :as set])
(:require [org.apache.storm.messaging.loader :as msg-loader])
(:import [java.util.concurrent Executors]
- [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
- (:import [java.util ArrayList HashMap])
- (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
+ [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
+ [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
++ (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time WorkerBackpressureCallback DisruptorBackpressureCallback])
+ (:import [java.util ArrayList HashMap]
+ [java.util.concurrent.locks ReentrantReadWriteLock])
+ (:import [org.apache.commons.io FileUtils])
- (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time])
(:import [org.apache.storm.grouping LoadMapping])
(:import [org.apache.storm.messaging TransportFactory])
(:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
@@@ -632,7 -658,7 +658,9 @@@
transfer-tuples (mk-transfer-tuples-handler worker)
-- transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)
++ transfer-thread (Utils/asyncLoop
++ (fn []
++ (.consumeBatchWhenAvailable ^DisruptorQueue (:transfer-queue worker) transfer-tuples) 0))
disruptor-handler (mk-disruptor-backpressure-handler worker)
_ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler)
http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/disruptor.clj
index 78b16dc,e2211c0..0000000
deleted file mode 100644,100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ /dev/null
@@@ -1,36 -1,89 +1,0 @@@
--;; Licensed to the Apache Software Foundation (ASF) under one
--;; or more contributor license agreements. See the NOTICE file
--;; distributed with this work for additional information
--;; regarding copyright ownership. The ASF licenses this file
--;; to you under the Apache License, Version 2.0 (the
--;; "License"); you may not use this file except in compliance
--;; with the License. You may obtain a copy of the License at
--;;
--;; http://www.apache.org/licenses/LICENSE-2.0
--;;
--;; Unless required by applicable law or agreed to in writing, software
--;; distributed under the License is distributed on an "AS IS" BASIS,
--;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--;; See the License for the specific language governing permissions and
--;; limitations under the License.
--
--(ns org.apache.storm.disruptor
- (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
- (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback Utils])
-- (:import [com.lmax.disruptor.dsl ProducerType])
-- (:require [clojure [string :as str]])
-- (:require [clojure [set :as set]])
-- (:use [clojure walk])
-- (:use [org.apache.storm util log]))
-
-(def PRODUCER-TYPE
- {:multi-threaded ProducerType/MULTI
- :single-threaded ProducerType/SINGLE})
-
-(defnk disruptor-queue
- [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
- (DisruptorQueue. queue-name
- (PRODUCER-TYPE producer-type) buffer-size
- timeout batch-size batch-timeout))
-
-(defn clojure-handler
- [afn]
- (reify com.lmax.disruptor.EventHandler
- (onEvent
- [this o seq-id batchEnd?]
- (afn o seq-id batchEnd?))))
-
-(defn disruptor-backpressure-handler
- [afn-high-wm afn-low-wm]
- (reify DisruptorBackpressureCallback
- (highWaterMark
- [this]
- (afn-high-wm))
- (lowWaterMark
- [this]
- (afn-low-wm))))
-
-(defn worker-backpressure-handler
- [afn]
- (reify WorkerBackpressureCallback
- (onEvent
- [this o]
- (afn o))))
-
-(defmacro handler
- [& args]
- `(clojure-handler (fn ~@args)))
--
-(defn publish
- [^DisruptorQueue q o]
- (.publish q o))
--
-(defn consume-batch
- [^DisruptorQueue queue handler]
- (.consumeBatch queue handler))
--
-(defn consume-batch-when-available
- [^DisruptorQueue queue handler]
- (.consumeBatchWhenAvailable queue handler))
--
-(defn halt-with-interrupt!
- [^DisruptorQueue queue]
- (.haltWithInterrupt queue))
--
--(defnk consume-loop*
-- [^DisruptorQueue queue handler
- :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
- (async-loop
- (fn [] (.consumeBatchWhenAvailable ^DisruptorQueue queue handler) 0)
- :kill-fn kill-fn
- :thread-name (.getName queue)))
- :uncaught-exception-handler nil]
- (Utils/asyncLoop
- (fn [] (consume-batch-when-available queue handler) 0)
- (.getName queue)
- uncaught-exception-handler))
--
-(defmacro consume-loop [queue & handler-args]
- `(let [handler# (handler ~@handler-args)]
- (consume-loop* ~queue handler#)))