You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/09 21:20:31 UTC
[1/3] storm git commit: STORM-1087: Avoid issues with transfer-queue backpressure.
Repository: storm
Updated Branches:
refs/heads/master 86f2d03c2 -> ada8097e1
STORM-1087: Avoid issues with transfer-queue backpressure.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85a3b3eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85a3b3eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85a3b3eb
Branch: refs/heads/master
Commit: 85a3b3eb63f3285043d57762caf0ec5629c2d3c8
Parents: 7cf4d25
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Oct 5 11:48:11 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 5 11:48:11 2015 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/worker.clj | 27 ++++++++++----------
.../backtype/storm/utils/DisruptorQueue.java | 11 ++++++--
2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/85a3b3eb/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 781a959..fca64d1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -123,12 +123,13 @@
port (:port worker)
storm-cluster-state (:storm-cluster-state worker)
prev-backpressure-flag @(:backpressure worker)]
- (if executors
- (if (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))
- (reset! (:backpressure worker) true) ;; at least one executor has set backpressure
- (reset! (:backpressure worker) false))) ;; no executor has backpressure set
+ (when executors
+ (reset! (:backpressure worker)
+ (or @(:transfer-backpressure worker)
+ (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))))
;; update the worker's backpressure flag to zookeeper only when it has changed
- (if (not= prev-backpressure-flag @(:backpressure worker))
+ (log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag)
+ (when (not= prev-backpressure-flag @(:backpressure worker))
(.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker)))
))))
@@ -137,14 +138,11 @@
check highWaterMark and lowWaterMark for backpressure"
(disruptor/disruptor-backpressure-handler
(fn []
- "When worker's queue is above highWaterMark, we set its backpressure flag"
- (if (not @(:backpressure worker))
- (do (reset! (:backpressure worker) true)
- (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker))))) ;; set backpressure no matter how the executors are
+ (reset! (:transfer-backpressure worker) true)
+ (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
(fn []
- "If worker's queue is below low watermark, we do nothing since we want the
- WorkerBackPressureThread to also check for all the executors' status"
- )))
+ (reset! (:transfer-backpressure worker) false)
+ (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))
(defn mk-transfer-fn [worker]
(let [local-tasks (-> worker :task-ids set)
@@ -171,8 +169,8 @@
(log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
))))
- (local-transfer local)
- (disruptor/publish transfer-queue remoteMap)))]
+ (when (not (.isEmpty local)) (local-transfer local))
+ (when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))]
(if try-serialize-local
(do
(log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
@@ -290,6 +288,7 @@
:transfer-fn (mk-transfer-fn <>)
:assignment-versions assignment-versions
:backpressure (atom false) ;; whether this worker is going slow
+ :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
:backpressure-trigger (atom false) ;; a trigger for synchronization with executors
:throttle-on (atom false) ;; whether throttle is activated for spouts
)))
http://git-wip-us.apache.org/repos/asf/storm/blob/85a3b3eb/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 72590e5..3e71bca 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -69,6 +69,7 @@ public class DisruptorQueue implements IStatefulObject {
private int _highWaterMark = 0;
private int _lowWaterMark = 0;
private boolean _enableBackpressure = false;
+ private volatile boolean _throttleOn = false;
public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait, long timeout) {
this._queueName = PREFIX + queueName;
@@ -141,7 +142,10 @@ public class DisruptorQueue implements IStatefulObject {
handler.onEvent(o, curr, curr == cursor);
if (_enableBackpressure && _cb != null && _metrics.writePos() - curr <= _lowWaterMark) {
try {
- _cb.lowWaterMark();
+ if (_throttleOn) {
+ _throttleOn = false;
+ _cb.lowWaterMark();
+ }
} catch (Exception e) {
throw new RuntimeException("Exception during calling lowWaterMark callback!");
}
@@ -208,7 +212,10 @@ public class DisruptorQueue implements IStatefulObject {
_metrics.notifyArrivals(1);
if (_enableBackpressure && _cb != null && _metrics.population() >= _highWaterMark) {
try {
- _cb.highWaterMark();
+ if (!_throttleOn) {
+ _cb.highWaterMark();
+ _throttleOn = true;
+ }
} catch (Exception e) {
throw new RuntimeException("Exception during calling highWaterMark callback!");
}
[2/3] storm git commit: Merge branch 'STORM-1087' of https://github.com/revans2/incubator-storm into STORM-1087
Posted by bo...@apache.org.
Merge branch 'STORM-1087' of https://github.com/revans2/incubator-storm into STORM-1087
STORM-1087: Avoid issues with transfer-queue backpressure.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/960743ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/960743ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/960743ef
Branch: refs/heads/master
Commit: 960743ef26b82cfd6579382abcc765b45f57b8eb
Parents: 86f2d03 85a3b3e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Oct 9 14:01:35 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Oct 9 14:01:35 2015 -0500
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/worker.clj | 27 ++++++++++----------
.../backtype/storm/utils/DisruptorQueue.java | 11 ++++++--
2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/960743ef/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-1087 to changelog
Posted by bo...@apache.org.
Added STORM-1087 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ada8097e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ada8097e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ada8097e
Branch: refs/heads/master
Commit: ada8097e16bc70af37f04e9472935e4f1b7d5f9b
Parents: 960743e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Oct 9 14:02:01 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Oct 9 14:02:01 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ada8097e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe367a4..7fe2c1c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1087: Avoid issues with transfer-queue backpressure.
* STORM-893: Resource Aware Scheduling (Experimental)
* STORM-1095: Tuple.getSourceGlobalStreamid() has wrong camel-case naming
* STORM-1091: Add unit test for tick tuples to HiveBolt and HdfsBolt