You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/31 09:37:21 UTC

[3/5] storm git commit: STORM-2039: split out backpressure worker timer into its own timer

STORM-2039: split out backpressure worker timer into its own timer


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d090d877
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d090d877
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d090d877

Branch: refs/heads/master
Commit: d090d877f002ecd8be831639d2c29e95f820e7dc
Parents: 96b3325
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 22:25:52 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Wed Aug 17 08:03:58 2016 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                                    |  1 +
 storm-core/src/clj/org/apache/storm/daemon/worker.clj | 10 +++++++---
 storm-core/src/jvm/org/apache/storm/Config.java       |  7 +++++++
 3 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e92c11d..8392936 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -185,6 +185,7 @@ topology.worker.receiver.thread.count: 1
 task.heartbeat.frequency.secs: 3
 task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
+task.backpressure.poll.secs: 30
 
 # now should be null by default
 topology.backpressure.enable: false

http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 2b6843c..222de36 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -306,6 +306,7 @@
       :reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
       :refresh-active-timer (mk-halting-timer "refresh-active-timer")
       :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+      :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
       :user-timer (mk-halting-timer "user-timer")
       :task->component (StormCommon/stormTaskInfo topology storm-conf) ; for optimized access when used in tasks later on
       :component->stream->fields (component->stream->fields (:system-topology <>))
@@ -780,14 +781,17 @@
       (.topologyLogConfig (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed))))
 
     (establish-log-setting-callback)
+
     (clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id (fn [] (check-credentials-changed))))
 
     (.scheduleRecurring
       (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
         (fn []
-          (check-credentials-changed)
-          (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-            (topology-backpressure-callback))))
+          (check-credentials-changed)))
+
+    (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+      (.scheduleRecurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
+
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
       (.scheduleRecurringWithJitter

http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index f27f966..43369bd 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1497,6 +1497,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
 
     /**
+     * How often, in seconds, to poll ZK for changes to the topology backpressure flag
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
+
+    /**
      * Whether to enable backpressure for a certain topology
      */
     @isBoolean