You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/09 21:20:31 UTC

[1/3] storm git commit: STORM-1087: Avoid issues with transfer-queue backpressure.

Repository: storm
Updated Branches:
  refs/heads/master 86f2d03c2 -> ada8097e1


STORM-1087: Avoid issues with transfer-queue backpressure.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85a3b3eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85a3b3eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85a3b3eb

Branch: refs/heads/master
Commit: 85a3b3eb63f3285043d57762caf0ec5629c2d3c8
Parents: 7cf4d25
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Oct 5 11:48:11 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 5 11:48:11 2015 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/worker.clj    | 27 ++++++++++----------
 .../backtype/storm/utils/DisruptorQueue.java    | 11 ++++++--
 2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/85a3b3eb/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 781a959..fca64d1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -123,12 +123,13 @@
             port (:port worker)
             storm-cluster-state (:storm-cluster-state worker)
             prev-backpressure-flag @(:backpressure worker)]
-        (if executors 
-          (if (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))
-            (reset! (:backpressure worker) true)   ;; at least one executor has set backpressure
-            (reset! (:backpressure worker) false))) ;; no executor has backpressure set
+        (when executors
+          (reset! (:backpressure worker)
+                  (or @(:transfer-backpressure worker)
+                      (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))))
         ;; update the worker's backpressure flag to zookeeper only when it has changed
-        (if (not= prev-backpressure-flag @(:backpressure worker))
+        (log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag)
+        (when (not= prev-backpressure-flag @(:backpressure worker))
           (.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker)))
         ))))
 
@@ -137,14 +138,11 @@
   check highWaterMark and lowWaterMark for backpressure"
   (disruptor/disruptor-backpressure-handler
     (fn []
-      "When worker's queue is above highWaterMark, we set its backpressure flag"
-      (if (not @(:backpressure worker))
-        (do (reset! (:backpressure worker) true)
-            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))  ;; set backpressure no matter how the executors are
+      (reset! (:transfer-backpressure worker) true)
+      (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
     (fn []
-      "If worker's queue is below low watermark, we do nothing since we want the
-      WorkerBackPressureThread to also check for all the executors' status"
-      )))
+      (reset! (:transfer-backpressure worker) false)
+      (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))
 
 (defn mk-transfer-fn [worker]
   (let [local-tasks (-> worker :task-ids set)
@@ -171,8 +169,8 @@
                         (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
                      ))))
 
-              (local-transfer local)
-              (disruptor/publish transfer-queue remoteMap)))]
+              (when (not (.isEmpty local)) (local-transfer local))
+              (when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))]
     (if try-serialize-local
       (do
         (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
@@ -290,6 +288,7 @@
       :transfer-fn (mk-transfer-fn <>)
       :assignment-versions assignment-versions
       :backpressure (atom false) ;; whether this worker is going slow
+      :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
       :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
       :throttle-on (atom false) ;; whether throttle is activated for spouts
       )))

http://git-wip-us.apache.org/repos/asf/storm/blob/85a3b3eb/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 72590e5..3e71bca 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -69,6 +69,7 @@ public class DisruptorQueue implements IStatefulObject {
     private int _highWaterMark = 0;
     private int _lowWaterMark = 0;
     private boolean _enableBackpressure = false;
+    private volatile boolean _throttleOn = false;
 
     public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait, long timeout) {
         this._queueName = PREFIX + queueName;
@@ -141,7 +142,10 @@ public class DisruptorQueue implements IStatefulObject {
                     handler.onEvent(o, curr, curr == cursor);
                     if (_enableBackpressure && _cb != null && _metrics.writePos() - curr <= _lowWaterMark) {
                         try {
-                            _cb.lowWaterMark();
+                            if (_throttleOn) {
+                                _throttleOn = false;
+                                _cb.lowWaterMark();
+                            }
                         } catch (Exception e) {
                             throw new RuntimeException("Exception during calling lowWaterMark callback!");
                         }
@@ -208,7 +212,10 @@ public class DisruptorQueue implements IStatefulObject {
         _metrics.notifyArrivals(1);
         if (_enableBackpressure && _cb != null && _metrics.population() >= _highWaterMark) {
            try {
-               _cb.highWaterMark();
+               if (!_throttleOn) {
+                   _cb.highWaterMark();
+                   _throttleOn = true;
+               }
            } catch (Exception e) {
                throw new RuntimeException("Exception during calling highWaterMark callback!");
            }


[2/3] storm git commit: Merge branch 'STORM-1087' of https://github.com/revans2/incubator-storm into STORM-1087

Posted by bo...@apache.org.
Merge branch 'STORM-1087' of https://github.com/revans2/incubator-storm into STORM-1087

STORM-1087: Avoid issues with transfer-queue backpressure.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/960743ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/960743ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/960743ef

Branch: refs/heads/master
Commit: 960743ef26b82cfd6579382abcc765b45f57b8eb
Parents: 86f2d03 85a3b3e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Oct 9 14:01:35 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Oct 9 14:01:35 2015 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/worker.clj    | 27 ++++++++++----------
 .../backtype/storm/utils/DisruptorQueue.java    | 11 ++++++--
 2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/960743ef/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------


[3/3] storm git commit: Added STORM-1087 to changelog

Posted by bo...@apache.org.
Added STORM-1087 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ada8097e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ada8097e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ada8097e

Branch: refs/heads/master
Commit: ada8097e16bc70af37f04e9472935e4f1b7d5f9b
Parents: 960743e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Oct 9 14:02:01 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Oct 9 14:02:01 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ada8097e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe367a4..7fe2c1c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1087: Avoid issues with transfer-queue backpressure.
  * STORM-893: Resource Aware Scheduling (Experimental)
  * STORM-1095: Tuple.getSourceGlobalStreamid() has wrong camel-case naming
  * STORM-1091: Add unit test for tick tuples to HiveBolt and HdfsBolt