You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/31 09:37:19 UTC

[1/5] storm git commit: STORM-2039: backpressure refactoring in worker and executor

Repository: storm
Updated Branches:
  refs/heads/master f547f3faf -> bcadf0461


STORM-2039: backpressure refactoring in worker and executor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/820c2684
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/820c2684
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/820c2684

Branch: refs/heads/master
Commit: 820c2684809a837f8a58d902d4408de7e1cd79ed
Parents: a1a952a
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Mon Aug 15 14:48:40 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Mon Aug 15 14:56:42 2016 -0500

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/worker.clj  | 17 +++++------------
 .../jvm/org/apache/storm/executor/Executor.java | 20 ++++++--------------
 .../apache/storm/executor/ExecutorShutdown.java |  2 +-
 3 files changed, 12 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/820c2684/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 781558c..0c4cc46 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -168,10 +168,10 @@
   check highWaterMark and lowWaterMark for backpressure"
   (reify DisruptorBackpressureCallback
     (highWaterMark [this]
-      (reset! (:transfer-backpressure worker) true)
+      (log-debug "worker " (:worker-id worker) " transfer-queue is congested, set backpressure flag true")
       (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
     (lowWaterMark [this]
-      (reset! (:transfer-backpressure worker) false)
+      (log-debug "worker " (:worker-id worker) " transfer-queue is not congested, set backpressure flag false")
       (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))
 
 (defn mk-transfer-fn [worker]
@@ -331,7 +331,6 @@
       :load-mapping (LoadMapping.)
       :assignment-versions assignment-versions
       :backpressure (AtomicBoolean. false) ;; whether this worker is going slow
-      :transfer-backpressure (AtomicBoolean. false) ;; if the transfer queue is backed-up
       :backpressure-trigger (AtomicBoolean. false) ;; a trigger for synchronization with executors
       :throttle-on (AtomicBoolean. false) ;; whether throttle is activated for spouts
       )))
@@ -699,11 +698,11 @@
         backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) 
             (.start backpressure-thread))
-        callback (fn cb []
+        topology-backpressure-callback (fn cb []
                    (let [throttle-on (.topologyBackpressure storm-cluster-state storm-id cb)]
                      (.set (:throttle-on worker) throttle-on)))
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-            (.topologyBackpressure storm-cluster-state storm-id callback))
+            (.topologyBackpressure storm-cluster-state storm-id topology-backpressure-callback))
 
         shutdown* (fn []
                     (log-message "Shutting down worker " storm-id " " assignment-id " " port)
@@ -770,12 +769,6 @@
                                         (AuthUtils/updateSubject subject auto-creds new-creds)
                                         (dofor [e @executors] (.credenetialsChanged e new-creds))
                                         (reset! credentials new-creds))))
-       check-throttle-changed (fn []
-                                (let [callback (fn cb []
-                                                 (let [throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id cb)]
-                                                   (.set (:throttle-on worker) throttle-on)))
-                                      new-throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id callback)]
-                                    (.set (:throttle-on worker) new-throttle-on)))
         check-log-config-changed (fn []
                                   (let [log-config (.topologyLogConfig (:storm-cluster-state worker) storm-id nil)]
                                     (process-log-config-change latest-log-config original-log-levels log-config)
@@ -794,7 +787,7 @@
         (fn []
           (check-credentials-changed)
           (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-            (check-throttle-changed))))
+            (topology-backpressure-callback))))
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
       (.scheduleRecurringWithJitter

http://git-wip-us.apache.org/repos/asf/storm/blob/820c2684/storm-core/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/Executor.java b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
index e9041f2..582e976 100644
--- a/storm-core/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
@@ -100,7 +100,6 @@ public abstract class Executor implements Callable, EventHandler<Object> {
     protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
     protected final ReportErrorAndDie reportErrorDie;
     protected final Callable<Boolean> sampler;
-    protected final AtomicBoolean backpressure;
     protected ExecutorTransfer executorTransfer;
     protected final String type;
     protected final AtomicBoolean throttleOn;
@@ -162,7 +161,6 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         this.reportError = new ReportError(stormConf, stormClusterState, stormId, componentId, workerTopologyContext);
         this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
         this.sampler = ConfigUtils.mkStatsSampler(stormConf);
-        this.backpressure = new AtomicBoolean(false);
         this.throttleOn = (AtomicBoolean) workerData.get(Constants.THROTTLE_ON);
         this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
         this.rand = new Random(Utils.secureRandomLong());
@@ -341,20 +339,14 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
             @Override
             public void highWaterMark() throws Exception {
-                if (!backpressure.get()) {
-                    backpressure.set(true);
-                    LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
-                    WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
-                }
+                LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
+                WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
             }
 
             @Override
             public void lowWaterMark() throws Exception {
-                if (backpressure.get()) {
-                    backpressure.set(false);
-                    LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
-                    WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
-                }
+                LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
+                WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
             }
         });
         receiveQueue.setHighWaterMark(Utils.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
@@ -535,8 +527,8 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         return receiveQueue;
     }
 
