You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/08 08:17:39 UTC
[3/5] storm git commit: STORM-1277 port backtype.storm.daemon.executor to java
STORM-1277 port backtype.storm.daemon.executor to java
* code rebased by Jungtaek Lim <ka...@gmail.com>
* Closes #1445
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a5e19d9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a5e19d9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a5e19d9b
Branch: refs/heads/master
Commit: a5e19d9b8064f83adf00190ed74518e2156faae2
Parents: 44068c4
Author: 卫乐 <we...@taobao.com>
Authored: Fri Apr 29 17:58:50 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 8 17:13:48 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 18 +-
.../clj/org/apache/storm/daemon/executor.clj | 839 -------------------
.../org/apache/storm/daemon/local_executor.clj | 42 +
.../src/clj/org/apache/storm/daemon/worker.clj | 70 +-
storm-core/src/clj/org/apache/storm/testing.clj | 16 +-
.../src/jvm/org/apache/storm/Constants.java | 24 +-
.../org/apache/storm/daemon/StormCommon.java | 175 ++--
.../src/jvm/org/apache/storm/daemon/Task.java | 76 +-
.../jvm/org/apache/storm/executor/Executor.java | 576 +++++++++++++
.../apache/storm/executor/ExecutorShutdown.java | 111 +++
.../apache/storm/executor/ExecutorTransfer.java | 87 ++
.../apache/storm/executor/IRunningExecutor.java | 31 +
.../org/apache/storm/executor/TupleInfo.java | 90 ++
.../storm/executor/bolt/BoltExecutor.java | 138 +++
.../executor/bolt/BoltOutputCollectorImpl.java | 171 ++++
.../storm/executor/error/IReportError.java | 22 +
.../storm/executor/error/ReportError.java | 76 ++
.../storm/executor/error/ReportErrorAndDie.java | 47 ++
.../storm/executor/spout/SpoutExecutor.java | 255 ++++++
.../spout/SpoutOutputCollectorImpl.java | 147 ++++
.../apache/storm/stats/BoltExecutorStats.java | 1 +
.../jvm/org/apache/storm/stats/CommonStats.java | 5 +-
.../apache/storm/stats/SpoutExecutorStats.java | 1 +
.../org/apache/storm/task/TopologyContext.java | 32 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 29 +
.../storm/utils/WorkerBackpressureThread.java | 11 +-
.../org/apache/storm/integration_test.clj | 11 +-
.../test/clj/org/apache/storm/grouping_test.clj | 2 +-
28 files changed, 2077 insertions(+), 1026 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index cc5436c..01a49b3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -52,20 +52,4 @@
(catch Throwable t#
(log-error t# "Error on initialization of server " ~(str name))
(Utils/exitProcess 13 "Error on initialization")
- )))))
-
-(defn worker-context [worker]
- (WorkerTopologyContext. (:system-topology worker)
- (:storm-conf worker)
- (:task->component worker)
- (:component->sorted-tasks worker)
- (:component->stream->fields worker)
- (:storm-id worker)
- (ConfigUtils/supervisorStormResourcesPath
- (ConfigUtils/supervisorStormDistRoot (:conf worker) (:storm-id worker)))
- (ConfigUtils/workerPidsRoot (:conf worker) (:worker-id worker))
- (:port worker)
- (:task-ids worker)
- (:default-shared-resources worker)
- (:user-shared-resources worker)
- ))
+ )))))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
deleted file mode 100644
index 1fdfbf5..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ /dev/null
@@ -1,839 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.executor
- (:use [org.apache.storm.daemon common])
- (:use [clojure.walk])
- (:import [org.apache.storm.generated Grouping Grouping$_Fields]
- [java.io Serializable]
- [org.apache.storm.stats BoltExecutorStats SpoutExecutorStats]
- [org.apache.storm.daemon.metrics BuiltinMetricsUtil SpoutThrottlingMetrics])
- (:use [org.apache.storm util config log])
- (:import [java.util List Random HashMap ArrayList LinkedList Map])
- (:import [org.apache.storm ICredentialsListener Thrift])
- (:import [org.apache.storm.hooks ITaskHook])
- (:import [org.apache.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId])
- (:import [org.apache.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
- (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
- (:import [org.apache.storm.grouping CustomStreamGrouping])
- (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
- (:import [org.apache.storm.generated GlobalStreamId])
- (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
- (:import [com.lmax.disruptor InsufficientCapacityException])
- (:import [org.apache.storm.serialization KryoTupleSerializer])
- (:import [org.apache.storm.daemon Shutdownable StormCommon Acker Task GrouperFactory])
- (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
- (:import [org.apache.storm Config Constants])
- (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
- (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
- (:import [java.lang Thread Thread$UncaughtExceptionHandler]
- [java.util.concurrent ConcurrentLinkedQueue]
- [org.json.simple JSONValue]
- [com.lmax.disruptor.dsl ProducerType]
- [org.apache.storm StormTimer])
- (:require [clojure.set :as set]))
-
-
-;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
-(defn- outbound-groupings
- [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping topo-conf]
- (->> component->grouping
- (filter-key #(-> worker-context
- (.getComponentTasks %)
- count
- pos?))
- (map (fn [[component tgrouping]]
- [component
- (GrouperFactory/mkGrouper worker-context
- this-component-id
- stream-id
- out-fields
- tgrouping
- (.getComponentTasks worker-context component)
- topo-conf)]))
- (into {})
- (HashMap.)))
-
-(defn outbound-components
- "Returns map of stream id to component id to grouper"
- [^WorkerTopologyContext worker-context component-id topo-conf]
- (->> (.getTargets worker-context component-id)
- clojurify-structure
- (map (fn [[stream-id component->grouping]]
- [stream-id
- (outbound-groupings
- worker-context
- component-id
- stream-id
- (.getComponentOutputFields worker-context component-id stream-id)
- component->grouping
- topo-conf)]))
- (into (apply merge (map #(hash-map % nil) (.keySet (.get_streams (.getComponentCommon worker-context component-id))))))
- (HashMap.)))
-
-(defn executor-type [^WorkerTopologyContext context component-id]
- (let [topology (.getRawTopology context)
- spouts (.get_spouts topology)
- bolts (.get_bolts topology)]
- (cond (contains? spouts component-id) "spout"
- (contains? bolts component-id) "bolt"
- :else (throw (RuntimeException. (str "Could not find " component-id " in topology " topology))))))
-
-(defn executor-selector [executor-data & _] (keyword (:type executor-data)))
-
-(defmulti mk-threads executor-selector)
-(defmulti mk-executor-stats executor-selector)
-(defmulti close-component executor-selector)
-
-(defn- normalized-component-conf [storm-conf general-context component-id]
- (let [to-remove (disj (set ALL-CONFIGS)
- TOPOLOGY-DEBUG
- TOPOLOGY-MAX-SPOUT-PENDING
- TOPOLOGY-MAX-TASK-PARALLELISM
- TOPOLOGY-TRANSACTIONAL-ID
- TOPOLOGY-TICK-TUPLE-FREQ-SECS
- TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
- TOPOLOGY-SPOUT-WAIT-STRATEGY
- TOPOLOGY-BOLTS-WINDOW-LENGTH-COUNT
- TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
- TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
- TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
- TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
- TOPOLOGY-BOLTS-LATE-TUPLE-STREAM
- TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
- TOPOLOGY-BOLTS-MESSAGE-ID-FIELD-NAME
- TOPOLOGY-STATE-PROVIDER
- TOPOLOGY-STATE-PROVIDER-CONFIG
- )
- spec-conf (-> general-context
- (.getComponentCommon component-id)
- .get_json_conf
- (#(if % (JSONValue/parse %)))
- clojurify-structure)]
- (merge storm-conf (apply dissoc spec-conf to-remove))
- ))
-
-(defprotocol RunningExecutor
- (render-stats [this])
- (get-executor-id [this])
- (credentials-changed [this creds])
- (get-backpressure-flag [this]))
-
-(defn throttled-report-error-fn [executor]
- (let [storm-conf (:storm-conf executor)
- error-interval-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
- max-per-interval (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
- interval-start-time (atom (Time/currentTimeSecs))
- interval-errors (atom 0)
- ]
- (fn [error]
- (log-error error)
- (when (> (Time/deltaSecs @interval-start-time)
- error-interval-secs)
- (reset! interval-errors 0)
- (reset! interval-start-time (Time/currentTimeSecs)))
- (swap! interval-errors inc)
-
- (when (<= @interval-errors max-per-interval)
- (.reportError (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
- (Utils/hostname storm-conf)
- (long (.getThisWorkerPort (:worker-context executor))) error)
- ))))
-
-;; in its own function so that it can be mocked out by tracked topologies
-(defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
- (fn this
- [task tuple]
- (let [val (AddressedTuple. task tuple)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "TRANSFERING tuple " val))
- (.publish ^DisruptorQueue batch-transfer->worker val))))
-
-(defn mk-executor-data [worker executor-id]
- (let [worker-context (worker-context worker)
- task-ids (clojurify-structure (StormCommon/executorIdToTasks executor-id))
- component-id (.getComponentId worker-context (first task-ids))
- storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
- executor-type (executor-type worker-context component-id)
- batch-transfer->worker (DisruptorQueue.
- (str "executor" executor-id "-send-queue")
- ProducerType/SINGLE
- (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
- (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
- (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
- (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
- ]
- (recursive-map
- :worker worker
- :worker-context worker-context
- :executor-id executor-id
- :task-ids task-ids
- :component-id component-id
- :open-or-prepare-was-called? (atom false)
- :storm-conf storm-conf
- :receive-queue ((:executor-receive-queue-map worker) executor-id)
- :storm-id (:storm-id worker)
- :conf (:conf worker)
- :shared-executor-data (HashMap.)
- :storm-active-atom (:storm-active-atom worker)
- :storm-component->debug-atom (:storm-component->debug-atom worker)
- :batch-transfer-queue batch-transfer->worker
- :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
- :suicide-fn (:suicide-fn worker)
- :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf)
- (ClusterStateContext. DaemonType/WORKER))
- :type executor-type
- ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
- :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))
- :interval->task->metric-registry (HashMap.)
- :task->component (:task->component worker)
- :stream->component->grouper (outbound-components worker-context component-id storm-conf)
- :report-error (throttled-report-error-fn <>)
- :report-error-and-die (reify
- Thread$UncaughtExceptionHandler
- (uncaughtException [this _ error]
- (try
- ((:report-error <>) error)
- (catch Exception e
- (log-error e "Error while reporting error to cluster, proceeding with shutdown")))
- (if (or
- (Utils/exceptionCauseIsInstanceOf InterruptedException error)
- (Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error))
- (log-message "Got interrupted excpetion shutting thread down...")
- ((:suicide-fn <>)))))
- :sampler (ConfigUtils/mkStatsSampler storm-conf)
- :backpressure (atom false)
- :spout-throttling-metrics (if (= (keyword executor-type) :spout)
- (SpoutThrottlingMetrics.)
- nil)
- ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
- )))
-
-(defn- mk-disruptor-backpressure-handler [executor-data]
- "make a handler for the executor's receive disruptor queue to
- check highWaterMark and lowWaterMark for backpressure"
- (reify DisruptorBackpressureCallback
- (highWaterMark [this]
- "When receive queue is above highWaterMark"
- (if (not @(:backpressure executor-data))
- (do (reset! (:backpressure executor-data) true)
- (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
- (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
- (lowWaterMark [this]
- "When receive queue is below lowWaterMark"
- (if @(:backpressure executor-data)
- (do (reset! (:backpressure executor-data) false)
- (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
- (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))
-
-(defn start-batch-transfer->worker-handler! [worker executor-data]
- (let [worker-transfer-fn (:transfer-fn worker)
- cached-emit (MutableObject. (ArrayList.))
- storm-conf (:storm-conf executor-data)
- serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
- ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data)
- handler (reify com.lmax.disruptor.EventHandler
- (onEvent [this o seq-id batch-end?]
- (let [^ArrayList alist (.getObject cached-emit)]
- (.add alist o)
- (when batch-end?
- (worker-transfer-fn serializer alist)
- (.setObject cached-emit (ArrayList.))))))
- ]
- (Utils/asyncLoop
- (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0)
- (.getName batch-transfer-queue)
- (:uncaught-exception-handler (:report-error-and-die executor-data)))))
-
-;; TODO: this is all expensive... should be precomputed
-(defn send-unanchored
- [^Task task-data stream values transfer-fn]
- (let [out-tuple (.getTuple task-data stream values)]
- (fast-list-iter [t (.getOutgoingTasks task-data stream values)]
- (transfer-fn t out-tuple))))
-
-(defn setup-metrics! [executor-data]
- (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
- distinct-time-bucket-intervals (keys interval->task->metric-registry)]
- (doseq [interval distinct-time-bucket-intervals]
- (.scheduleRecurring
- (:user-timer (:worker executor-data))
- interval
- interval
- (fn []
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val)))))))
-
-(defn metrics-tick
- [executor-data task-data ^TupleImpl tuple]
- (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
- interval (.getInteger tuple 0)
- transfer-fn (:transfer-fn executor-data)
- task-id (.getTaskId task-data)
- name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
- task-info (IMetricsConsumer$TaskInfo.
- (Utils/hostname (:storm-conf executor-data))
- (.getThisWorkerPort worker-context)
- (:component-id executor-data)
- task-id
- (long (Time/currentTimeSecs))
- interval)
- data-points (->> name->imetric
- (map (fn [[name imetric]]
- (let [value (.getValueAndReset ^IMetric imetric)]
- (if value
- (IMetricsConsumer$DataPoint. name value)))))
- (filter identity)
- (into []))]
- (when (seq data-points)
- (send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] transfer-fn))))
-
-(defn setup-ticks! [worker executor-data]
- (let [storm-conf (:storm-conf executor-data)
- tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
- receive-queue (:receive-queue executor-data)
- context (:worker-context executor-data)]
- (when tick-time-secs
- (if (or (Utils/isSystemId (:component-id executor-data))
- (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
- (= :spout (keyword (:type executor-data)))))
- (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
- (.scheduleRecurring
- (:user-timer worker)
- tick-time-secs
- tick-time-secs
- (fn []
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val))))))))
-
-(defn mk-executor [worker executor-id initial-credentials]
- (let [executor-data (mk-executor-data worker executor-id)
- transfer-fn (:transfer-fn executor-data)
- _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
- task-datas (->> executor-data
- :task-ids
- (map (fn [t] (let [task (Task. (stringify-keys executor-data) t)
- stream StormCommon/SYSTEM_STREAM_ID
- values ["startup"]]
- ;; when this is called, the threads for the executor haven't been started yet,
- ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
- (send-unanchored task stream values transfer-fn)
- [t task]
- )))
- (into {})
- (HashMap.))
- _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
- report-error-and-die (:report-error-and-die executor-data)
- component-id (:component-id executor-data)
-
-
- disruptor-handler (mk-disruptor-backpressure-handler executor-data)
- _ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
- _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
- (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
- (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
-
- ;; starting the batch-transfer->worker ensures that anything publishing to that queue
- ;; doesn't block (because it's a single threaded queue and the caching/consumer started
- ;; trick isn't thread-safe)
- system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
- handlers (try
- (mk-threads executor-data task-datas initial-credentials)
- (catch Throwable t (.uncaughtException report-error-and-die nil t)))
- threads (concat handlers system-threads)]
- (setup-ticks! worker executor-data)
-
- (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
- ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
- (reify
- RunningExecutor
- (render-stats [this]
- (.renderStats (:stats executor-data)))
- (get-executor-id [this]
- executor-id)
- (credentials-changed [this creds]
- (let [receive-queue (:receive-queue executor-data)
- context (:worker-context executor-data)
- val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val)))
- (get-backpressure-flag [this]
- @(:backpressure executor-data))
- Shutdownable
- (shutdown
- [this]
- (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
- (.haltWithInterrupt ^DisruptorQueue (:receive-queue executor-data))
- (.haltWithInterrupt ^DisruptorQueue (:batch-transfer-queue executor-data))
- (doseq [t threads]
- (.interrupt t)
- (.join t))
-
- (.cleanupStats (:stats executor-data))
- (doseq [user-context (map #(.getUserContext %) (vals task-datas))]
- (doseq [hook (.getHooks user-context)]
- (.cleanup hook)))
- (.disconnect (:storm-cluster-state executor-data))
- (when @(:open-or-prepare-was-called? executor-data)
- (doseq [obj (map #(.getTaskObject %) (vals task-datas))]
- (close-component executor-data obj)))
- (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
- )))
-
-(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id]
- (let [^ISpout spout (.getTaskObject task-data)
- storm-conf (:storm-conf executor-data)
- task-id (.getTaskId task-data)]
- ;;TODO: need to throttle these when there's lots of failures
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
- (.fail spout msg-id)
- (.applyOn (SpoutFailInfo. msg-id task-id time-delta) (.getUserContext task-data))
- (when time-delta
- (.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
- (let [storm-conf (:storm-conf executor-data)
- ^ISpout spout (.getTaskObject task-data)
- task-id (.getTaskId task-data)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "SPOUT Acking message " id " " msg-id))
- (.ack spout msg-id)
- (.applyOn (SpoutAckInfo. msg-id task-id time-delta) (.getUserContext task-data))
- (when time-delta
- (.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn mk-task-receiver [executor-data tuple-action-fn]
- (let [task-ids (:task-ids executor-data)
- debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
- ]
- (reify com.lmax.disruptor.EventHandler
- (onEvent [this tuple-batch sequence-id end-of-batch?]
- (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
- (let [^TupleImpl tuple (.getTuple addressed-tuple)
- task-id (.getDest addressed-tuple)]
- (when debug? (log-message "Processing received message FOR " task-id " TUPLE: " tuple))
- (if (not= task-id AddressedTuple/BROADCAST_DEST)
- (tuple-action-fn task-id tuple)
- ;; null task ids are broadcast tuples
- (fast-list-iter [task-id task-ids]
- (tuple-action-fn task-id tuple)
- ))
- ))))))
-
-(defn executor-max-spout-pending [storm-conf num-tasks]
- (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
- (if p (* p num-tasks))))
-
-(defn init-spout-wait-strategy [storm-conf]
- (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) Utils/newInstance)]
- (.prepare ret storm-conf)
- ret
- ))
-
-;; Send sampled data to the eventlogger if the global or component level
-;; debug flag is set (via nimbus api).
-(defn send-to-eventlogger [executor-data task-data values component-id message-id random]
- (let [c->d @(:storm-component->debug-atom executor-data)
- options (get c->d component-id (get c->d (:storm-id executor-data)))
- spct (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)]
- ;; the thread's initialized random number generator is used to generate
- ;; uniformily distributed random numbers.
- (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
- (send-unanchored
- task-data
- StormCommon/EVENTLOGGER_STREAM_ID
- [component-id message-id (System/currentTimeMillis) values]
- (:transfer-fn executor-data)))))
-
-(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
- (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
- ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
- max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
- ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
- last-active (atom false)
- spouts (ArrayList. (map #(.getTaskObject %) (vals task-datas)))
- rand (Random. (Utils/secureRandomLong))
- ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))
-
- pending (RotatingMap.
- 2 ;; microoptimize for performance of .size method
- (reify RotatingMap$ExpiredCallback
- (expire [this id [task-id spout-id tuple-info start-time-ms]]
- (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
- (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id)
- ))))
- tuple-action-fn (fn [task-id ^TupleImpl tuple]
- (let [stream-id (.getSourceStreamId tuple)]
- (condp = stream-id
- Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
- Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
- Constants/CREDENTIALS_CHANGED_STREAM_ID
- (let [task-data (get task-datas task-id)
- spout-obj (.getTaskObject task-data)]
- (when (instance? ICredentialsListener spout-obj)
- (.setCredentials spout-obj (.getValue tuple 0))))
- Acker/ACKER_RESET_TIMEOUT_STREAM_ID
- (let [id (.getValue tuple 0)
- pending-for-id (.get pending id)]
- (when pending-for-id
- (.put pending id pending-for-id)))
- (let [id (.getValue tuple 0)
- time-delta-ms (.getValue tuple 1)
- [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
- (when spout-id
- (when-not (= stored-task-id task-id)
- (throw (RuntimeException. (str "Fatal error, mismatched task ids: " task-id " " stored-task-id))))
- (let [time-delta (if start-time-ms time-delta-ms)]
- (condp = stream-id
- Acker/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta id)
- Acker/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
- )))
- ;; TODO: on failure, emit tuple to failure stream
- ))))
- receive-queue (:receive-queue executor-data)
- event-handler (mk-task-receiver executor-data tuple-action-fn)
- has-ackers? (StormCommon/hasAckers storm-conf)
- has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
- emitted-count (MutableLong. 0)
- empty-emit-streak (MutableLong. 0)
- spout-transfer-fn (fn []
- ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
- (while (not @(:storm-active-atom executor-data))
- (Thread/sleep 100))
- (log-message "Opening spout " component-id ":" (keys task-datas))
- (.registerAll (:spout-throttling-metrics executor-data) storm-conf (.getUserContext (first (vals task-datas))))
- (doseq [[task-id task-data] task-datas
- :let [^ISpout spout-obj (.getTaskObject task-data)
- send-spout-msg (fn [out-stream-id values message-id out-task-id]
- (.increment emitted-count)
- (let [out-tasks (if out-task-id
- (.getOutgoingTasks task-data out-task-id out-stream-id values)
- (.getOutgoingTasks task-data out-stream-id values))
- rooted? (and message-id has-ackers?)
- root-id (if rooted? (MessageId/generateId rand))
- ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
- (fast-list-iter [out-task out-tasks id out-ids]
- (let [tuple-id (if rooted?
- (MessageId/makeRootId root-id id)
- (MessageId/makeUnanchored))
- out-tuple (TupleImpl. worker-context
- values
- task-id
- out-stream-id
- tuple-id)]
- (transfer-fn out-task out-tuple)))
- (if has-eventloggers?
- (send-to-eventlogger executor-data task-data values component-id message-id rand))
- (if (and rooted?
- (not (.isEmpty out-ids)))
- (do
- (.put pending root-id [task-id
- message-id
- {:stream out-stream-id
- :values (if debug? values nil)}
- (if (.call ^Callable sampler) (System/currentTimeMillis))])
- (send-unanchored task-data
- Acker/ACKER_INIT_STREAM_ID
- [root-id (Utils/bitXorVals out-ids) task-id]
- (:transfer-fn executor-data)))
- (when message-id
- (ack-spout-msg executor-data task-data message-id
- {:stream out-stream-id :values values}
- (if (.call ^Callable sampler) 0) "0:")))
- (or out-tasks [])))]]
-
- (.registerAll (.getBuiltInMetrics task-data) storm-conf (.getUserContext task-data))
- (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
- "receive" receive-queue}
- storm-conf (.getUserContext task-data))
-
- (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
-
- (.open spout-obj
- storm-conf
- (.getUserContext task-data)
- (SpoutOutputCollector.
- (reify ISpoutOutputCollector
- (^long getPendingCount[this]
- (.size pending))
- (^List emit [this ^String stream-id ^List tuple ^Object message-id]
- (send-spout-msg stream-id tuple message-id nil))
- (^void emitDirect [this ^int out-task-id ^String stream-id
- ^List tuple ^Object message-id]
- (send-spout-msg stream-id tuple message-id out-task-id))
- (reportError [this error]
- (report-error error))))))
-
- (reset! open-or-prepare-was-called? true)
- (log-message "Opened spout " component-id ":" (keys task-datas))
- (setup-metrics! executor-data)
-
- (fn []
- ;; This design requires that spouts be non-blocking
- (.consumeBatch ^DisruptorQueue receive-queue event-handler)
-
- (let [active? @(:storm-active-atom executor-data)
- curr-count (.get emitted-count)
- backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
- throttle-on (and backpressure-enabled
- @(:throttle-on (:worker executor-data)))
- reached-max-spout-pending (and max-spout-pending
- (>= (.size pending) max-spout-pending))]
- (if active?
- ; activated
- (do
- (when-not @last-active
- (reset! last-active true)
- (log-message "Activating spout " component-id ":" (keys task-datas))
- (fast-list-iter [^ISpout spout spouts] (.activate spout)))
-
- (if (and (not (.isFull transfer-queue))
- (not throttle-on)
- (not reached-max-spout-pending))
- (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
- ; deactivated
- (do
- (when @last-active
- (reset! last-active false)
- (log-message "Deactivating spout " component-id ":" (keys task-datas))
- (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
- ;; TODO: log that it's getting throttled
- (Time/sleep 100)
- (.skippedInactive (:spout-throttling-metrics executor-data) (:stats executor-data))))
-
- (if (and (= curr-count (.get emitted-count)) active?)
- (do (.increment empty-emit-streak)
- (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
- ;; update the spout throttling metrics
- (if throttle-on
- (.skippedThrottle (:spout-throttling-metrics executor-data) (:stats executor-data))
- (if reached-max-spout-pending
- (.skippedMaxSpout (:spout-throttling-metrics executor-data) (:stats executor-data)))))
- (.set empty-emit-streak 0)))
- 0))]
-
- [(Utils/asyncLoop
- spout-transfer-fn
- false ; isDaemon
- (:report-error-and-die executor-data)
- Thread/NORM_PRIORITY
- true ; isFactory
- true ; startImmediately
- (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defn- tuple-time-delta! [^TupleImpl tuple]
- (let [ms (.getProcessSampleStartTime tuple)]
- (if ms
- (Time/deltaMs ms))))
-
-(defn- tuple-execute-time-delta! [^TupleImpl tuple]
- (let [ms (.getExecuteSampleStartTime tuple)]
- (if ms
- (Time/deltaMs ms))))
-
-(defn put-xor! [^Map pending key id]
- (let [curr (or (.get pending key) (long 0))]
- (.put pending key (bit-xor curr id))))
-
-(defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
- (let [storm-conf (:storm-conf executor-data)
- execute-sampler (ConfigUtils/mkStatsSampler storm-conf)
- executor-stats (:stats executor-data)
- {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
- open-or-prepare-was-called?]} executor-data
- rand (Random. (Utils/secureRandomLong))
-
- tuple-action-fn (fn [task-id ^TupleImpl tuple]
- ;; synchronization needs to be done with a key provided by this bolt, otherwise:
- ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
- ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
- ;; buffer other tuples until fully synchronized, then process all of those tuples
- ;; then go into normal loop
- ;; spill to disk?
- ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
- ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
- ;; or just timeout the sync messages that are coming in until full sync is hit from that task
- ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
- ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
- ;; TODO: how to handle incremental updates as well as synchronizations at same time
- ;; TODO: need to version tuples somehow
-
- ;;(log-debug "Received tuple " tuple " at task " task-id)
- ;; need to do it this way to avoid reflection
- (let [stream-id (.getSourceStreamId tuple)]
- (condp = stream-id
- Constants/CREDENTIALS_CHANGED_STREAM_ID
- (let [^Task task-data (get task-datas task-id)
- bolt-obj (.getTaskObject task-data)]
- (when (instance? ICredentialsListener bolt-obj)
- (.setCredentials ^ICredentialsListener bolt-obj (.getValue tuple 0))))
- Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
- (let [^Task task-data (get task-datas task-id)
- ^IBolt bolt-obj (.getTaskObject task-data)
- user-context (.getUserContext task-data)
- sampler? (.call ^Callable sampler)
- execute-sampler? (.call ^Callable execute-sampler)
- now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
- receive-queue (:receive-queue executor-data)]
- (when sampler?
- (.setProcessSampleStartTime tuple now))
- (when execute-sampler?
- (.setExecuteSampleStartTime tuple now))
- (.execute bolt-obj tuple)
- (let [delta (tuple-execute-time-delta! tuple)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
-
- (.applyOn (BoltExecuteInfo. tuple task-id delta) user-context)
- (when delta
- (.boltExecuteTuple executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta)))))))
- has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
- bolt-transfer-fn (fn []
- ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
- (while (not @(:storm-active-atom executor-data))
- (Thread/sleep 100))
-
- (log-message "Preparing bolt " component-id ":" (keys task-datas))
- (doseq [[task-id task-data] task-datas
- :let [^IBolt bolt-obj (.getTaskObject task-data)
- user-context (.getUserContext task-data)
- transfer-fn (:transfer-fn executor-data)
- bolt-emit (fn [stream anchors values task]
- (let [out-tasks (if task
- (.getOutgoingTasks task-data task stream values)
- (.getOutgoingTasks task-data stream values))]
- (fast-list-iter [t out-tasks]
- (let [anchors-to-ids (HashMap.)]
- (fast-list-iter [^TupleImpl a anchors]
- (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
- (when (pos? (count root-ids))
- (let [edge-id (MessageId/generateId rand)]
- (.updateAckVal a edge-id)
- (fast-list-iter [root-id root-ids]
- (put-xor! anchors-to-ids root-id edge-id))))))
- (let [tuple (TupleImpl. worker-context
- values
- task-id
- stream
- (MessageId/makeId anchors-to-ids))]
- (transfer-fn t tuple))))
- (if has-eventloggers?
- (send-to-eventlogger executor-data task-data values component-id nil rand))
- (or out-tasks [])))]]
- (.registerAll (.getBuiltInMetrics task-data) storm-conf user-context)
- (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
- (if (= component-id Constants/SYSTEM_COMPONENT_ID)
- (do
- (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
- "receive" (:receive-queue executor-data)
- "transfer" (:transfer-queue (:worker executor-data))}
- storm-conf user-context)
- (BuiltinMetricsUtil/registerIconnectionClientMetrics
- (.deref (:cached-node+port->socket (:worker executor-data))) storm-conf user-context)
- (BuiltinMetricsUtil/registerIconnectionServerMetric (:receiver (:worker executor-data)) storm-conf user-context))
- (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
- "receive" (:receive-queue executor-data)}
- storm-conf user-context))
-
- (.prepare bolt-obj
- storm-conf
- user-context
- (OutputCollector.
- (reify IOutputCollector
- (emit [this stream anchors values]
- (bolt-emit stream anchors values nil))
- (emitDirect [this task stream anchors values]
- (bolt-emit stream anchors values task))
- (^void ack [this ^Tuple tuple]
- (let [^TupleImpl tuple tuple
- ack-val (.getAckVal tuple)]
- (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
- (send-unanchored task-data
- Acker/ACKER_ACK_STREAM_ID
- [root (bit-xor id ack-val)]
- transfer-fn)))
- (let [delta (tuple-time-delta! tuple)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
- (when debug?
- (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (.applyOn (BoltAckInfo. tuple task-id delta) user-context)
- (when delta
- (.boltAckedTuple executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta))))
- (^void fail [this ^Tuple tuple]
- (fast-list-iter [root (.. tuple getMessageId getAnchors)]
- (send-unanchored task-data
- Acker/ACKER_FAIL_STREAM_ID
- [root]
- transfer-fn))
- (let [delta (tuple-time-delta! tuple)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
- (when debug?
- (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (.applyOn (BoltFailInfo. tuple task-id delta) user-context)
- (when delta
- (.boltFailedTuple executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta))))
- (^void resetTimeout [this ^Tuple tuple]
- (fast-list-iter [root (.. tuple getMessageId getAnchors)]
- (send-unanchored task-data
- Acker/ACKER_RESET_TIMEOUT_STREAM_ID
- [root]
- transfer-fn)))
- (reportError [this error]
- (report-error error))))))
- (reset! open-or-prepare-was-called? true)
- (log-message "Prepared bolt " component-id ":" (keys task-datas))
- (setup-metrics! executor-data)
-
- (let [receive-queue (:receive-queue executor-data)
- event-handler (mk-task-receiver executor-data tuple-action-fn)]
- (fn []
- (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
- 0)))]
- ;; TODO: can get any SubscribedState objects out of the context now
-
- [(Utils/asyncLoop
- bolt-transfer-fn
- false ; isDaemon
- (:report-error-and-die executor-data)
- Thread/NORM_PRIORITY
- true ; isFactory
- true ; startImmediately
- (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defmethod close-component :spout [executor-data spout]
- (.close spout))
-
-(defmethod close-component :bolt [executor-data bolt]
- (.cleanup bolt))
-
-;; TODO: refactor this to be part of an executor-specific map
-(defmethod mk-executor-stats :spout [_ rate]
- (SpoutExecutorStats. rate))
-
-(defmethod mk-executor-stats :bolt [_ rate]
- (BoltExecutorStats. rate))
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
new file mode 100644
index 0000000..1e46e37
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
@@ -0,0 +1,42 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.local-executor
+ (:use [org.apache.storm util config log])
+ (:import [org.apache.storm.tuple AddressedTuple]
+ [org.apache.storm.executor Executor ExecutorTransfer])
+ (:import [org.apache.storm.utils DisruptorQueue])
+ (:import [org.apache.storm Config Constants]))
+
+(defn local-transfer-executor-tuple []
+ (fn [task tuple batch-transfer->worker]
+ (let [val (AddressedTuple. task tuple)]
+ (.publish ^DisruptorQueue batch-transfer->worker val))))
+
+(defn mk-local-executor-transfer [worker-topology-context batch-queue storm-conf transfer-fn]
+ (proxy [ExecutorTransfer] [worker-topology-context batch-queue storm-conf transfer-fn]
+ (transfer [task tuple]
+ (let [batch-transfer->worker (.getBatchTransferQueue this)]
+ ((local-transfer-executor-tuple) task tuple batch-transfer->worker)))))
+
+(defn mk-local-executor [workerData executorId credentials]
+ (let [executor (Executor/mkExecutor workerData executorId credentials)
+ worker-topology-context (.getWorkerTopologyContext executor)
+ batch-transfer-queue (.getTransferWorkerQueue executor)
+ storm-conf (.getStormConf executor)
+ transfer-fn (.getTransferFn executor)
+ local-executor-transfer (mk-local-executor-transfer worker-topology-context batch-transfer-queue storm-conf transfer-fn)]
+ (.setLocalExecutorTransfer executor local-executor-transfer)
+ (.execute executor)))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index dddce68..781558c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -18,11 +18,13 @@
(:use [org.apache.storm config log util converter local-state-converter])
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
- (:require [org.apache.storm.daemon [executor :as executor]])
(:require [clojure.set :as set])
+ (:require [org.apache.storm.daemon
+ [local-executor :as local-executor]])
(:import [java.io File]
- [org.apache.storm.stats StatsUtil])
+ [org.apache.storm.stats StatsUtil]
+ [java.util.concurrent.atomic AtomicBoolean AtomicReference])
(:import [java.util.concurrent Executors]
[org.apache.storm.hooks IWorkerHook BaseWorkerHook]
[uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
@@ -48,10 +50,9 @@
(:import [org.apache.logging.log4j.core.config LoggerConfig])
(:import [org.apache.storm.generated LogConfig LogLevelAction])
(:import [org.apache.storm StormTimer])
+ (:import [org.apache.storm.executor Executor])
(:gen-class))
-(defmulti mk-suicide-fn cluster-mode)
-
(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
(log-message "Reading Assignments.")
(let [assignment (:executor->node+port (clojurify-assignment (.assignmentInfo storm-cluster-state storm-id nil)))]
@@ -69,7 +70,7 @@
(let [stats (if-not executors
(StatsUtil/mkEmptyExecutorZkHbs (:executors worker))
(StatsUtil/convertExecutorZkHbs (->> executors
- (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
+ (map (fn [e] {(.getExecutorId e) (.renderStats e)}))
(apply merge))))
zk-hb (StatsUtil/mkZkWorkerHb (:storm-id worker) stats (. (:uptime worker) upTime))]
;; do the zookeeper heartbeat
@@ -96,7 +97,7 @@
(defn worker-outbound-tasks
"Returns seq of task-ids that receive messages from this worker"
[worker]
- (let [context (worker-context worker)
+ (let [context (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
components (mapcat
(fn [task-id]
(->> (.getComponentId context (int task-id))
@@ -145,11 +146,11 @@
assignment-id (:assignment-id worker)
port (:port worker)
storm-cluster-state (:storm-cluster-state worker)
- prev-backpressure-flag @(:backpressure worker)
+ prev-backpressure-flag (.get (:backpressure worker))
;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
curr-backpressure-flag (if executors
(or (.getThrottleOn (:transfer-queue worker))
- (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))
+ (reduce #(or %1 %2) (map #(.getBackPressureFlag %1) executors)))
prev-backpressure-flag)]
;; update the worker's backpressure flag to zookeeper only when it has changed
(when (not= prev-backpressure-flag curr-backpressure-flag)
@@ -157,7 +158,7 @@
(log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag)
(.workerBackpressure storm-cluster-state storm-id assignment-id (long port) curr-backpressure-flag)
;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
- (reset! (:backpressure worker) curr-backpressure-flag)
+ (.set (:backpressure worker) curr-backpressure-flag)
(catch Exception exc
(log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
))))
@@ -276,7 +277,7 @@
mq-context (if mq-context
mq-context
(TransportFactory/makeContext storm-conf))]
-
+ ;; TODO: when translating this function, use constants defined in Constants.java
(recursive-map
:conf conf
:mq-context mq-context
@@ -290,9 +291,9 @@
;; when worker bootup, worker will start to setup initial connections to
;; other workers. When all connection is ready, we will enable this flag
;; and spout and bolt will be activated.
- :worker-active-flag (atom false)
- :storm-active-atom (atom false)
- :storm-component->debug-atom (atom {})
+ :worker-active-flag (atom false) ;; used in worker only, keep it as atom
+ :storm-active-atom (AtomicBoolean. false)
+ :storm-component->debug-atom (AtomicReference.)
:executors executors
:task-ids (->> receive-queue-map keys (map int) sort)
:storm-conf storm-conf
@@ -321,7 +322,7 @@
(mapcat (fn [e] (for [t (executor->tasks e)] [t (first e)])))
(into {})
(HashMap.))
- :suicide-fn (mk-suicide-fn conf)
+ :suicide-fn (Utils/mkSuicideFn)
:uptime (Utils/makeUptimeComputer)
:default-shared-resources (mk-default-resources <>)
:user-shared-resources (mk-user-resources <>)
@@ -329,10 +330,10 @@
:transfer-fn (mk-transfer-fn <>)
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
- :backpressure (atom false) ;; whether this worker is going slow
- :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
- :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
- :throttle-on (atom false) ;; whether throttle is activated for spouts
+ :backpressure (AtomicBoolean. false) ;; whether this worker is going slow
+ :transfer-backpressure (AtomicBoolean. false) ;; if the transfer queue is backed-up
+ :backpressure-trigger (AtomicBoolean. false) ;; a trigger for synchronization with executors
+ :throttle-on (AtomicBoolean. false) ;; whether throttle is activated for spouts
)))
(defn- endpoint->string [[node port]]
@@ -422,6 +423,7 @@
current-connections (set (keys @(:cached-node+port->socket worker)))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
+
(swap! (:cached-node+port->socket worker)
#(HashMap. (merge (into {} %1) %2))
(into {}
@@ -455,11 +457,9 @@
(:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
([worker callback]
(let [base (clojurify-storm-base (.stormBase (:storm-cluster-state worker) (:storm-id worker) callback))]
- (reset!
- (:storm-active-atom worker)
- (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
- (reset! (:storm-component->debug-atom worker) (-> base :component->debug))
- (log-debug "Event debug options " @(:storm-component->debug-atom worker)))))
+ (.set (:storm-active-atom worker) (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
+ (.set (:storm-component->debug-atom worker) (map-val thriftify-debugoptions (-> base :component->debug)))
+ (log-debug "Event debug options " (.get (:storm-component->debug-atom worker))))))
;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
(defn mk-transfer-tuples-handler [worker]
@@ -514,7 +514,7 @@
^IConnection socket (:receiver worker)]
(log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
(.registerRecv socket (DeserializingConnectionCallback. (:storm-conf worker)
- (worker-context worker)
+ (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
transfer-local-fn))))
(defn- close-resources [worker]
@@ -605,7 +605,7 @@
(defn run-worker-start-hooks [worker]
(let [topology (:topology worker)
topo-conf (:storm-conf worker)
- worker-topology-context (worker-context worker)
+ worker-topology-context (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
hooks (.get_worker_hooks topology)]
(dofor [hook hooks]
(let [hook-bytes (Utils/toByteArray hook)
@@ -680,7 +680,9 @@
_ (run-worker-start-hooks worker)
- _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials)))
+ _ (if (ConfigUtils/isLocalMode storm-conf)
+ (reset! executors (dofor [e (:executors worker)] (local-executor/mk-local-executor worker e initial-credentials)))
+ (reset! executors (dofor [e (:executors worker)] (.execute (Executor/mkExecutor worker e initial-credentials)))))
transfer-tuples (mk-transfer-tuples-handler worker)
@@ -699,7 +701,7 @@
(.start backpressure-thread))
callback (fn cb []
(let [throttle-on (.topologyBackpressure storm-cluster-state storm-id cb)]
- (reset! (:throttle-on worker) throttle-on)))
+ (.set (:throttle-on worker) throttle-on)))
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
(.topologyBackpressure storm-cluster-state storm-id callback))
@@ -766,14 +768,14 @@
(let [new-creds (clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id nil))]
(when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed
(AuthUtils/updateSubject subject auto-creds new-creds)
- (dofor [e @executors] (.credentials-changed e new-creds))
+ (dofor [e @executors] (.credenetialsChanged e new-creds))
(reset! credentials new-creds))))
check-throttle-changed (fn []
(let [callback (fn cb []
(let [throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id cb)]
- (reset! (:throttle-on worker) throttle-on)))
+ (.set (:throttle-on worker) throttle-on)))
new-throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id callback)]
- (reset! (:throttle-on worker) new-throttle-on)))
+ (.set (:throttle-on worker) new-throttle-on)))
check-log-config-changed (fn []
(let [log-config (.topologyLogConfig (:storm-cluster-state worker) storm-id nil)]
(process-log-config-change latest-log-config original-log-levels log-config)
@@ -809,14 +811,6 @@
ret
))))))
-(defmethod mk-suicide-fn
- :local [conf]
- (fn [] (Utils/exitProcess 1 "Worker died")))
-
-(defmethod mk-suicide-fn
- :distributed [conf]
- (fn [] (Utils/exitProcess 1 "Worker died")))
-
(defn -main [storm-id assignment-id port-str worker-id]
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
(Utils/setupDefaultUncaughtExceptionHandler)
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index d67b48d..dc676d6 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -17,15 +17,17 @@
(ns org.apache.storm.testing
(:require [org.apache.storm.daemon
[nimbus :as nimbus]
+ [local-executor :as local-executor]
[local-supervisor :as local-supervisor]
[common :as common]
- [worker :as worker]
- [executor :as executor]])
+ [worker :as worker]])
(:import [org.apache.commons.io FileUtils]
[org.apache.storm.utils]
[org.apache.storm.zookeeper Zookeeper]
[org.apache.storm ProcessSimulator]
- [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager])
+ [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager]
+ [org.apache.storm.executor Executor]
+ [java.util.concurrent.atomic AtomicBoolean])
(:import [java.io File])
(:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
@@ -697,12 +699,12 @@
;; of tuple emission (and not on a separate thread later) for
;; topologies to be tracked correctly. This is because "transferred" *must*
;; be incremented before "processing".
- executor/mk-executor-transfer-fn
- (let [old# executor/mk-executor-transfer-fn]
+ local-executor/local-transfer-executor-tuple
+ (let [old# local-executor/local-transfer-executor-tuple]
(fn [& args#]
(let [transferrer# (apply old# args#)]
(fn [& args2#]
- ;; (log-message "Transferring: " transfer-args#)
+ ;; (log-message "Transferring: " args2#)
(increment-global! id# "transferred" 1)
(apply transferrer# args2#)))))]
(with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
@@ -759,7 +761,7 @@
{}
(HashMap.)
(HashMap.)
- (atom false))]
+ (AtomicBoolean. false))]
(TupleImpl. context values 1 stream)))
(defmacro with-timeout
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Constants.java b/storm-core/src/jvm/org/apache/storm/Constants.java
index b2c642e..9436db1 100644
--- a/storm-core/src/jvm/org/apache/storm/Constants.java
+++ b/storm-core/src/jvm/org/apache/storm/Constants.java
@@ -22,7 +22,7 @@ import clojure.lang.RT;
public class Constants {
- public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
+ public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
public static final long SYSTEM_TASK_ID = -1;
public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
@@ -32,5 +32,27 @@ public class Constants {
public static final String METRICS_STREAM_ID = "__metrics";
public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
+
+ public static final Object TOPOLOGY = "topology";
+ public static final String SYSTEM_TOPOLOGY = "system-topology";
+ public static final String STORM_CONF = "storm-conf";
+ public static final String STORM_ID = "storm-id";
+ public static final String WORKER_ID = "worker-id";
+ public static final String CONF = "conf";
+ public static final String PORT = "port";
+ public static final String TASK_TO_COMPONENT = "task->component";
+ public static final String COMPONENT_TO_SORTED_TASKS = "component->sorted-tasks";
+ public static final String COMPONENT_TO_STREAM_TO_FIELDS = "component->stream->fields";
+ public static final String TASK_IDS = "task-ids";
+ public static final String DEFAULT_SHARED_RESOURCES = "default-shared-resources";
+ public static final String USER_SHARED_RESOURCES = "user-shared-resources";
+ public static final String USER_TIMER = "user-timer";
+ public static final String TRANSFER_FN = "transfer-fn";
+ public static final String SUICIDE_FN = "suicide-fn";
+ public static final String THROTTLE_ON = "throttle-on";
+ public static final String EXECUTOR_RECEIVE_QUEUE_MAP = "executor-receive-queue-map";
+ public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
+ public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";
+ public static final Object LOAD_MAPPING = "load-mapping";
}