You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/31 21:54:54 UTC
[1/3] storm git commit: STORM-1271: Port backtype.storm.daemon.task
to java
Repository: storm
Updated Branches:
refs/heads/master 223b615d1 -> 084492782
STORM-1271: Port backtype.storm.daemon.task to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d63cb33
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d63cb33
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d63cb33
Branch: refs/heads/master
Commit: 7d63cb33a99940e11663257b515186e40d4f239e
Parents: 4264bfc
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Fri Mar 25 12:19:12 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Wed Mar 30 21:43:54 2016 +0530
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/config.clj | 27 --
.../clj/org/apache/storm/daemon/executor.clj | 222 +++++++----------
.../src/clj/org/apache/storm/daemon/task.clj | 190 --------------
.../org/apache/storm/daemon/GrouperFactory.java | 243 ++++++++++++++++++
.../src/jvm/org/apache/storm/daemon/Task.java | 247 +++++++++++++++++++
.../daemon/metrics/BuiltinMetricsUtil.java | 8 +-
.../apache/storm/hooks/info/BoltAckInfo.java | 8 +
.../storm/hooks/info/BoltExecuteInfo.java | 8 +
.../apache/storm/hooks/info/BoltFailInfo.java | 8 +
.../org/apache/storm/hooks/info/EmitInfo.java | 9 +
.../apache/storm/hooks/info/SpoutAckInfo.java | 9 +
.../apache/storm/hooks/info/SpoutFailInfo.java | 9 +
.../jvm/org/apache/storm/utils/ConfigUtils.java | 35 ++-
.../test/clj/org/apache/storm/grouping_test.clj | 19 +-
14 files changed, 675 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index e50f023..d0c4d87 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -42,30 +42,3 @@
(defn cluster-mode
[conf & args]
(keyword (conf STORM-CLUSTER-MODE)))
-
-(defn sampling-rate
- [conf]
- (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
- (/ 1)
- int))
-
-(defn- even-sampler
- [freq]
- (let [freq (int freq)
- start (int 0)
- r (java.util.Random.)
- curr (MutableInt. -1)
- target (MutableInt. (.nextInt r freq))]
- (with-meta
- (fn []
- (let [i (.increment curr)]
- (when (>= i freq)
- (.set curr start)
- (.set target (.nextInt r freq))))
- (= (.get curr) (.get target)))
- {:rate freq})))
-
-;; TODO this function together with sampling-rate are to be replaced with Java version when util.clj is in
-(defn mk-stats-sampler
- [conf]
- (even-sampler (sampling-rate conf)))
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index e759b1d..8a77a61 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -15,6 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.daemon.executor
(:use [org.apache.storm.daemon common])
+ (:use [clojure.walk])
(:import [org.apache.storm.generated Grouping Grouping$_Fields]
[java.io Serializable]
[org.apache.storm.stats BoltExecutorStats SpoutExecutorStats]
@@ -33,7 +34,7 @@
(:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
(:import [com.lmax.disruptor InsufficientCapacityException])
(:import [org.apache.storm.serialization KryoTupleSerializer])
- (:import [org.apache.storm.daemon Shutdownable StormCommon Acker])
+ (:import [org.apache.storm.daemon Shutdownable StormCommon Acker Task GrouperFactory])
(:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
(:import [org.apache.storm Config Constants])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
@@ -43,74 +44,8 @@
[org.json.simple JSONValue]
[com.lmax.disruptor.dsl ProducerType]
[org.apache.storm StormTimer])
- (:require [org.apache.storm.daemon [task :as task]])
(:require [clojure.set :as set]))
-(defn- mk-fields-grouper
- [^Fields out-fields ^Fields group-fields ^List target-tasks]
- (let [num-tasks (count target-tasks)
- task-getter (fn [i] (.get target-tasks i))]
- (fn [task-id ^List values load]
- (-> (.select out-fields group-fields values)
- (TupleUtils/listHashCode)
- (mod num-tasks)
- task-getter))))
-
-(defn- mk-custom-grouper
- [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
- (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
- (if (instance? LoadAwareCustomStreamGrouping grouping)
- (fn [task-id ^List values load]
- (.chooseTasks ^LoadAwareCustomStreamGrouping grouping task-id values load))
- (fn [task-id ^List values load]
- (.chooseTasks grouping task-id values))))
-
-(defn mk-shuffle-grouper
- [^List target-tasks topo-conf ^WorkerTopologyContext context ^String component-id ^String stream-id]
- (if (.get topo-conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
- (mk-custom-grouper (ShuffleGrouping.) context component-id stream-id target-tasks)
- (mk-custom-grouper (LoadAwareShuffleGrouping.) context component-id stream-id target-tasks)))
-
-(defn- mk-grouper
- "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
- [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks topo-conf]
- (let [num-tasks (count target-tasks)
- random (Random.)
- target-tasks (vec (sort target-tasks))]
- (condp = (Thrift/groupingType thrift-grouping)
- Grouping$_Fields/FIELDS
- (if (Thrift/isGlobalGrouping thrift-grouping)
- (fn [task-id tuple load]
- ;; It's possible for target to have multiple tasks if it reads multiple sources
- (first target-tasks))
- (let [group-fields (Fields. (Thrift/fieldGrouping thrift-grouping))]
- (mk-fields-grouper out-fields group-fields target-tasks)
- ))
- Grouping$_Fields/ALL
- (fn [task-id tuple load] target-tasks)
- Grouping$_Fields/SHUFFLE
- (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id)
- Grouping$_Fields/LOCAL_OR_SHUFFLE
- (let [same-tasks (set/intersection
- (set target-tasks)
- (set (.getThisWorkerTasks context)))]
- (if-not (empty? same-tasks)
- (mk-shuffle-grouper (vec same-tasks) topo-conf context component-id stream-id)
- (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id)))
- Grouping$_Fields/NONE
- (fn [task-id tuple load]
- (let [i (mod (.nextInt random) num-tasks)]
- (get target-tasks i)
- ))
- Grouping$_Fields/CUSTOM_OBJECT
- (let [grouping (Thrift/instantiateJavaObject (.get_custom_object thrift-grouping))]
- (mk-custom-grouper grouping context component-id stream-id target-tasks))
- Grouping$_Fields/CUSTOM_SERIALIZED
- (let [grouping (Utils/javaDeserialize (.get_custom_serialized thrift-grouping) Serializable)]
- (mk-custom-grouper grouping context component-id stream-id target-tasks))
- Grouping$_Fields/DIRECT
- :direct
- )))
;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
(defn- outbound-groupings
@@ -122,13 +57,13 @@
pos?))
(map (fn [[component tgrouping]]
[component
- (mk-grouper worker-context
- this-component-id
- stream-id
- out-fields
- tgrouping
- (.getComponentTasks worker-context component)
- topo-conf)]))
+ (GrouperFactory/mkGrouper worker-context
+ this-component-id
+ stream-id
+ out-fields
+ tgrouping
+ (.getComponentTasks worker-context component)
+ topo-conf)]))
(into {})
(HashMap.)))
@@ -153,11 +88,11 @@
(let [topology (.getRawTopology context)
spouts (.get_spouts topology)
bolts (.get_bolts topology)]
- (cond (contains? spouts component-id) :spout
- (contains? bolts component-id) :bolt
+ (cond (contains? spouts component-id) "spout"
+ (contains? bolts component-id) "bolt"
:else (throw (RuntimeException. (str "Could not find " component-id " in topology " topology))))))
-(defn executor-selector [executor-data & _] (:type executor-data))
+(defn executor-selector [executor-data & _] (keyword (:type executor-data)))
(defmulti mk-threads executor-selector)
(defmulti mk-executor-stats executor-selector)
@@ -275,9 +210,9 @@
(Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error))
(log-message "Got interrupted excpetion shutting thread down...")
((:suicide-fn <>)))))
- :sampler (mk-stats-sampler storm-conf)
+ :sampler (ConfigUtils/mkStatsSampler storm-conf)
:backpressure (atom false)
- :spout-throttling-metrics (if (= executor-type :spout)
+ :spout-throttling-metrics (if (= (keyword executor-type) :spout)
(SpoutThrottlingMetrics.)
nil)
;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
@@ -319,6 +254,13 @@
(.getName batch-transfer-queue)
(:uncaught-exception-handler (:report-error-and-die executor-data)))))
+;; TODO: this is all expensive... should be precomputed
+(defn send-unanchored
+ [^Task task-data stream values transfer-fn]
+ (let [out-tuple (.getTuple task-data stream values)]
+ (fast-list-iter [t (.getOutgoingTasks task-data stream values)]
+ (transfer-fn t out-tuple))))
+
(defn setup-metrics! [executor-data]
(let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
distinct-time-bucket-intervals (keys interval->task->metric-registry)]
@@ -335,7 +277,8 @@
[executor-data task-data ^TupleImpl tuple]
(let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
interval (.getInteger tuple 0)
- task-id (:task-id task-data)
+ transfer-fn (:transfer-fn executor-data)
+ task-id (.getTaskId task-data)
name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
task-info (IMetricsConsumer$TaskInfo.
(Utils/hostname (:storm-conf executor-data))
@@ -352,7 +295,7 @@
(filter identity)
(into []))]
(when (seq data-points)
- (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
+ (send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] transfer-fn))))
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
@@ -362,7 +305,7 @@
(when tick-time-secs
(if (or (Utils/isSystemId (:component-id executor-data))
(and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
- (= :spout (:type executor-data))))
+ (= :spout (keyword (:type executor-data)))))
(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
(.scheduleRecurring
(:user-timer worker)
@@ -374,10 +317,18 @@
(defn mk-executor [worker executor-id initial-credentials]
(let [executor-data (mk-executor-data worker executor-id)
+ transfer-fn (:transfer-fn executor-data)
_ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
task-datas (->> executor-data
:task-ids
- (map (fn [t] [t (task/mk-task executor-data t)]))
+ (map (fn [t] (let [task (Task. (stringify-keys executor-data) t)
+ stream StormCommon/SYSTEM_STREAM_ID
+ values ["startup"]]
+ ;; when this is called, the threads for the executor haven't been started yet,
+ ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
+ (send-unanchored task stream values transfer-fn)
+ [t task]
+ )))
(into {})
(HashMap.))
_ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
@@ -426,36 +377,36 @@
(.interrupt t)
(.join t))
- (doseq [user-context (map :user-context (vals task-datas))]
+ (doseq [user-context (map #(.getUserContext %) (vals task-datas))]
(doseq [hook (.getHooks user-context)]
(.cleanup hook)))
(.disconnect (:storm-cluster-state executor-data))
(when @(:open-or-prepare-was-called? executor-data)
- (doseq [obj (map :object (vals task-datas))]
+ (doseq [obj (map #(.getTaskObject %) (vals task-datas))]
(close-component executor-data obj)))
(log-message "Shut down executor " component-id ":" (pr-str executor-id)))
)))
(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id]
- (let [^ISpout spout (:object task-data)
+ (let [^ISpout spout (.getTaskObject task-data)
storm-conf (:storm-conf executor-data)
- task-id (:task-id task-data)]
+ task-id (.getTaskId task-data)]
;;TODO: need to throttle these when there's lots of failures
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
(.fail spout msg-id)
- (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
+ (.applyOn (SpoutFailInfo. msg-id task-id time-delta) (.getUserContext task-data))
(when time-delta
(.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
(let [storm-conf (:storm-conf executor-data)
- ^ISpout spout (:object task-data)
- task-id (:task-id task-data)]
+ ^ISpout spout (.getTaskObject task-data)
+ task-id (.getTaskId task-data)]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "SPOUT Acking message " id " " msg-id))
(.ack spout msg-id)
- (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
+ (.applyOn (SpoutAckInfo. msg-id task-id time-delta) (.getUserContext task-data))
(when time-delta
(.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
@@ -496,10 +447,11 @@
;; the thread's initialized random number generator is used to generate
;; uniformly distributed random numbers.
(when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
- (task/send-unanchored
+ (send-unanchored
task-data
StormCommon/EVENTLOGGER_STREAM_ID
- [component-id message-id (System/currentTimeMillis) values]))))
+ [component-id message-id (System/currentTimeMillis) values]
+ (:transfer-fn executor-data)))))
(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
(let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
@@ -507,7 +459,7 @@
max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
last-active (atom false)
- spouts (ArrayList. (map :object (vals task-datas)))
+ spouts (ArrayList. (map #(.getTaskObject %) (vals task-datas)))
rand (Random. (Utils/secureRandomLong))
^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
debug? (= true (storm-conf TOPOLOGY-DEBUG))
@@ -526,7 +478,7 @@
Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
Constants/CREDENTIALS_CHANGED_STREAM_ID
(let [task-data (get task-datas task-id)
- spout-obj (:object task-data)]
+ spout-obj (.getTaskObject task-data)]
(when (instance? ICredentialsListener spout-obj)
(.setCredentials spout-obj (.getValue tuple 0))))
Acker/ACKER_RESET_TIMEOUT_STREAM_ID
@@ -559,15 +511,14 @@
(while (not @(:storm-active-atom executor-data))
(Thread/sleep 100))
(log-message "Opening spout " component-id ":" (keys task-datas))
- (.registerAll (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas))))
+ (.registerAll (:spout-throttling-metrics executor-data) storm-conf (.getUserContext (first (vals task-datas))))
(doseq [[task-id task-data] task-datas
- :let [^ISpout spout-obj (:object task-data)
- tasks-fn (:tasks-fn task-data)
+ :let [^ISpout spout-obj (.getTaskObject task-data)
send-spout-msg (fn [out-stream-id values message-id out-task-id]
(.increment emitted-count)
(let [out-tasks (if out-task-id
- (tasks-fn out-task-id out-stream-id values)
- (tasks-fn out-stream-id values))
+ (.getOutgoingTasks task-data out-task-id out-stream-id values)
+ (.getOutgoingTasks task-data out-stream-id values))
rooted? (and message-id has-ackers?)
root-id (if rooted? (MessageId/generateId rand))
^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
@@ -590,25 +541,27 @@
message-id
{:stream out-stream-id
:values (if debug? values nil)}
- (if (sampler) (System/currentTimeMillis))])
- (task/send-unanchored task-data
+ (if (.call ^Callable sampler) (System/currentTimeMillis))])
+ (send-unanchored task-data
Acker/ACKER_INIT_STREAM_ID
- [root-id (Utils/bitXorVals out-ids) task-id]))
+ [root-id (Utils/bitXorVals out-ids) task-id]
+ (:transfer-fn executor-data)))
(when message-id
(ack-spout-msg executor-data task-data message-id
{:stream out-stream-id :values values}
- (if (sampler) 0) "0:")))
+ (if (.call ^Callable sampler) 0) "0:")))
(or out-tasks [])))]]
- (.registerAll (:builtin-metrics task-data) storm-conf (:user-context task-data))
+ (.registerAll (.getBuiltInMetrics task-data) storm-conf (.getUserContext task-data))
(BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
"receive" receive-queue}
- storm-conf (:user-context task-data))
+ storm-conf (.getUserContext task-data))
+
(when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
(.open spout-obj
storm-conf
- (:user-context task-data)
+ (.getUserContext task-data)
(SpoutOutputCollector.
(reify ISpoutOutputCollector
(^long getPendingCount[this]
@@ -694,7 +647,7 @@
(defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
(let [storm-conf (:storm-conf executor-data)
- execute-sampler (mk-stats-sampler storm-conf)
+ execute-sampler (ConfigUtils/mkStatsSampler storm-conf)
executor-stats (:stats executor-data)
{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
open-or-prepare-was-called?]} executor-data
@@ -720,16 +673,16 @@
(let [stream-id (.getSourceStreamId tuple)]
(condp = stream-id
Constants/CREDENTIALS_CHANGED_STREAM_ID
- (let [task-data (get task-datas task-id)
- bolt-obj (:object task-data)]
+ (let [^Task task-data (get task-datas task-id)
+ bolt-obj (.getTaskObject task-data)]
(when (instance? ICredentialsListener bolt-obj)
- (.setCredentials bolt-obj (.getValue tuple 0))))
+ (.setCredentials ^ICredentialsListener bolt-obj (.getValue tuple 0))))
Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
- (let [task-data (get task-datas task-id)
- ^IBolt bolt-obj (:object task-data)
- user-context (:user-context task-data)
- sampler? (sampler)
- execute-sampler? (execute-sampler)
+ (let [^Task task-data (get task-datas task-id)
+ ^IBolt bolt-obj (.getTaskObject task-data)
+ user-context (.getUserContext task-data)
+ sampler? (.call ^Callable sampler)
+ execute-sampler? (.call ^Callable execute-sampler)
now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
receive-queue (:receive-queue executor-data)]
(when sampler?
@@ -741,7 +694,7 @@
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
- (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
+ (.applyOn (BoltExecuteInfo. tuple task-id delta) user-context)
(when delta
(.boltExecuteTuple executor-stats
(.getSourceComponent tuple)
@@ -755,13 +708,13 @@
(log-message "Preparing bolt " component-id ":" (keys task-datas))
(doseq [[task-id task-data] task-datas
- :let [^IBolt bolt-obj (:object task-data)
- tasks-fn (:tasks-fn task-data)
- user-context (:user-context task-data)
+ :let [^IBolt bolt-obj (.getTaskObject task-data)
+ user-context (.getUserContext task-data)
+ transfer-fn (:transfer-fn executor-data)
bolt-emit (fn [stream anchors values task]
(let [out-tasks (if task
- (tasks-fn task stream values)
- (tasks-fn stream values))]
+ (.getOutgoingTasks task-data task stream values)
+ (.getOutgoingTasks task-data stream values))]
(fast-list-iter [t out-tasks]
(let [anchors-to-ids (HashMap.)]
(fast-list-iter [^TupleImpl a anchors]
@@ -780,8 +733,8 @@
(if has-eventloggers?
(send-to-eventlogger executor-data task-data values component-id nil rand))
(or out-tasks [])))]]
- (.registerAll (:builtin-metrics task-data) storm-conf user-context)
- (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
+ (.registerAll (.getBuiltInMetrics task-data) storm-conf user-context)
+ (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
(if (= component-id Constants/SYSTEM_COMPONENT_ID)
(do
(BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
@@ -808,14 +761,15 @@
(let [^TupleImpl tuple tuple
ack-val (.getAckVal tuple)]
(fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
- (task/send-unanchored task-data
- Acker/ACKER_ACK_STREAM_ID
- [root (bit-xor id ack-val)])))
+ (send-unanchored task-data
+ Acker/ACKER_ACK_STREAM_ID
+ [root (bit-xor id ack-val)]
+ transfer-fn)))
(let [delta (tuple-time-delta! tuple)
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
(when debug?
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
+ (.applyOn (BoltAckInfo. tuple task-id delta) user-context)
(when delta
(.boltAckedTuple executor-stats
(.getSourceComponent tuple)
@@ -823,14 +777,15 @@
delta))))
(^void fail [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
- (task/send-unanchored task-data
+ (send-unanchored task-data
Acker/ACKER_FAIL_STREAM_ID
- [root]))
+ [root]
+ transfer-fn))
(let [delta (tuple-time-delta! tuple)
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
(when debug?
(log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
+ (.applyOn (BoltFailInfo. tuple task-id delta) user-context)
(when delta
(.boltFailedTuple executor-stats
(.getSourceComponent tuple)
@@ -838,9 +793,10 @@
delta))))
(^void resetTimeout [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
- (task/send-unanchored task-data
+ (send-unanchored task-data
Acker/ACKER_RESET_TIMEOUT_STREAM_ID
- [root])))
+ [root]
+ transfer-fn)))
(reportError [this error]
(report-error error))))))
(reset! open-or-prepare-was-called? true)
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
deleted file mode 100644
index 2cfad7c..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ /dev/null
@@ -1,190 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.task
- (:use [org.apache.storm.daemon common])
- (:use [org.apache.storm config util log])
- (:import [org.apache.storm.hooks ITaskHook]
- [org.apache.storm.daemon.metrics BuiltinMetrics BuiltinMetricsUtil])
- (:import [org.apache.storm.tuple Tuple TupleImpl])
- (:import [org.apache.storm.grouping LoadMapping])
- (:import [org.apache.storm.generated SpoutSpec Bolt StateSpoutSpec StormTopology])
- (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo])
- (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext])
- (:import [org.apache.storm.utils Utils ConfigUtils])
- (:import [org.apache.storm.generated ShellComponent JavaObject])
- (:import [org.apache.storm.spout ShellSpout])
- (:import [java.util Collection List ArrayList])
- (:import [org.apache.storm Thrift]
- (org.apache.storm.daemon StormCommon)))
-
-(defn mk-topology-context-builder [worker executor-data topology]
- (let [conf (:conf worker)]
- #(TopologyContext.
- topology
- (:storm-conf worker)
- (:task->component worker)
- (:component->sorted-tasks worker)
- (:component->stream->fields worker)
- (:storm-id worker)
- (ConfigUtils/supervisorStormResourcesPath
- (ConfigUtils/supervisorStormDistRoot conf (:storm-id worker)))
- (ConfigUtils/workerPidsRoot conf (:worker-id worker))
- (int %)
- (:port worker)
- (:task-ids worker)
- (:default-shared-resources worker)
- (:user-shared-resources worker)
- (:shared-executor-data executor-data)
- (:interval->task->metric-registry executor-data)
- (:open-or-prepare-was-called? executor-data))))
-
-(defn system-topology-context [worker executor-data tid]
- ((mk-topology-context-builder
- worker
- executor-data
- (:system-topology worker))
- tid))
-
-(defn user-topology-context [worker executor-data tid]
- ((mk-topology-context-builder
- worker
- executor-data
- (:topology worker))
- tid))
-
-(defn- get-task-object [^StormTopology topology component-id]
- (let [spouts (.get_spouts topology)
- bolts (.get_bolts topology)
- state-spouts (.get_state_spouts topology)
- obj (Utils/getSetComponentObject
- (cond
- (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id))
- (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id))
- (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))
- true (throw (RuntimeException. (str "Could not find " component-id " in " topology)))))
- obj (if (instance? ShellComponent obj)
- (if (contains? spouts component-id)
- (ShellSpout. obj)
- (ShellBolt. obj))
- obj )
- obj (if (instance? JavaObject obj)
- (Thrift/instantiateJavaObject obj)
- obj )]
- obj
- ))
-
-(defn get-context-hooks [^TopologyContext context]
- (.getHooks context))
-
-(defn hooks-empty? [^Collection hooks]
- (.isEmpty hooks))
-
-(defmacro apply-hooks [topology-context method-sym info-form]
- (let [hook-sym (with-meta (gensym "hook") {:tag 'org.apache.storm.hooks.ITaskHook})]
- `(let [hooks# (get-context-hooks ~topology-context)]
- (when-not (hooks-empty? hooks#)
- (let [info# ~info-form]
- (fast-list-iter [~hook-sym hooks#]
- (~method-sym ~hook-sym info#)
- ))))))
-
-
-;; TODO: this is all expensive... should be precomputed
-(defn send-unanchored
- [task-data stream values]
- (let [^TopologyContext topology-context (:system-context task-data)
- tasks-fn (:tasks-fn task-data)
- transfer-fn (-> task-data :executor-data :transfer-fn)
- out-tuple (TupleImpl. topology-context
- values
- (.getThisTaskId topology-context)
- stream)]
- (fast-list-iter [t (tasks-fn stream values)]
- (transfer-fn t out-tuple))))
-
-(defn mk-tasks-fn [task-data]
- (let [task-id (:task-id task-data)
- executor-data (:executor-data task-data)
- ^LoadMapping load-mapping (:load-mapping (:worker executor-data))
- component-id (:component-id executor-data)
- ^WorkerTopologyContext worker-context (:worker-context executor-data)
- storm-conf (:storm-conf executor-data)
- emit-sampler (mk-stats-sampler storm-conf)
- stream->component->grouper (:stream->component->grouper executor-data)
- user-context (:user-context task-data)
- executor-stats (:stats executor-data)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-
- (fn ([^Integer out-task-id ^String stream ^List values]
- (when debug?
- (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
- (let [target-component (.getComponentId worker-context out-task-id)
- component->grouping (get stream->component->grouper stream)
- grouping (get component->grouping target-component)
- out-task-id (if grouping out-task-id)]
- (when (and (not-nil? grouping) (not= :direct grouping))
- (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
- (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
- (when (emit-sampler)
- (.emittedTuple executor-stats stream)
- (if out-task-id
- (.transferredTuples executor-stats stream, 1)))
- (if out-task-id [out-task-id])
- ))
- ([^String stream ^List values]
- (when debug?
- (log-message "Emitting: " component-id " " stream " " values))
- (let [out-tasks (ArrayList.)]
- (if (not (.containsKey stream->component->grouper stream))
- (throw (IllegalArgumentException. (str "Unknown stream ID: " stream))))
- (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
- (when (= :direct grouper)
- ;; TODO: this is wrong, need to check how the stream was declared
- (throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
- (let [comp-tasks (grouper task-id values load-mapping)]
- (if (or (sequential? comp-tasks) (instance? Collection comp-tasks))
- (.addAll out-tasks comp-tasks)
- (.add out-tasks comp-tasks)
- )))
- (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
- (when (emit-sampler)
- (.emittedTuple executor-stats stream)
- (.transferredTuples executor-stats stream (count out-tasks)))
- out-tasks)))
- ))
-
-(defn mk-task-data [executor-data task-id]
- (recursive-map
- :executor-data executor-data
- :task-id task-id
- :system-context (system-topology-context (:worker executor-data) executor-data task-id)
- :user-context (user-topology-context (:worker executor-data) executor-data task-id)
- :builtin-metrics (BuiltinMetricsUtil/mkData (.getName (:type executor-data)) (:stats executor-data))
- :tasks-fn (mk-tasks-fn <>)
- :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
-
-
-(defn mk-task [executor-data task-id]
- (let [task-data (mk-task-data executor-data task-id)
- storm-conf (:storm-conf executor-data)]
- (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
- (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
- ;; when this is called, the threads for the executor haven't been started yet,
- ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
- (send-unanchored task-data StormCommon/SYSTEM_STREAM_ID ["startup"])
- task-data
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java
new file mode 100644
index 0000000..d06682f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareShuffleGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.grouping.ShuffleGrouping;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+public class GrouperFactory {
+
+ public static LoadAwareCustomStreamGrouping mkGrouper(WorkerTopologyContext context, String componentId, String streamId, Fields outFields,
+ Grouping thriftGrouping,
+ List<Integer> unsortedTargetTasks,
+ Map topoConf) {
+ List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks);
+ final boolean isNotLoadAware = (null != topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING) && (boolean) topoConf
+ .get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING));
+ CustomStreamGrouping result = null;
+ switch (Thrift.groupingType(thriftGrouping)) {
+ case FIELDS:
+ if (Thrift.isGlobalGrouping(thriftGrouping)) {
+ result = new GlobalGrouper();
+ } else {
+ result = new FieldsGrouper(outFields, thriftGrouping);
+ }
+ break;
+ case SHUFFLE:
+ if (isNotLoadAware) {
+ result = new ShuffleGrouping();
+ } else {
+ result = new LoadAwareShuffleGrouping();
+ }
+ break;
+ case ALL:
+ result = new AllGrouper();
+ break;
+ case LOCAL_OR_SHUFFLE:
+ // Prefer local tasks as target tasks if possible
+ Set<Integer> sameTasks = Sets.intersection(Sets.newHashSet(targetTasks), Sets.newHashSet(context.getThisWorkerTasks()));
+ targetTasks = (sameTasks.isEmpty()) ? targetTasks : new ArrayList<>(sameTasks);
+ if (isNotLoadAware) {
+ result = new ShuffleGrouping();
+ } else {
+ result = new LoadAwareShuffleGrouping();
+ }
+ break;
+ case NONE:
+ result = new NoneGrouper();
+ break;
+ case CUSTOM_OBJECT:
+ result = (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
+ break;
+ case CUSTOM_SERIALIZED:
+ result = Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
+ break;
+ case DIRECT:
+ result = DIRECT;
+ break;
+ default:
+ result = null;
+ break;
+ }
+
+ if (null != result) {
+ result.prepare(context, new GlobalStreamId(componentId, streamId), targetTasks);
+ }
+
+ if (result instanceof LoadAwareCustomStreamGrouping) {
+ return (LoadAwareCustomStreamGrouping) result;
+ } else {
+ return new BasicLoadAwareCustomStreamGrouping (result);
+ }
+ }
+
+
+ /**
+ * A bridge between CustomStreamGrouping and LoadAwareCustomStreamGrouping
+ */
+ public static class BasicLoadAwareCustomStreamGrouping implements LoadAwareCustomStreamGrouping {
+
+ private final CustomStreamGrouping customStreamGrouping;
+
+ public BasicLoadAwareCustomStreamGrouping(CustomStreamGrouping customStreamGrouping) {
+ this.customStreamGrouping = customStreamGrouping;
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
+ return customStreamGrouping.chooseTasks(taskId, values);
+ }
+
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+ customStreamGrouping.prepare(context, stream, targetTasks);
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ return customStreamGrouping.chooseTasks(taskId, values);
+ }
+ }
+
+ public static class FieldsGrouper implements CustomStreamGrouping {
+
+ private Fields outFields;
+ private List<Integer> targetTasks;
+ private Fields groupFields;
+ private int numTasks;
+
+ public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
+ this.outFields = outFields;
+ this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
+
+ }
+
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+ this.targetTasks = targetTasks;
+ this.numTasks = targetTasks.size();
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks;
+ return Collections.singletonList(targetTasks.get(targetTaskIndex));
+ }
+ }
+
+ public static class GlobalGrouper implements CustomStreamGrouping {
+
+ private List<Integer> targetTasks;
+
+ public GlobalGrouper() {
+ }
+
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+ this.targetTasks = targetTasks;
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ if (targetTasks.isEmpty()) {
+ return null;
+ }
+ // It's possible for target to have multiple tasks if it reads multiple sources
+ return Collections.singletonList(targetTasks.get(0));
+ }
+ }
+
+ public static class NoneGrouper implements CustomStreamGrouping {
+
+ private List<Integer> targetTasks;
+ private int numTasks;
+ private final Random random;
+
+ public NoneGrouper() {
+ random = new Random();
+ }
+
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+ this.targetTasks = targetTasks;
+ this.numTasks = targetTasks.size();
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ int index = random.nextInt(numTasks);
+ return Collections.singletonList(targetTasks.get(index));
+ }
+ }
+
+ public static class AllGrouper implements CustomStreamGrouping {
+
+ private List<Integer> targetTasks;
+
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+ this.targetTasks = targetTasks;
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ return targetTasks;
+ }
+ }
+
+ // A no-op grouper
+ public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
+ return null;
+ }
+
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ return null;
+ }
+
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Task.java b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
new file mode 100644
index 0000000..60b570a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.ShellComponent;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
/**
 * Runtime state for a single task (one instance of a spout or bolt) inside an
 * executor.  Ported from backtype.storm.daemon.task (STORM-1271): builds the
 * task's system/user topology contexts, resolves the spout/bolt object,
 * registers auto task hooks, and computes outgoing task ids for emitted tuples.
 */
public class Task {

    private static final Logger LOG = LoggerFactory.getLogger(Task.class);

    // executorData/workerData mirror the clojure-side executor and worker data
    // maps; values are looked up by their clojure keyword names (e.g. "storm-conf").
    private Map executorData;
    private Map workerData;
    private TopologyContext systemTopologyContext;
    private TopologyContext userTopologyContext;
    private WorkerTopologyContext workerTopologyContext;
    private LoadMapping loadMapping;
    private Integer taskId;
    private String componentId;
    private Object taskObject; // Spout/Bolt object
    private Map stormConf;
    // Rate-limits stats updates; see ConfigUtils.mkStatsSampler.
    private Callable<Boolean> emitSampler;
    private CommonStats executorStats;
    // stream id -> (target component id -> grouper); a null inner map marks a
    // stream (e.g. __system) with no groupers.
    private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
    private BuiltinMetrics builtInMetrics;
    private boolean debug;

    /**
     * Builds the task from the executor's shared data map.
     *
     * @param executorData executor-level state assembled by the (clojure) executor
     * @param taskId       id of this task
     * @throws IOException if constructing a topology context fails
     */
    public Task(Map executorData, Integer taskId) throws IOException {
        this.taskId = taskId;
        this.executorData = executorData;
        this.workerData = (Map) executorData.get("worker");
        this.stormConf = (Map) executorData.get("storm-conf");
        this.componentId = (String) executorData.get("component-id");
        this.streamComponentToGrouper = (Map<String, Map<String, LoadAwareCustomStreamGrouping>>) executorData.get("stream->component->grouper");
        this.executorStats = (CommonStats) executorData.get("stats");
        this.builtInMetrics = BuiltinMetricsUtil.mkData((String) executorData.get("type"), this.executorStats);
        this.workerTopologyContext = (WorkerTopologyContext) executorData.get("worker-context");
        this.emitSampler = ConfigUtils.mkStatsSampler(stormConf);
        this.loadMapping = (LoadMapping) workerData.get("load-mapping");
        this.systemTopologyContext = mkTopologyContext((StormTopology) workerData.get("system-topology"));
        this.userTopologyContext = mkTopologyContext((StormTopology) workerData.get("topology"));
        this.taskObject = mkTaskObject();
        this.debug = stormConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
        this.addTaskHooks();
    }

    /**
     * Computes the destination of a direct emit, firing emit hooks and
     * (sampled) stats updates along the way.
     *
     * @param outTaskId the task id the tuple is being emitted directly to
     * @param stream    the stream being emitted on
     * @param values    the tuple values
     * @return a singleton list containing outTaskId, or null when the target
     *         task does not consume this stream directly
     * @throws IllegalArgumentException if the target consumes this stream with
     *                                  a non-direct grouping
     */
    public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Object> values) {
        if (debug) {
            LOG.info("Emitting direct: {}; {} {} {} ", outTaskId, componentId, stream, values);
        }
        String targetComponent = workerTopologyContext.getComponentId(outTaskId);
        Map<String, LoadAwareCustomStreamGrouping> componentGrouping = streamComponentToGrouper.get(stream);
        LoadAwareCustomStreamGrouping grouping = componentGrouping.get(targetComponent);
        // No grouper registered: the target component does not subscribe to this
        // stream, so the emit is dropped (outTaskId cleared below).
        if (null == grouping) {
            outTaskId = null;
        }
        if (grouping != null && grouping != GrouperFactory.DIRECT) {
            throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
        }
        // Emit hooks fire even when the tuple is dropped (outTaskId == null).
        new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
        try {
            if (emitSampler.call()) {
                executorStats.emittedTuple(stream);
                if (null != outTaskId) {
                    executorStats.transferredTuples(stream, 1);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (null != outTaskId) {
            return Collections.singletonList(outTaskId);
        }
        return null;
    }

    /**
     * Computes the destinations of a regular (non-direct) emit by consulting the
     * grouper of every component subscribed to the stream; fires emit hooks and
     * (sampled) stats updates.
     *
     * @param stream the stream being emitted on
     * @param values the tuple values
     * @return the task ids the tuple should be sent to (possibly empty)
     * @throws IllegalArgumentException if the stream is unknown, or if a
     *                                  subscriber expects a direct grouping
     */
    public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
        if (debug) {
            LOG.info("Emitting: {} {} {}", componentId, stream, values);
        }
        List<Integer> outTasks = new ArrayList<>();
        if (!streamComponentToGrouper.containsKey(stream)) {
            throw new IllegalArgumentException("Unknown stream ID: " + stream);
        }
        if (null != streamComponentToGrouper.get(stream)) {
            // null value for __system
            for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
                if (grouper == GrouperFactory.DIRECT) {
                    throw new IllegalArgumentException("Cannot do regular emit to direct stream");
                }
                List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
                outTasks.addAll(compTasks);
            }
        }
        new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
        try {
            if (emitSampler.call()) {
                executorStats.emittedTuple(stream);
                executorStats.transferredTuples(stream, outTasks.size());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return outTasks;
    }

    /**
     * Builds a tuple on the given stream, attributed to this task, using the
     * system topology context.
     */
    public Tuple getTuple(String stream, List values) {
        return new TupleImpl(systemTopologyContext, values, systemTopologyContext.getThisTaskId(), stream);
    }

    public Integer getTaskId() {
        return taskId;
    }

    public String getComponentId() {
        return componentId;
    }

    // NOTE(review): the throws clause looks unnecessary — the method only returns a
    // field — but is kept unchanged here; confirm before removing.
    public TopologyContext getUserContext() throws IOException {
        return userTopologyContext;
    }

    /** Returns the spout/bolt instance this task runs (see mkTaskObject). */
    public Object getTaskObject() {
        return taskObject;
    }

    public BuiltinMetrics getBuiltInMetrics() {
        return builtInMetrics;
    }

    /**
     * Builds a TopologyContext for this task over the given topology, pulling
     * every other constructor argument out of the worker/executor data maps by
     * its clojure keyword name.
     *
     * @throws IOException if resolving the storm-dist resource path fails
     */
    private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
        Map conf = (Map) workerData.get("conf");
        return new TopologyContext(
            topology,
            (Map) workerData.get("storm-conf"),
            (Map<Integer, String>) workerData.get("task->component"),
            (Map<String, List<Integer>>) workerData.get("component->sorted-tasks"),
            (Map<String, Map<String, Fields>>) workerData.get("component->stream->fields"),
            (String) workerData.get("storm-id"),
            ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get("storm-id"))),
            ConfigUtils.workerPidsRoot(conf, (String) workerData.get("worker-id")),
            taskId,
            (Integer) workerData.get("port"),
            (List<Integer>) workerData.get("task-ids"),
            (Map<String, Object>) workerData.get("default-shared-resources"),
            (Map<String, Object>) workerData.get("user-shared-resources"),
            (Map<String, Object>) executorData.get("shared-executor-data"),
            (Map<Integer, Map<Integer, Map<String, IMetric>>>) executorData.get("interval->task->metric-registry"),
            (clojure.lang.Atom) executorData.get("open-or-prepare-was-called?")
        );
    }

    /**
     * Resolves the serialized spout/bolt/state-spout object for this task's
     * component: deserializes it, wraps shell components in ShellSpout/ShellBolt,
     * and instantiates thrift-declared JavaObjects.
     *
     * @throws RuntimeException if the component id is not found in the topology
     */
    private Object mkTaskObject() {
        StormTopology topology = systemTopologyContext.getRawTopology();
        Map<String, SpoutSpec> spouts = topology.get_spouts();
        Map<String, Bolt> bolts = topology.get_bolts();
        Map<String, StateSpoutSpec> stateSpouts = topology.get_state_spouts();
        Object result = null;
        ComponentObject componentObject = null;
        if (spouts.containsKey(componentId)) {
            componentObject = spouts.get(componentId).get_spout_object();
        } else if (bolts.containsKey(componentId)) {
            componentObject = bolts.get(componentId).get_bolt_object();
        } else if (stateSpouts.containsKey(componentId)) {
            componentObject = stateSpouts.get(componentId).get_state_spout_object();
        } else {
            throw new RuntimeException("Could not find " + componentId + " in " + topology);
        }
        result = Utils.getSetComponentObject(componentObject);

        // Multi-lang components come through as a ShellComponent descriptor and
        // need the appropriate shell wrapper.
        if (result instanceof ShellComponent) {
            if (spouts.containsKey(componentId)) {
                result = new ShellSpout((ShellComponent) result);
            } else {
                result = new ShellBolt((ShellComponent) result);
            }
        }

        if (result instanceof JavaObject) {
            result = Thrift.instantiateJavaObject((JavaObject) result);
        }

        return result;
    }

    /**
     * Instantiates every class named in TOPOLOGY_AUTO_TASK_HOOKS (via its no-arg
     * constructor) and registers it on the user topology context.
     *
     * @throws RuntimeException wrapping any reflection failure, naming the hook class
     */
    private void addTaskHooks() {
        List<String> hooksClassList = (List<String>) stormConf.get(Config.TOPOLOGY_AUTO_TASK_HOOKS);
        if (null != hooksClassList) {
            for (String hookClass : hooksClassList) {
                try {
                    userTopologyContext.addTaskHook(((ITaskHook) Class.forName(hookClass).newInstance()));
                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
                    throw new RuntimeException("Failed to add hook: " + hookClass, e);
                }
            }
        }
    }

}
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index 84c75d7..2827420 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -17,19 +17,21 @@
*/
package org.apache.storm.daemon.metrics;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.api.StateMetric;
import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
import org.apache.storm.stats.SpoutExecutorStats;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.task.TopologyContext;
+import java.util.HashMap;
+import java.util.Map;
+
public class BuiltinMetricsUtil {
- public static BuiltinMetrics mkData(String type, Object stats) {
+ public static BuiltinMetrics mkData(String type, CommonStats stats) {
if (StatsUtil.SPOUT.equals(type)) {
return new BuiltinSpoutMetrics((SpoutExecutorStats) stats);
} else if (StatsUtil.BOLT.equals(type)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
index 905f747..e6f4b11 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
@@ -17,6 +17,8 @@
*/
package org.apache.storm.hooks.info;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
public class BoltAckInfo {
@@ -29,4 +31,10 @@ public class BoltAckInfo {
this.ackingTaskId = ackingTaskId;
this.processLatencyMs = processLatencyMs;
}
+
    /**
     * Invokes the boltAck callback on every task hook registered with the
     * given topology context.
     *
     * @param topologyContext context holding the registered {@link ITaskHook}s
     */
    public void applyOn(TopologyContext topologyContext) {
        for (ITaskHook hook : topologyContext.getHooks()) {
            hook.boltAck(this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
index d92a1f8..73a7f33 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
@@ -17,6 +17,8 @@
*/
package org.apache.storm.hooks.info;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
public class BoltExecuteInfo {
@@ -29,4 +31,10 @@ public class BoltExecuteInfo {
this.executingTaskId = executingTaskId;
this.executeLatencyMs = executeLatencyMs;
}
+
    /**
     * Invokes the boltExecute callback on every task hook registered with the
     * given topology context.
     *
     * @param topologyContext context holding the registered {@link ITaskHook}s
     */
    public void applyOn(TopologyContext topologyContext) {
        for (ITaskHook hook : topologyContext.getHooks()) {
            hook.boltExecute(this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
index 38e31b7..4e1e32d 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
@@ -17,6 +17,8 @@
*/
package org.apache.storm.hooks.info;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
public class BoltFailInfo {
@@ -29,4 +31,10 @@ public class BoltFailInfo {
this.failingTaskId = failingTaskId;
this.failLatencyMs = failLatencyMs;
}
+
    /**
     * Invokes the boltFail callback on every task hook registered with the
     * given topology context.
     *
     * @param topologyContext context holding the registered {@link ITaskHook}s
     */
    public void applyOn(TopologyContext topologyContext) {
        for (ITaskHook hook : topologyContext.getHooks()) {
            hook.boltFail(this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
index 3e3ed8c..52965a1 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
@@ -17,6 +17,9 @@
*/
package org.apache.storm.hooks.info;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
import java.util.Collection;
import java.util.List;
@@ -32,4 +35,10 @@ public class EmitInfo {
this.taskId = taskId;
this.outTasks = outTasks;
}
+
    /**
     * Invokes the emit callback on every task hook registered with the given
     * topology context.
     *
     * @param topologyContext context holding the registered {@link ITaskHook}s
     */
    public void applyOn(TopologyContext topologyContext) {
        for (ITaskHook hook : topologyContext.getHooks()) {
            hook.emit(this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
index 1b1bc76..4949f0f 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
@@ -17,6 +17,9 @@
*/
package org.apache.storm.hooks.info;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
public class SpoutAckInfo {
public Object messageId;
public int spoutTaskId;
@@ -27,4 +30,10 @@ public class SpoutAckInfo {
this.spoutTaskId = spoutTaskId;
this.completeLatencyMs = completeLatencyMs;
}
+
    /**
     * Invokes the spoutAck callback on every task hook registered with the
     * given topology context.
     *
     * @param topologyContext context holding the registered {@link ITaskHook}s
     */
    public void applyOn(TopologyContext topologyContext) {
        for (ITaskHook hook : topologyContext.getHooks()) {
            hook.spoutAck(this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
index 34b38b7..5b40005 100644
--- a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
@@ -17,6 +17,9 @@
*/
package org.apache.storm.hooks.info;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
public class SpoutFailInfo {
public Object messageId;
public int spoutTaskId;
@@ -27,4 +30,10 @@ public class SpoutFailInfo {
this.spoutTaskId = spoutTaskId;
this.failLatencyMs = failLatencyMs;
}
+
    /**
     * Invokes the spoutFail callback on every task hook registered with the
     * given topology context.
     *
     * @param topologyContext context holding the registered {@link ITaskHook}s
     */
    public void applyOn(TopologyContext topologyContext) {
        for (ITaskHook hook : topologyContext.getHooks()) {
            hook.spoutFail(this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index d7b7dbf..30d314f 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -18,10 +18,10 @@
package org.apache.storm.utils;
+import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
-import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.generated.StormTopology;
-import org.apache.commons.io.FileUtils;
+import org.apache.storm.validation.ConfigValidation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,14 +35,16 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.reflect.Field;
+import java.net.URLEncoder;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.net.URLEncoder;
+import java.util.concurrent.Callable;
public class ConfigUtils {
private final static Logger LOG = LoggerFactory.getLogger(ConfigUtils.class);
@@ -135,7 +137,28 @@ public class ConfigUtils {
throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
}
- // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we need to do after util
+ public static Callable<Boolean> evenSampler(final int samplingFreq) {
+ final Random random = new Random();
+
+ return new Callable<Boolean>() {
+ private int curr = -1;
+ private int target = random.nextInt(samplingFreq);
+
+ @Override
+ public Boolean call() throws Exception {
+ curr++;
+ if (curr >= samplingFreq) {
+ curr = 0;
+ target = random.nextInt(samplingFreq);
+ }
+ return (curr == target);
+ }
+ };
+ }
+
    /**
     * Builds the stats sampler for a topology configuration: a callable that
     * returns true once per samplingRate(conf) invocations, at a random position
     * within each window (see evenSampler).
     *
     * @param conf topology configuration, read by samplingRate
     * @return the sampler gating stats updates
     */
    public static Callable<Boolean> mkStatsSampler(Map conf) {
        return evenSampler(samplingRate(conf));
    }
// we use this "wired" wrapper pattern temporarily for mocking in clojure test
public static Map readStormConfig() {
http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 61caf68..487d80f 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -16,21 +16,24 @@
(ns org.apache.storm.grouping-test
(:use [clojure test])
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
- [org.apache.storm.generated JavaObject JavaObjectArg])
+ [org.apache.storm.generated JavaObject JavaObjectArg Grouping NullStruct])
(:import [org.apache.storm.grouping LoadMapping])
(:use [org.apache.storm testing log config])
(:use [org.apache.storm.internal clojure])
(:use [org.apache.storm.daemon common executor])
(:import [org.apache.storm Thrift])
- (:import [org.apache.storm.utils Utils]))
+ (:import [org.apache.storm.utils Utils]
+ (org.apache.storm.daemon GrouperFactory)))
+
+(def shuffle-grouping (Grouping/shuffle (NullStruct. )))
(deftest test-shuffle
- (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true} nil "comp" "stream")
+ (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true})
num-messages 100000
min-prcnt (int (* num-messages 0.49))
max-prcnt (int (* num-messages 0.51))
data [1 2]
- freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data nil)))
+ freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data nil)))
load1 (.get freq [(int 1)])
load2 (.get freq [(int 2)])]
(log-message "FREQ:" freq)
@@ -40,14 +43,14 @@
(is (<= load2 max-prcnt))))
(deftest test-shuffle-load-even
- (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {} nil "comp" "stream")
+ (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {})
num-messages 100000
min-prcnt (int (* num-messages 0.49))
max-prcnt (int (* num-messages 0.51))
load (LoadMapping.)
_ (.setLocal load {(int 1) 0.0 (int 2) 0.0})
data [1 2]
- freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data load)))
+ freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load)))
load1 (.get freq [(int 1)])
load2 (.get freq [(int 2)])]
(log-message "FREQ:" freq)
@@ -57,7 +60,7 @@
(is (<= load2 max-prcnt))))
(deftest test-shuffle-load-uneven
- (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {} nil "comp" "stream")
+ (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {})
num-messages 100000
min1-prcnt (int (* num-messages 0.32))
max1-prcnt (int (* num-messages 0.34))
@@ -66,7 +69,7 @@
load (LoadMapping.)
_ (.setLocal load {(int 1) 0.5 (int 2) 0.0})
data [1 2]
- freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data load)))
+ freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load)))
load1 (.get freq [(int 1)])
load2 (.get freq [(int 2)])]
(log-message "FREQ:" freq)
[2/3] storm git commit: Merge branch 'task' of
https://github.com/abhishekagarwal87/storm into STORM-1271
Posted by bo...@apache.org.
Merge branch 'task' of https://github.com/abhishekagarwal87/storm into STORM-1271
STORM-1271: Port backtype.storm.daemon.task to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/31d558ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/31d558ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/31d558ca
Branch: refs/heads/master
Commit: 31d558cad5c8ee99f0e3540203cfa7e04341d6f8
Parents: 223b615 7d63cb3
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Mar 31 14:27:36 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 14:27:36 2016 -0500
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/config.clj | 27 --
.../clj/org/apache/storm/daemon/executor.clj | 222 +++++++----------
.../src/clj/org/apache/storm/daemon/task.clj | 190 --------------
.../org/apache/storm/daemon/GrouperFactory.java | 243 ++++++++++++++++++
.../src/jvm/org/apache/storm/daemon/Task.java | 247 +++++++++++++++++++
.../daemon/metrics/BuiltinMetricsUtil.java | 8 +-
.../apache/storm/hooks/info/BoltAckInfo.java | 8 +
.../storm/hooks/info/BoltExecuteInfo.java | 8 +
.../apache/storm/hooks/info/BoltFailInfo.java | 8 +
.../org/apache/storm/hooks/info/EmitInfo.java | 9 +
.../apache/storm/hooks/info/SpoutAckInfo.java | 9 +
.../apache/storm/hooks/info/SpoutFailInfo.java | 9 +
.../jvm/org/apache/storm/utils/ConfigUtils.java | 35 ++-
.../test/clj/org/apache/storm/grouping_test.clj | 19 +-
14 files changed, 675 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/31d558ca/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index d5f80db,30d314f..0f53343
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@@ -135,9 -137,30 +137,30 @@@ public class ConfigUtils
throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
}
- // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we need to do after util
+ public static Callable<Boolean> evenSampler(final int samplingFreq) {
+ final Random random = new Random();
+
+ return new Callable<Boolean>() {
+ private int curr = -1;
+ private int target = random.nextInt(samplingFreq);
+
+ @Override
+ public Boolean call() throws Exception {
+ curr++;
+ if (curr >= samplingFreq) {
+ curr = 0;
+ target = random.nextInt(samplingFreq);
+ }
+ return (curr == target);
+ }
+ };
+ }
+
+ public static Callable<Boolean> mkStatsSampler(Map conf) {
+ return evenSampler(samplingRate(conf));
+ }
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static Map readStormConfig() {
return _instance.readStormConfigImpl();
}
[3/3] storm git commit: Added STORM-1271 to Changelog
Posted by bo...@apache.org.
Added STORM-1271 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08449278
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08449278
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08449278
Branch: refs/heads/master
Commit: 084492782356220404b560aae2567cd4ee7f8052
Parents: 31d558c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Mar 31 14:54:22 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 14:54:22 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/08449278/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fef71c5..06ab074 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1271: Port backtype.storm.daemon.task to java
* STORM-822: Kafka Spout New Consumer API
* STORM-1663: Stats couldn't handle null worker HB.
* STORM-1665: Worker cannot instantiate kryo