You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/17 00:20:54 UTC
[1/3] storm git commit: STORM-1027: Use overflow buffer for emitting
metrics
Repository: storm
Updated Branches:
refs/heads/master e78fcd7ff -> 5f98cd2c1
STORM-1027: Use overflow buffer for emitting metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3144a533
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3144a533
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3144a533
Branch: refs/heads/master
Commit: 3144a533b7ae069cd23483544cc58fa962d88fda
Parents: 154e9ec
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Fri Sep 4 17:15:02 2015 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Fri Sep 4 17:15:02 2015 +0530
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/executor.clj | 71 +++++++++++---------
1 file changed, 38 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3144a533/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index e9c7a2e..326def1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -294,27 +294,32 @@
receive-queue
[[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
-(defn metrics-tick [executor-data task-data ^TupleImpl tuple]
- (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
- interval (.getInteger tuple 0)
- task-id (:task-id task-data)
- name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
- task-info (IMetricsConsumer$TaskInfo.
- (hostname (:storm-conf executor-data))
- (.getThisWorkerPort worker-context)
- (:component-id executor-data)
- task-id
- (long (/ (System/currentTimeMillis) 1000))
- interval)
- data-points (->> name->imetric
- (map (fn [[name imetric]]
- (let [value (.getValueAndReset ^IMetric imetric)]
- (if value
- (IMetricsConsumer$DataPoint. name value)))))
- (filter identity)
- (into []))]
- (if (seq data-points)
- (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
+(defn metrics-tick
+ ([executor-data task-data ^TupleImpl tuple overflow-buffer]
+ (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
+ interval (.getInteger tuple 0)
+ task-id (:task-id task-data)
+ name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
+ task-info (IMetricsConsumer$TaskInfo.
+ (hostname (:storm-conf executor-data))
+ (.getThisWorkerPort worker-context)
+ (:component-id executor-data)
+ task-id
+ (long (/ (System/currentTimeMillis) 1000))
+ interval)
+ data-points (->> name->imetric
+ (map (fn [[name imetric]]
+ (let [value (.getValueAndReset ^IMetric imetric)]
+ (if value
+ (IMetricsConsumer$DataPoint. name value)))))
+ (filter identity)
+ (into []))]
+ (if (seq data-points)
+ (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] overflow-buffer))))
+ ([executor-data task-data ^TupleImpl tuple]
+ (metrics-tick executor-data task-data tuple nil)
+ ))
+
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
@@ -453,7 +458,16 @@
last-active (atom false)
spouts (ArrayList. (map :object (vals task-datas)))
rand (Random. (Utils/secureRandomLong))
-
+
+ ;; the overflow buffer is used to ensure that spouts never block when emitting
+ ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
+ ;; prevents deadlock from occurring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
+ ;; buffers filled up)
+ ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
+ ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
+ ;; preventing memory issues
+ overflow-buffer (ConcurrentLinkedQueue.)
+
pending (RotatingMap.
2 ;; microoptimize for performance of .size method
(reify RotatingMap$ExpiredCallback
@@ -465,7 +479,7 @@
(let [stream-id (.getSourceStreamId tuple)]
(condp = stream-id
Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
- Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple overflow-buffer)
Constants/CREDENTIALS_CHANGED_STREAM_ID
(let [task-data (get task-datas task-id)
spout-obj (:object task-data)]
@@ -489,16 +503,7 @@
event-handler (mk-task-receiver executor-data tuple-action-fn)
has-ackers? (has-ackers? storm-conf)
emitted-count (MutableLong. 0)
- empty-emit-streak (MutableLong. 0)
-
- ;; the overflow buffer is used to ensure that spouts never block when emitting
- ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
- ;; prevents deadlock from occuring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
- ;; buffers filled up)
- ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
- ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
- ;; preventing memory issues
- overflow-buffer (ConcurrentLinkedQueue.)]
+ empty-emit-streak (MutableLong. 0)]
[(async-loop
(fn []
[3/3] storm git commit: Merge branch 'work' of
https://github.com/abhishekagarwal87/storm into STORM-1027
Posted by ka...@apache.org.
Merge branch 'work' of https://github.com/abhishekagarwal87/storm into STORM-1027
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5f98cd2c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5f98cd2c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5f98cd2c
Branch: refs/heads/master
Commit: 5f98cd2c1754b3b1b1844d83fd4eb1fb18588fe4
Parents: e78fcd7 62d30a0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Sep 17 07:19:02 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Sep 17 07:19:02 2015 +0900
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/executor.clj | 93 +++++++++++---------
1 file changed, 49 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
[2/3] storm git commit: STORM-1027: Apply overflow buffer to metric
emit in bolt
Posted by ka...@apache.org.
STORM-1027: Apply overflow buffer to metric emit in bolt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62d30a04
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62d30a04
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62d30a04
Branch: refs/heads/master
Commit: 62d30a04371f52ba5cd29279d8dd7e9f19954d8c
Parents: 3144a53
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Wed Sep 9 18:53:04 2015 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Wed Sep 9 18:53:04 2015 +0530
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/executor.clj | 22 ++++++++++----------
1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d30a04/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 326def1..a9a36a2 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -646,6 +646,15 @@
{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
open-or-prepare-was-called?]} executor-data
rand (Random. (Utils/secureRandomLong))
+
+ ;; the overflow buffer is used to ensure that bolts do not block when emitting
+ ;; this ensures that the bolt can always clear the incoming messages, which
+ ;; prevents deadlock from occurring across the topology
+ ;; (e.g. Spout -> BoltA -> Splitter -> BoltB -> BoltA, and all
+ ;; buffers filled up)
+ ;; the overflow buffer might gradually fill, degrading performance and
+ ;; eventually running out of memory, but it at least prevents live-locks/deadlocks.
+ overflow-buffer (if (storm-conf TOPOLOGY-BOLTS-OUTGOING-OVERFLOW-BUFFER-ENABLE) (ConcurrentLinkedQueue.) nil)
tuple-action-fn (fn [task-id ^TupleImpl tuple]
;; synchronization needs to be done with a key provided by this bolt, otherwise:
;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
@@ -670,7 +679,7 @@
bolt-obj (:object task-data)]
(when (instance? ICredentialsListener bolt-obj)
(.setCredentials bolt-obj (.getValue tuple 0))))
- Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple overflow-buffer)
(let [task-data (get task-datas task-id)
^IBolt bolt-obj (:object task-data)
user-context (:user-context task-data)
@@ -696,16 +705,7 @@
(stats/bolt-execute-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
- delta)))))))
-
- ;; the overflow buffer is used to ensure that bolts do not block when emitting
- ;; this ensures that the bolt can always clear the incoming messages, which
- ;; prevents deadlock from occurs across the topology
- ;; (e.g. Spout -> BoltA -> Splitter -> BoltB -> BoltA, and all
- ;; buffers filled up)
- ;; the overflow buffer is might gradually fill degrading the performance gradually
- ;; eventually running out of memory, but at least prevent live-locks/deadlocks.
- overflow-buffer (if (storm-conf TOPOLOGY-BOLTS-OUTGOING-OVERFLOW-BUFFER-ENABLE) (ConcurrentLinkedQueue.) nil)]
+ delta)))))))]
;; TODO: can get any SubscribedState objects out of the context now