You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/31 09:37:19 UTC
[1/5] storm git commit: STORM-2039: backpressure refactoring in
worker and executor
Repository: storm
Updated Branches:
refs/heads/master f547f3faf -> bcadf0461
STORM-2039: backpressure refactoring in worker and executor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/820c2684
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/820c2684
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/820c2684
Branch: refs/heads/master
Commit: 820c2684809a837f8a58d902d4408de7e1cd79ed
Parents: a1a952a
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Mon Aug 15 14:48:40 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Mon Aug 15 14:56:42 2016 -0500
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/worker.clj | 17 +++++------------
.../jvm/org/apache/storm/executor/Executor.java | 20 ++++++--------------
.../apache/storm/executor/ExecutorShutdown.java | 2 +-
3 files changed, 12 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/820c2684/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 781558c..0c4cc46 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -168,10 +168,10 @@
check highWaterMark and lowWaterMark for backpressure"
(reify DisruptorBackpressureCallback
(highWaterMark [this]
- (reset! (:transfer-backpressure worker) true)
+ (log-debug "worker " (:worker-id worker) " transfer-queue is congested, set backpressure flag true")
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
(lowWaterMark [this]
- (reset! (:transfer-backpressure worker) false)
+ (log-debug "worker " (:worker-id worker) " transfer-queue is not congested, set backpressure flag false")
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))
(defn mk-transfer-fn [worker]
@@ -331,7 +331,6 @@
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
:backpressure (AtomicBoolean. false) ;; whether this worker is going slow
- :transfer-backpressure (AtomicBoolean. false) ;; if the transfer queue is backed-up
:backpressure-trigger (AtomicBoolean. false) ;; a trigger for synchronization with executors
:throttle-on (AtomicBoolean. false) ;; whether throttle is activated for spouts
)))
@@ -699,11 +698,11 @@
backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
(.start backpressure-thread))
- callback (fn cb []
+ topology-backpressure-callback (fn cb []
(let [throttle-on (.topologyBackpressure storm-cluster-state storm-id cb)]
(.set (:throttle-on worker) throttle-on)))
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (.topologyBackpressure storm-cluster-state storm-id callback))
+ (.topologyBackpressure storm-cluster-state storm-id topology-backpressure-callback))
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " assignment-id " " port)
@@ -770,12 +769,6 @@
(AuthUtils/updateSubject subject auto-creds new-creds)
(dofor [e @executors] (.credenetialsChanged e new-creds))
(reset! credentials new-creds))))
- check-throttle-changed (fn []
- (let [callback (fn cb []
- (let [throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id cb)]
- (.set (:throttle-on worker) throttle-on)))
- new-throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id callback)]
- (.set (:throttle-on worker) new-throttle-on)))
check-log-config-changed (fn []
(let [log-config (.topologyLogConfig (:storm-cluster-state worker) storm-id nil)]
(process-log-config-change latest-log-config original-log-levels log-config)
@@ -794,7 +787,7 @@
(fn []
(check-credentials-changed)
(if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (check-throttle-changed))))
+ (topology-backpressure-callback))))
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
(.scheduleRecurringWithJitter
http://git-wip-us.apache.org/repos/asf/storm/blob/820c2684/storm-core/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/Executor.java b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
index e9041f2..582e976 100644
--- a/storm-core/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
@@ -100,7 +100,6 @@ public abstract class Executor implements Callable, EventHandler<Object> {
protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
protected final ReportErrorAndDie reportErrorDie;
protected final Callable<Boolean> sampler;
- protected final AtomicBoolean backpressure;
protected ExecutorTransfer executorTransfer;
protected final String type;
protected final AtomicBoolean throttleOn;
@@ -162,7 +161,6 @@ public abstract class Executor implements Callable, EventHandler<Object> {
this.reportError = new ReportError(stormConf, stormClusterState, stormId, componentId, workerTopologyContext);
this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
this.sampler = ConfigUtils.mkStatsSampler(stormConf);
- this.backpressure = new AtomicBoolean(false);
this.throttleOn = (AtomicBoolean) workerData.get(Constants.THROTTLE_ON);
this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
this.rand = new Random(Utils.secureRandomLong());
@@ -341,20 +339,14 @@ public abstract class Executor implements Callable, EventHandler<Object> {
receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
@Override
public void highWaterMark() throws Exception {
- if (!backpressure.get()) {
- backpressure.set(true);
- LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
- WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
- }
+ LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
+ WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
}
@Override
public void lowWaterMark() throws Exception {
- if (backpressure.get()) {
- backpressure.set(false);
- LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
- WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
- }
+ LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
+ WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
}
});
receiveQueue.setHighWaterMark(Utils.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
@@ -535,8 +527,8 @@ public abstract class Executor implements Callable, EventHandler<Object> {
return receiveQueue;
}
- public AtomicBoolean getBackpressure() {
- return backpressure;
+ public boolean getBackpressure() {
+ return receiveQueue.getThrottleOn();
}
public DisruptorQueue getTransferWorkerQueue() {
http://git-wip-us.apache.org/repos/asf/storm/blob/820c2684/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 795a33c..a3772f6 100644
--- a/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -70,7 +70,7 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
@Override
public boolean getBackPressureFlag() {
- return executor.getBackpressure().get();
+ return executor.getBackpressure();
}
@Override
[5/5] storm git commit: add STORM-2039 to CHANGELOG
Posted by ka...@apache.org.
add STORM-2039 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bcadf046
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bcadf046
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bcadf046
Branch: refs/heads/master
Commit: bcadf0461f1819d54cd3e8bde321c8f8a7775789
Parents: a149adf
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 31 18:16:20 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:16:20 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bcadf046/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2734e77..083b961 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -186,6 +186,7 @@
* STORM-1868: Modify TridentKafkaWordCount to run in distributed mode
## 1.0.3
+ * STORM-2039: Backpressure refactoring in worker and executor
* STORM-2064: Add storm name and function, access result and function to log-thrift-access
* STORM-2063: Add thread name in worker logs
* STORM-2047: Add note to add logviewer hosts to browser whitelist
[4/5] storm git commit: Merge branch
'STORM-2039_backpressure_refactoring_in_worker_and_executor_2x' of
https://github.com/abellina/storm into STORM-2039
Posted by ka...@apache.org.
Merge branch 'STORM-2039_backpressure_refactoring_in_worker_and_executor_2x' of https://github.com/abellina/storm into STORM-2039
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a149adf0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a149adf0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a149adf0
Branch: refs/heads/master
Commit: a149adf02b6278851b7be591377341bbb4bd56be
Parents: f547f3f d090d87
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 31 18:15:21 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:15:21 2016 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
.../src/clj/org/apache/storm/daemon/worker.clj | 27 +++++++++-----------
storm-core/src/jvm/org/apache/storm/Config.java | 7 +++++
.../jvm/org/apache/storm/executor/Executor.java | 20 +++++----------
.../apache/storm/executor/ExecutorShutdown.java | 2 +-
5 files changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
[3/5] storm git commit: STORM-2039: split out backpressure worker
timer into its own
Posted by ka...@apache.org.
STORM-2039: split out backpressure worker timer into its own
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d090d877
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d090d877
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d090d877
Branch: refs/heads/master
Commit: d090d877f002ecd8be831639d2c29e95f820e7dc
Parents: 96b3325
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 22:25:52 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Wed Aug 17 08:03:58 2016 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
storm-core/src/clj/org/apache/storm/daemon/worker.clj | 10 +++++++---
storm-core/src/jvm/org/apache/storm/Config.java | 7 +++++++
3 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e92c11d..8392936 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -185,6 +185,7 @@ topology.worker.receiver.thread.count: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
+task.backpressure.poll.secs: 30
# now should be null by default
topology.backpressure.enable: false
http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 2b6843c..222de36 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -306,6 +306,7 @@
:reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
:refresh-active-timer (mk-halting-timer "refresh-active-timer")
:executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+ :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
:user-timer (mk-halting-timer "user-timer")
:task->component (StormCommon/stormTaskInfo topology storm-conf) ; for optimized access when used in tasks later on
:component->stream->fields (component->stream->fields (:system-topology <>))
@@ -780,14 +781,17 @@
(.topologyLogConfig (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed))))
(establish-log-setting-callback)
+
(clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id (fn [] (check-credentials-changed))))
(.scheduleRecurring
(:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
(fn []
- (check-credentials-changed)
- (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (topology-backpressure-callback))))
+ (check-credentials-changed)))
+
+ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (.scheduleRecurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
+
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
(.scheduleRecurringWithJitter
http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index f27f966..43369bd 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1497,6 +1497,13 @@ public class Config extends HashMap<String, Object> {
public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
/**
+ * How often to poll for changed topology backpressure flag from ZK
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
+
+ /**
* Whether to enable backpressure in for a certain topology
*/
@isBoolean
[2/5] storm git commit: STORM-2039: make :backpressure-trigger an
Object instead of an atom
Posted by ka...@apache.org.
STORM-2039: make :backpressure-trigger an Object instead of an atom
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96b3325c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96b3325c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96b3325c
Branch: refs/heads/master
Commit: 96b3325ccdf7ac7649a1425d176c59308811954c
Parents: 820c268
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 21:23:19 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Tue Aug 16 22:59:56 2016 -0500
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/daemon/worker.clj | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/96b3325c/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 0c4cc46..2b6843c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -331,7 +331,7 @@
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
:backpressure (AtomicBoolean. false) ;; whether this worker is going slow
- :backpressure-trigger (AtomicBoolean. false) ;; a trigger for synchronization with executors
+ :backpressure-trigger (Object.) ;; a trigger for synchronization with executors
:throttle-on (AtomicBoolean. false) ;; whether throttle is activated for spouts
)))