-    public AtomicBoolean getBackpressure() {
-        return backpressure;
+    public boolean getBackpressure() {
+        return receiveQueue.getThrottleOn();
     }
 
     public DisruptorQueue getTransferWorkerQueue() {

http://git-wip-us.apache.org/repos/asf/storm/blob/820c2684/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 795a33c..a3772f6 100644
--- a/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -70,7 +70,7 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
 
     @Override
     public boolean getBackPressureFlag() {
-        return executor.getBackpressure().get();
+        return executor.getBackpressure();
     }
 
     @Override


[5/5] storm git commit: add STORM-2039 to CHANGELOG

Posted by ka...@apache.org.
add STORM-2039 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bcadf046
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bcadf046
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bcadf046

Branch: refs/heads/master
Commit: bcadf0461f1819d54cd3e8bde321c8f8a7775789
Parents: a149adf
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 31 18:16:20 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:16:20 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bcadf046/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2734e77..083b961 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -186,6 +186,7 @@
  * STORM-1868: Modify TridentKafkaWordCount to run in distributed mode
 
 ## 1.0.3
+ * STORM-2039: Backpressure refactoring in worker and executor
  * STORM-2064: Add storm name and function, access result and function to log-thrift-access
  * STORM-2063: Add thread name in worker logs
  * STORM-2047: Add note to add logviewer hosts to browser whitelist


[4/5] storm git commit: Merge branch 'STORM-2039_backpressure_refactoring_in_worker_and_executor_2x' of https://github.com/abellina/storm into STORM-2039

Posted by ka...@apache.org.
Merge branch 'STORM-2039_backpressure_refactoring_in_worker_and_executor_2x' of https://github.com/abellina/storm into STORM-2039


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a149adf0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a149adf0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a149adf0

Branch: refs/heads/master
Commit: a149adf02b6278851b7be591377341bbb4bd56be
Parents: f547f3f d090d87
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 31 18:15:21 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 31 18:15:21 2016 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 +
 .../src/clj/org/apache/storm/daemon/worker.clj  | 27 +++++++++-----------
 storm-core/src/jvm/org/apache/storm/Config.java |  7 +++++
 .../jvm/org/apache/storm/executor/Executor.java | 20 +++++----------
 .../apache/storm/executor/ExecutorShutdown.java |  2 +-
 5 files changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------



[3/5] storm git commit: STORM-2039: split out backpressure worker timer into its own

Posted by ka...@apache.org.
STORM-2039: split out backpressure worker timer into its own


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d090d877
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d090d877
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d090d877

Branch: refs/heads/master
Commit: d090d877f002ecd8be831639d2c29e95f820e7dc
Parents: 96b3325
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 22:25:52 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Wed Aug 17 08:03:58 2016 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                                    |  1 +
 storm-core/src/clj/org/apache/storm/daemon/worker.clj | 10 +++++++---
 storm-core/src/jvm/org/apache/storm/Config.java       |  7 +++++++
 3 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e92c11d..8392936 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -185,6 +185,7 @@ topology.worker.receiver.thread.count: 1
 task.heartbeat.frequency.secs: 3
 task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
+task.backpressure.poll.secs: 30
 
 # now should be null by default
 topology.backpressure.enable: false

http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 2b6843c..222de36 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -306,6 +306,7 @@
       :reset-log-levels-timer (mk-halting-timer "reset-log-levels-timer")
       :refresh-active-timer (mk-halting-timer "refresh-active-timer")
       :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+      :refresh-backpressure-timer (mk-halting-timer "refresh-backpressure-timer")
       :user-timer (mk-halting-timer "user-timer")
       :task->component (StormCommon/stormTaskInfo topology storm-conf) ; for optimized access when used in tasks later on
       :component->stream->fields (component->stream->fields (:system-topology <>))
@@ -780,14 +781,17 @@
       (.topologyLogConfig (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed))))
 
     (establish-log-setting-callback)
+
     (clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id (fn [] (check-credentials-changed))))
 
     (.scheduleRecurring
       (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
         (fn []
-          (check-credentials-changed)
-          (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-            (topology-backpressure-callback))))
+          (check-credentials-changed)))
+
+    (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+      (.scheduleRecurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
+
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
       (.scheduleRecurringWithJitter

http://git-wip-us.apache.org/repos/asf/storm/blob/d090d877/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index f27f966..43369bd 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1497,6 +1497,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
 
     /**
+     * How often to poll for changed topology backpressure flag from ZK
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
+
+    /**
      * Whether to enable backpressure in for a certain topology
      */
     @isBoolean


[2/5] storm git commit: STORM-2039: make :backpressure-trigger an Object instead of an atom

Posted by ka...@apache.org.
STORM-2039: make :backpressure-trigger an Object instead of an atom


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96b3325c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96b3325c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96b3325c

Branch: refs/heads/master
Commit: 96b3325ccdf7ac7649a1425d176c59308811954c
Parents: 820c268
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Aug 16 21:23:19 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Tue Aug 16 22:59:56 2016 -0500

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/worker.clj | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/96b3325c/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 0c4cc46..2b6843c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -331,7 +331,7 @@
       :load-mapping (LoadMapping.)
       :assignment-versions assignment-versions
       :backpressure (AtomicBoolean. false) ;; whether this worker is going slow
-      :backpressure-trigger (AtomicBoolean. false) ;; a trigger for synchronization with executors
+      :backpressure-trigger (Object.) ;; a trigger for synchronization with executors
       :throttle-on (AtomicBoolean. false) ;; whether throttle is activated for spouts
       )))