You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/31 09:37:03 UTC

[1/6] storm git commit: STORM-2039: backpressure refactoring in worker and executor

Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch b830ad9c3 -> 22fc6f2a5


STORM-2039: backpressure refactoring in worker and executor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3fbf8556
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3fbf8556
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3fbf8556

Branch: refs/heads/1.0.x-branch
Commit: 3fbf8556911a97632d11ae30021776ba0db9288b
Parents: b830ad9
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Mon Aug 15 14:28:40 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:31:57 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/executor.clj   | 15 +++++----------
 .../src/clj/org/apache/storm/daemon/worker.clj     | 17 +++++------------
 2 files changed, 10 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3fbf8556/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8c835d5..820a20c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -273,7 +273,6 @@
                                (log-message "Got interrupted excpetion shutting thread down...")
                                ((:suicide-fn <>))))
      :sampler (mk-stats-sampler storm-conf)
-     :backpressure (atom false)
      :spout-throttling-metrics (if (= executor-type :spout) 
                                 (builtin-metrics/make-spout-throttling-data)
                                 nil)
@@ -286,16 +285,12 @@
   (disruptor/disruptor-backpressure-handler
     (fn []
       "When receive queue is above highWaterMark"
-      (if (not @(:backpressure executor-data))
-        (do (reset! (:backpressure executor-data) true)
-            (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
-            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
+      (do (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
+          (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data)))))
     (fn []
       "When receive queue is below lowWaterMark"
-      (if @(:backpressure executor-data)
-        (do (reset! (:backpressure executor-data) false)
-            (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
-            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))
+      (do (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
+          (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data)))))))
 
 (defn start-batch-transfer->worker-handler! [worker executor-data]
   (let [worker-transfer-fn (:transfer-fn worker)
@@ -408,7 +403,7 @@
               val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
           (disruptor/publish receive-queue val)))
       (get-backpressure-flag [this]
-        @(:backpressure executor-data))
+        (.getThrottleOn (:receive-queue executor-data)))
       Shutdownable
       (shutdown
         [this]

http://git-wip-us.apache.org/repos/asf/storm/blob/3fbf8556/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index b8bc423..5a123fa 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -160,10 +160,10 @@
   check highWaterMark and lowWaterMark for backpressure"
   (disruptor/disruptor-backpressure-handler
     (fn []
-      (reset! (:transfer-backpressure worker) true)
+      (log-debug "worker " (:worker-id worker) " transfer-queue is congested, set backpressure flag true")
       (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
     (fn []
-      (reset! (:transfer-backpressure worker) false)
+      (log-debug "worker " (:worker-id worker) " transfer-queue is not congested, set backpressure flag false")
       (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))
 
 (defn mk-transfer-fn [worker]
@@ -316,7 +316,6 @@
       :load-mapping (LoadMapping.)
       :assignment-versions assignment-versions
       :backpressure (atom false) ;; whether this worker is going slow
-      :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
       :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
       :throttle-on (atom false) ;; whether throttle is activated for spouts
       )))
@@ -649,11 +648,11 @@
         backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) 
             (.start backpressure-thread))
-        callback (fn cb [& ignored]
+        topology-backpressure-callback (fn cb [& ignored]
                    (let [throttle-on (.topology-backpressure storm-cluster-state storm-id cb)]
                      (reset! (:throttle-on worker) throttle-on)))
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-            (.topology-backpressure storm-cluster-state storm-id callback))
+            (.topology-backpressure storm-cluster-state storm-id topology-backpressure-callback))
 
         shutdown* (fn []
                     (log-message "Shutting down worker " storm-id " " assignment-id " " port)
@@ -721,12 +720,6 @@
                                         (AuthUtils/updateSubject subject auto-creds new-creds)
                                         (dofor [e @executors] (.credentials-changed e new-creds))
                                         (reset! credentials new-creds))))
-       check-throttle-changed (fn []
-                                (let [callback (fn cb [& ignored]
-                                                 (let [throttle-on (.topology-backpressure (:storm-cluster-state worker) storm-id cb)]
-                                                   (reset! (:throttle-on worker) throttle-on)))
-                                      new-throttle-on (.topology-backpressure (:storm-cluster-state worker) storm-id callback)]
-                                    (reset! (:throttle-on worker) new-throttle-on)))
         check-log-config-changed (fn []
                                   (let [log-config (.topology-log-config (:storm-cluster-state worker) storm-id nil)]
                                     (process-log-config-change latest-log-config original-log-levels log-config)
@@ -743,7 +736,7 @@
                         (fn [& args]
                           (check-credentials-changed)
                           (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-                            (check-throttle-changed))))
+                            (topology-backpressure-callback))))
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
       (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))


[6/6] storm git commit: add STORM-2039 to CHANGELOG

Posted by ka...@apache.org.
add STORM-2039 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/22fc6f2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/22fc6f2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/22fc6f2a

