You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/31 09:37:03 UTC
[1/6] storm git commit: STORM-2039: backpressure refactoring in
worker and executor
Repository: storm
Updated Branches:
refs/heads/1.0.x-branch b830ad9c3 -> 22fc6f2a5
STORM-2039: backpressure refactoring in worker and executor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3fbf8556
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3fbf8556
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3fbf8556
Branch: refs/heads/1.0.x-branch
Commit: 3fbf8556911a97632d11ae30021776ba0db9288b
Parents: b830ad9
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Mon Aug 15 14:28:40 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:31:57 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/executor.clj | 15 +++++----------
.../src/clj/org/apache/storm/daemon/worker.clj | 17 +++++------------
2 files changed, 10 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3fbf8556/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8c835d5..820a20c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -273,7 +273,6 @@
(log-message "Got interrupted excpetion shutting thread down...")
((:suicide-fn <>))))
:sampler (mk-stats-sampler storm-conf)
- :backpressure (atom false)
:spout-throttling-metrics (if (= executor-type :spout)
(builtin-metrics/make-spout-throttling-data)
nil)
@@ -286,16 +285,12 @@
(disruptor/disruptor-backpressure-handler
(fn []
"When receive queue is above highWaterMark"
- (if (not @(:backpressure executor-data))
- (do (reset! (:backpressure executor-data) true)
- (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
- (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
+ (do (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
+ (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data)))))
(fn []
"When receive queue is below lowWaterMark"
- (if @(:backpressure executor-data)
- (do (reset! (:backpressure executor-data) false)
- (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
- (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))
+ (do (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
+ (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data)))))))
(defn start-batch-transfer->worker-handler! [worker executor-data]
(let [worker-transfer-fn (:transfer-fn worker)
@@ -408,7 +403,7 @@
val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
(disruptor/publish receive-queue val)))
(get-backpressure-flag [this]
- @(:backpressure executor-data))
+ (.getThrottleOn (:receive-queue executor-data)))
Shutdownable
(shutdown
[this]
http://git-wip-us.apache.org/repos/asf/storm/blob/3fbf8556/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index b8bc423..5a123fa 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -160,10 +160,10 @@
check highWaterMark and lowWaterMark for backpressure"
(disruptor/disruptor-backpressure-handler
(fn []
- (reset! (:transfer-backpressure worker) true)
+ (log-debug "worker " (:worker-id worker) " transfer-queue is congested, set backpressure flag true")
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
(fn []
- (reset! (:transfer-backpressure worker) false)
+ (log-debug "worker " (:worker-id worker) " transfer-queue is not congested, set backpressure flag false")
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))
(defn mk-transfer-fn [worker]
@@ -316,7 +316,6 @@
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
:backpressure (atom false) ;; whether this worker is going slow
- :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
:backpressure-trigger (atom false) ;; a trigger for synchronization with executors
:throttle-on (atom false) ;; whether throttle is activated for spouts
)))
@@ -649,11 +648,11 @@
backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
(.start backpressure-thread))
- callback (fn cb [& ignored]
+ topology-backpressure-callback (fn cb [& ignored]
(let [throttle-on (.topology-backpressure storm-cluster-state storm-id cb)]
(reset! (:throttle-on worker) throttle-on)))
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (.topology-backpressure storm-cluster-state storm-id callback))
+ (.topology-backpressure storm-cluster-state storm-id topology-backpressure-callback))
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " assignment-id " " port)
@@ -721,12 +720,6 @@
(AuthUtils/updateSubject subject auto-creds new-creds)
(dofor [e @executors] (.credentials-changed e new-creds))
(reset! credentials new-creds))))
- check-throttle-changed (fn []
- (let [callback (fn cb [& ignored]
- (let [throttle-on (.topology-backpressure (:storm-cluster-state worker) storm-id cb)]
- (reset! (:throttle-on worker) throttle-on)))
- new-throttle-on (.topology-backpressure (:storm-cluster-state worker) storm-id callback)]
- (reset! (:throttle-on worker) new-throttle-on)))
check-log-config-changed (fn []
(let [log-config (.topology-log-config (:storm-cluster-state worker) storm-id nil)]
(process-log-config-change latest-log-config original-log-levels log-config)
@@ -743,7 +736,7 @@
(fn [& args]
(check-credentials-changed)
(if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (check-throttle-changed))))
+ (topology-backpressure-callback))))
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
(schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))
[6/6] storm git commit: add STORM-2039 to CHANGELOG
Posted by ka...@apache.org.
add STORM-2039 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/22fc6f2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/22fc6f2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/22fc6f2a
Branch: refs/heads/1.0.x-branch
Commit: 22fc6f2a5bbccbfeb3ce236919c20b5ca8642b08
Parents: 54e6448
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 31 18:36:48 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:36:48 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/22fc6f2a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0bede74..2503627 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.3
+ * STORM-2039: Backpressure refactoring in worker and executor
* STORM-2064: Add storm name and function, access result and function to log-thrift-access
* STORM-2063: Add thread name in worker logs
* STORM-2042: Nimbus client connections not closed properly causing connection leaks
[3/6] storm git commit: STORM-2039: split out backpressure worker
timer into its own
Posted by ka...@apache.org.
STORM-2039: split out backpressure worker timer into its own
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17c23950
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17c23950
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17c23950
Branch: refs/heads/1.0.x-branch
Commit: 17c23950e419c9ea65190c193d19c314ddf8b6e5
Parents: 66593b6
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 22:25:52 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:32:05 2016 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
storm-core/src/clj/org/apache/storm/daemon/worker.clj | 11 ++++++++---
storm-core/src/jvm/org/apache/storm/Config.java | 7 +++++++
3 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b63d8d8..1654ab1 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -186,6 +186,7 @@ topology.worker.receiver.thread.count: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
+task.backpressure.poll.secs: 30
# now should be null by default
topology.backpressure.enable: false
http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 40fb54a..fd3cf61 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -293,6 +293,7 @@
:reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
:refresh-active-timer (mk-halting-timer "refresh-active-timer")
:executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+ :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
:user-timer (mk-halting-timer "user-timer")
:task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
:component->stream->fields (component->stream->fields (:system-topology <>))
@@ -734,9 +735,13 @@
(.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
(schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
(fn [& args]
- (check-credentials-changed)
- (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (topology-backpressure-callback))))
+ (check-credentials-changed)))
+
+ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS)
+ (fn []
+ (topology-backpressure-callback))))
+
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
(schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))
http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 3973e1a..39dcc8f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1505,6 +1505,13 @@ public class Config extends HashMap<String, Object> {
public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
/**
+ * How often to poll for changed topology backpressure flag from ZK
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
+
+ /**
* Whether to enable backpressure in for a certain topology
*/
@isBoolean
[4/6] storm git commit: STORM-2039: no need to wrap the backpressure
function in another function
Posted by ka...@apache.org.
STORM-2039: no need to wrap the backpressure function in another function
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c924c0e7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c924c0e7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c924c0e7
Branch: refs/heads/1.0.x-branch
Commit: c924c0e718b89c2a3bbe20dd03659005aafa5430
Parents: 17c2395
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Wed Aug 17 08:05:40 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:32:09 2016 +0900
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/daemon/worker.clj | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c924c0e7/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index fd3cf61..6a0ff38 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -738,9 +738,7 @@
(check-credentials-changed)))
(if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS)
- (fn []
- (topology-backpressure-callback))))
+ (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
[2/6] storm git commit: STORM-2039: make :backpressure-trigger an
Object instead of an atom
Posted by ka...@apache.org.
STORM-2039: make :backpressure-trigger an Object instead of an atom
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/66593b68
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/66593b68
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/66593b68
Branch: refs/heads/1.0.x-branch
Commit: 66593b68a36090cae51fb8e0f1113246cae99de7
Parents: 3fbf855
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 21:23:19 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:32:01 2016 +0900
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/daemon/worker.clj | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/66593b68/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 5a123fa..40fb54a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -316,7 +316,7 @@
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
:backpressure (atom false) ;; whether this worker is going slow
- :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
+ :backpressure-trigger (Object.) ;; a trigger for synchronization with executors
:throttle-on (atom false) ;; whether throttle is activated for spouts
)))
[5/6] storm git commit: Merge branch 'STORM-2039-1.0.x' into
1.0.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2039-1.0.x' into 1.0.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54e64489
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54e64489
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54e64489
Branch: refs/heads/1.0.x-branch
Commit: 54e64489b871bf579486accab7f8ab5a91115f88
Parents: b830ad9 c924c0e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 31 18:36:29 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:36:29 2016 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
.../clj/org/apache/storm/daemon/executor.clj | 15 ++++-------
.../src/clj/org/apache/storm/daemon/worker.clj | 26 +++++++++-----------
storm-core/src/jvm/org/apache/storm/Config.java | 7 ++++++
4 files changed, 24 insertions(+), 25 deletions(-)
----------------------------------------------------------------------