You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/09/23 00:23:38 UTC

[13/50] storm git commit: Address the comments on config, notifyBPC and upmerge, etc.

Address the comments on config, notifyBPC and upmerge, etc.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5374036e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5374036e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5374036e

Branch: refs/heads/STORM-1040
Commit: 5374036ed5764435f5eba6e265c036a97788db6c
Parents: cf50518
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Sep 18 12:23:43 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Sep 18 12:23:43 2015 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  6 ++--
 .../src/clj/backtype/storm/daemon/executor.clj  | 11 +++---
 .../src/clj/backtype/storm/daemon/worker.clj    |  6 ++--
 storm-core/src/clj/backtype/storm/disruptor.clj |  4 ---
 storm-core/src/jvm/backtype/storm/Config.java   | 37 +++++---------------
 .../backtype/storm/utils/DisruptorQueue.java    | 11 ------
 .../storm/utils/WorkerBackpressureThread.java   | 10 ++++++
 .../utils/DisruptorQueueBackpressureTest.java   |  6 ++--
 8 files changed, 32 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8434a1f..68cc724 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -147,10 +147,8 @@ task.credentials.poll.secs: 30
 
 # now should be null by default
 topology.backpressure.enable: true
-backpressure.worker.high.watermark: 0.9
-backpressure.worker.low.watermark: 0.4
-backpressure.executor.high.watermark: 0.9
-backpressure.executor.low.watermark: 0.4
+backpressure.disruptor.high.watermark: 0.9
+backpressure.disruptor.low.watermark: 0.4
 
 zmq.threads: 1
 zmq.linger.millis: 5000

http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 29afa2d..0683f38 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -28,7 +28,7 @@
   (:import [backtype.storm.grouping CustomStreamGrouping])
   (:import [backtype.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
   (:import [backtype.storm.generated GlobalStreamId])
-  (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue])
+  (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
   (:import [com.lmax.disruptor InsufficientCapacityException])
   (:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer])
   (:import [backtype.storm.daemon Shutdownable])
@@ -278,12 +278,12 @@
       (if (not @(:backpressure executor-data))
         (do (reset! (:backpressure executor-data) true)
             (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
-            (disruptor/notify-backpressure-checker (:backpressure-trigger (:worker executor-data))))))
+            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
     (fn []
       "When receive queue is below lowWaterMark"
       (if @(:backpressure executor-data)
         (do (reset! (:backpressure executor-data) false)
-            (disruptor/notify-backpressure-checker (:backpressure-trigger (:worker executor-data))))))))
+            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))
 
 (defn start-batch-transfer->worker-handler! [worker executor-data]
   (let [worker-transfer-fn (:transfer-fn worker)
@@ -377,8 +377,8 @@
 
         disruptor-handler (mk-disruptor-backpressure-handler executor-data)
         _ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
-        _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-WORKER-HIGH-WATERMARK))
-              (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-WORKER-LOW-WATERMARK))
+        _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
+              (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
               (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
 
         ;; starting the batch-transfer->worker ensures that anything publishing to that queue 
@@ -645,7 +645,6 @@
                     (log-message "Activating spout " component-id ":" (keys task-datas))
                     (fast-list-iter [^ISpout spout spouts] (.activate spout)))
                
-                    ;; (log-message "Spout executor " (:executor-id executor-data) " found throttle-on, now suspends sending tuples")
                   (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
                 (do
                   (when @last-active

http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index e19b2c2..f795daa 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -140,7 +140,7 @@
       "When worker's queue is above highWaterMark, we set its backpressure flag"
       (if (not @(:backpressure worker))
         (do (reset! (:backpressure worker) true)
-            (DisruptorQueue/notifyBackpressureChecker (:backpressure-trigger worker)))))  ;; set backpressure no matter how the executors are
+            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))  ;; set backpressure no matter how the executors are
     (fn []
       "If worker's queue is below low watermark, we do nothing since we want the
       WorkerBackPressureThread to also check for all the executors' status"
@@ -493,8 +493,8 @@
 
         disruptor-handler (mk-disruptor-backpressure-handler worker)
         _ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler)
-        _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-WORKER-HIGH-WATERMARK))
-              (.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-WORKER-LOW-WATERMARK))
+        _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
+              (.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
               (.setEnableBackpressure ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)))
         backpressure-handler (mk-backpressure-handler @executors)        
         backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)

http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index 7ea9725..25e0050 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -88,10 +88,6 @@
   [^DisruptorQueue q o]
   (.tryPublish q o))
 
-(defn notify-backpressure-checker
-  [trigger]
-  (DisruptorQueue/notifyBackpressureChecker trigger))
-
 (defn consume-batch
   [^DisruptorQueue queue handler]
   (.consumeBatch queue handler))

