You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/31 09:37:21 UTC
[3/5] storm git commit: STORM-2039: split out backpressure worker timer into its own
STORM-2039: split out backpressure worker timer into its own
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d090d877
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d090d877
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d090d877
Branch: refs/heads/master
Commit: d090d877f002ecd8be831639d2c29e95f820e7dc
Parents: 96b3325
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 22:25:52 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Wed Aug 17 08:03:58 2016 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
storm-core/src/clj/org/apache/storm/daemon/worker.clj | 10 +++++++---
storm-core/src/jvm/org/apache/storm/Config.java | 7 +++++++
3 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e92c11d..8392936 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -185,6 +185,7 @@ topology.worker.receiver.thread.count: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
+task.backpressure.poll.secs: 30
# now should be null by default
topology.backpressure.enable: false
http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 2b6843c..222de36 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -306,6 +306,7 @@
:reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
:refresh-active-timer (mk-halting-timer "refresh-active-timer")
:executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+ :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
:user-timer (mk-halting-timer "user-timer")
:task->component (StormCommon/stormTaskInfo topology storm-conf) ; for optimized access when used in tasks later on
:component->stream->fields (component->stream->fields (:system-topology <>))
@@ -780,14 +781,17 @@
(.topologyLogConfig (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed))))
(establish-log-setting-callback)
+
(clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id (fn [] (check-credentials-changed))))
(.scheduleRecurring
(:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
(fn []
- (check-credentials-changed)
- (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (topology-backpressure-callback))))
+ (check-credentials-changed)))
+
+ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (.scheduleRecurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
+
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
(.scheduleRecurringWithJitter
http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index f27f966..43369bd 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1497,6 +1497,13 @@ public class Config extends HashMap<String, Object> {
public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
/**
+ * How often, in seconds, to poll ZooKeeper for changes to the topology backpressure flag.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
+
+ /**
* Whether to enable backpressure for a certain topology
*/
@isBoolean