Branch: refs/heads/1.0.x-branch
Commit: 22fc6f2a5bbccbfeb3ce236919c20b5ca8642b08
Parents: 54e6448
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 31 18:36:48 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:36:48 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/22fc6f2a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0bede74..2503627 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.3
+ * STORM-2039: Backpressure refactoring in worker and executor
  * STORM-2064: Add storm name and function, access result and function to log-thrift-access
  * STORM-2063: Add thread name in worker logs
  * STORM-2042: Nimbus client connections not closed properly causing connection leaks


[3/6] storm git commit: STORM-2039: split out backpressure worker timer into its own

Posted by ka...@apache.org.
STORM-2039: split out backpressure worker timer into its own


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17c23950
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17c23950
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17c23950

Branch: refs/heads/1.0.x-branch
Commit: 17c23950e419c9ea65190c193d19c314ddf8b6e5
Parents: 66593b6
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 22:25:52 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:32:05 2016 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                                    |  1 +
 storm-core/src/clj/org/apache/storm/daemon/worker.clj | 11 ++++++++---
 storm-core/src/jvm/org/apache/storm/Config.java       |  7 +++++++
 3 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b63d8d8..1654ab1 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -186,6 +186,7 @@ topology.worker.receiver.thread.count: 1
 task.heartbeat.frequency.secs: 3
 task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
+task.backpressure.poll.secs: 30
 
 # now should be null by default
 topology.backpressure.enable: false

http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 40fb54a..fd3cf61 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -293,6 +293,7 @@
       :reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
       :refresh-active-timer (mk-halting-timer "refresh-active-timer")
       :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+      :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
       :user-timer (mk-halting-timer "user-timer")
       :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
       :component->stream->fields (component->stream->fields (:system-topology <>))
@@ -734,9 +735,13 @@
     (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
     (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
                         (fn [& args]
-                          (check-credentials-changed)
-                          (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-                            (topology-backpressure-callback))))
+                          (check-credentials-changed)))
+
+    (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+      (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS)
+        (fn [] 
+          (topology-backpressure-callback))))
+
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
       (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))

http://git-wip-us.apache.org/repos/asf/storm/blob/17c23950/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 3973e1a..39dcc8f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1505,6 +1505,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
 
     /**
+     * How often (in seconds) to poll ZooKeeper for changes to the topology backpressure flag.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
+
+    /**
      * Whether to enable backpressure in for a certain topology
      */
     @isBoolean


[4/6] storm git commit: STORM-2039: no need to wrap the backpressure function in another function

Posted by ka...@apache.org.
STORM-2039: no need to wrap the backpressure function in another function


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c924c0e7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c924c0e7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c924c0e7

Branch: refs/heads/1.0.x-branch
Commit: c924c0e718b89c2a3bbe20dd03659005aafa5430
Parents: 17c2395
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Wed Aug 17 08:05:40 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:32:09 2016 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/worker.clj | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c924c0e7/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index fd3cf61..6a0ff38 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -738,9 +738,7 @@
                           (check-credentials-changed)))
 
     (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-      (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS)
-        (fn [] 
-          (topology-backpressure-callback))))
+      (schedule-recurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
 
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)


[2/6] storm git commit: STORM-2039: make :backpressure-trigger an Object instead of an atom

Posted by ka...@apache.org.
STORM-2039: make :backpressure-trigger an Object instead of an atom


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/66593b68
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/66593b68
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/66593b68

Branch: refs/heads/1.0.x-branch
Commit: 66593b68a36090cae51fb8e0f1113246cae99de7
Parents: 3fbf855
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 21:23:19 2016 -0500
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:32:01 2016 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/worker.clj | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/66593b68/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 5a123fa..40fb54a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -316,7 +316,7 @@
       :load-mapping (LoadMapping.)
       :assignment-versions assignment-versions
       :backpressure (atom false) ;; whether this worker is going slow
-      :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
+      :backpressure-trigger (Object.) ;; a trigger for synchronization with executors
       :throttle-on (atom false) ;; whether throttle is activated for spouts
       )))
 


[5/6] storm git commit: Merge branch 'STORM-2039-1.0.x' into 1.0.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-2039-1.0.x' into 1.0.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54e64489
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54e64489
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54e64489

Branch: refs/heads/1.0.x-branch
Commit: 54e64489b871bf579486accab7f8ab5a91115f88
Parents: b830ad9 c924c0e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 31 18:36:29 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:36:29 2016 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 +
 .../clj/org/apache/storm/daemon/executor.clj    | 15 ++++-------
 .../src/clj/org/apache/storm/daemon/worker.clj  | 26 +++++++++-----------
 storm-core/src/jvm/org/apache/storm/Config.java |  7 ++++++
 4 files changed, 24 insertions(+), 25 deletions(-)
----------------------------------------------------------------------