http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 52c1cc8..5bc31f4 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1053,40 +1053,21 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_BACKPRESSURE_ENABLE_SCHEMA = Boolean.class;
 
     /**
-     * This signifies the tuple congestion in a worker's out-going queue.
-     * When the used ratio of a worker's outgoing queue is higher than the high watermark,
+     * This signifies tuple congestion in a disruptor queue.
+     * When the used ratio of a disruptor queue rises above the high watermark,
      * the backpressure scheme, if enabled, should slow down the tuple sending speed of
      * the spouts until reaching the low watermark.
      */
-    public static final String BACKPRESSURE_WORKER_HIGH_WATERMARK="backpressure.worker.high.watermark";
-    public static final Object BACKPRESSURE_WORKER_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
+    public static final String BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK="backpressure.disruptor.high.watermark";
+    public static final Object BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
 
     /**
-     * This signifies a state that a worker has left the congestion.
-     * If the used ratio of a worker's outgoing queue is lower than the low watermark,
-     * it notifies the worker to check whether all its executors have also left congestion,
-     * if yes, it will unset the worker's backpressure flag on the Zookeeper
+     * This signifies that a disruptor queue has left the congested state.
+     * If the used ratio of a disruptor queue falls below the low watermark,
+     * the backpressure flag will be unset.
      */
-    public static final String BACKPRESSURE_WORKER_LOW_WATERMARK="backpressure.worker.low.watermark";
-    public static final Object BACKPRESSURE_WORKER_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
-
-    /**
-     * This signifies the tuple congestion in an executor's receiving queue.
-     * When the used ratio of an executor's receiving queue is higher than the high watermark,
-     * the backpressure scheme, if enabled, should slow down the tuple sending speed of
-     * the spouts until reaching the low watermark.
-     */
-    public static final String BACKPRESSURE_EXECUTOR_HIGH_WATERMARK="backpressure.executor.high.watermark";
-    public static final Object BACKPRESSURE_EXECUTOR_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
-
-    /**
-     * This signifies a state that an executor has left the congestion.
-     * If the used ratio of an execuotr's receive queue is lower than the low watermark,
-     * it may notify the worker to check whether all its executors have also left congestion,
-     * if yes, the worker's backpressure flag will be unset on the Zookeeper
-     */
-    public static final String BACKPRESSURE_EXECUTOR_LOW_WATERMARK="backpressure.executor.low.watermark";
-    public static final Object BACKPRESSURE_EXECUTOR_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
+    public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
+    public static final Object BACKPRESSURE_DISRUPTOR_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
 
     /**
      * A list of users that are allowed to interact with the topology.  To use this set

http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 54bc7c0..e0053e9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -37,7 +37,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import backtype.storm.metric.api.IStatefulObject;
-import org.jgrapht.graph.DirectedSubgraph;
 
 
 /**
@@ -160,16 +159,6 @@ public class DisruptorQueue implements IStatefulObject {
         this._cb = cb;
     }
 
-    static public void notifyBackpressureChecker(Object trigger) {
-        try {
-            synchronized (trigger) {
-                trigger.notifyAll();
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     /*
      * Caches until consumerStarted is called, upon which the cache is flushed to the consumer
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java b/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java
index 3156d8a..dcc7328 100644
--- a/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java
+++ b/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java
@@ -33,6 +33,16 @@ public class WorkerBackpressureThread extends Thread {
         this.callback = callback;
     }
 
+    static public void notifyBackpressureChecker(Object trigger) {
+        try {
+            synchronized (trigger) {
+                trigger.notifyAll();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public void run() {
         try {
             while (true) {

http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java
index b2986fd..197744f 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java
@@ -94,7 +94,7 @@ public class DisruptorQueueBackpressureTest extends TestCase {
         @Override
         public void highWaterMark() throws Exception {
             if (!throttleOn.get()) {
-                highWaterMarkCalledPopulation = queue.population();
+                highWaterMarkCalledPopulation = queue.getMetrics().population();
                 throttleOn.set(true);
             }
         }
@@ -102,7 +102,7 @@ public class DisruptorQueueBackpressureTest extends TestCase {
         @Override
         public void lowWaterMark() throws Exception {
              if (throttleOn.get()) {
-                 lowWaterMarkCalledPopulation = queue.writePos() - consumerCursor.get();
+                 lowWaterMarkCalledPopulation = queue.getMetrics().writePos() - consumerCursor.get();
                  throttleOn.set(false);
              }
         }
@@ -112,4 +112,4 @@ public class DisruptorQueueBackpressureTest extends TestCase {
         return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
                 queueSize), new BlockingWaitStrategy(), 10L);
     }
-}
\ No newline at end of file
+}