You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/31 09:37:05 UTC

[3/6] storm git commit: STORM-2039: split out backpressure worker timer into its own

STORM-2039: split out backpressure worker timer into its own


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17c23950
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17c23950
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17c23950

Branch: refs/heads/1.0.x-branch
Commit: 17c23950e419c9ea65190c193d19c314ddf8b6e5
Parents: 66593b6
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 22:25:52 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:32:05 2016 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                                    |  1 +
 storm-core/src/clj/org/apache/storm/daemon/worker.clj | 11 ++++++++---
 storm-core/src/jvm/org/apache/storm/Config.java       |  7 +++++++
 3 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b63d8d8..1654ab1 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -186,6 +186,7 @@ topology.worker.receiver.thread.count: 1
 task.heartbeat.frequency.secs: 3
 task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
+task.backpressure.poll.secs: 30
 
 # now should be null by default
 topology.backpressure.enable: false

http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 40fb54a..fd3cf61 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -293,6 +293,7 @@
       :reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
       :refresh-active-timer (mk-halting-timer "refresh-active-timer")
       :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+      :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
       :user-timer (mk-halting-timer "user-timer")
       :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
       :component->stream->fields (component->stream->fields (:system-topology <>))
@@ -734,9 +735,13 @@
     (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
     (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
                         (fn [& args]
-                          (check-credentials-changed)
-                          (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-                            (topology-backpressure-callback))))
+                          (check-credentials-changed)))
+
+    (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+      (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS)
+        (fn [] 
+          (topology-backpressure-callback))))
+
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
       (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))

http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 3973e1a..39dcc8f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1505,6 +1505,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
 
     /**
+     * How often to poll for changed topology backpressure flag from ZK
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
+
+    /**
      * Whether to enable backpressure in for a certain topology
      */
     @isBoolean