You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/31 09:37:05 UTC
[3/6] storm git commit: STORM-2039: split out backpressure worker
timer into its own
STORM-2039: split out backpressure worker timer into its own
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17c23950
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17c23950
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17c23950
Branch: refs/heads/1.0.x-branch
Commit: 17c23950e419c9ea65190c193d19c314ddf8b6e5
Parents: 66593b6
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 22:25:52 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:32:05 2016 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
storm-core/src/clj/org/apache/storm/daemon/worker.clj | 11 ++++++++---
storm-core/src/jvm/org/apache/storm/Config.java | 7 +++++++
3 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b63d8d8..1654ab1 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -186,6 +186,7 @@ topology.worker.receiver.thread.count: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
+task.backpressure.poll.secs: 30
# now should be null by default
topology.backpressure.enable: false
http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 40fb54a..fd3cf61 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -293,6 +293,7 @@
:reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
:refresh-active-timer (mk-halting-timer "refresh-active-timer")
:executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+ :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
:user-timer (mk-halting-timer "user-timer")
:task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
:component->stream->fields (component->stream->fields (:system-topology <>))
@@ -734,9 +735,13 @@
(.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
(schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
(fn [& args]
- (check-credentials-changed)
- (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (topology-backpressure-callback))))
+ (check-credentials-changed)))
+
+ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS)
+ (fn []
+ (topology-backpressure-callback))))
+
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
(schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))
http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 3973e1a..39dcc8f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1505,6 +1505,13 @@ public class Config extends HashMap<String, Object> {
public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
/**
+ * How often to poll for changed topology backpressure flag from ZK
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
+
+ /**
* Whether to enable backpressure in for a certain topology
*/
@isBoolean