You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/09/23 00:23:38 UTC
[13/50] storm git commit: Address the comments on config,
notifyBPC and upmerge, etc.
Address the comments on config, notifyBPC and upmerge, etc.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5374036e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5374036e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5374036e
Branch: refs/heads/STORM-1040
Commit: 5374036ed5764435f5eba6e265c036a97788db6c
Parents: cf50518
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Sep 18 12:23:43 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Sep 18 12:23:43 2015 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 6 ++--
.../src/clj/backtype/storm/daemon/executor.clj | 11 +++---
.../src/clj/backtype/storm/daemon/worker.clj | 6 ++--
storm-core/src/clj/backtype/storm/disruptor.clj | 4 ---
storm-core/src/jvm/backtype/storm/Config.java | 37 +++++---------------
.../backtype/storm/utils/DisruptorQueue.java | 11 ------
.../storm/utils/WorkerBackpressureThread.java | 10 ++++++
.../utils/DisruptorQueueBackpressureTest.java | 6 ++--
8 files changed, 32 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8434a1f..68cc724 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -147,10 +147,8 @@ task.credentials.poll.secs: 30
# now should be null by default
topology.backpressure.enable: true
-backpressure.worker.high.watermark: 0.9
-backpressure.worker.low.watermark: 0.4
-backpressure.executor.high.watermark: 0.9
-backpressure.executor.low.watermark: 0.4
+backpressure.disruptor.high.watermark: 0.9
+backpressure.disruptor.low.watermark: 0.4
zmq.threads: 1
zmq.linger.millis: 5000
http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 29afa2d..0683f38 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -28,7 +28,7 @@
(:import [backtype.storm.grouping CustomStreamGrouping])
(:import [backtype.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
(:import [backtype.storm.generated GlobalStreamId])
- (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue])
+ (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
(:import [com.lmax.disruptor InsufficientCapacityException])
(:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer])
(:import [backtype.storm.daemon Shutdownable])
@@ -278,12 +278,12 @@
(if (not @(:backpressure executor-data))
(do (reset! (:backpressure executor-data) true)
(log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
- (disruptor/notify-backpressure-checker (:backpressure-trigger (:worker executor-data))))))
+ (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
(fn []
"When receive queue is below lowWaterMark"
(if @(:backpressure executor-data)
(do (reset! (:backpressure executor-data) false)
- (disruptor/notify-backpressure-checker (:backpressure-trigger (:worker executor-data))))))))
+ (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))
(defn start-batch-transfer->worker-handler! [worker executor-data]
(let [worker-transfer-fn (:transfer-fn worker)
@@ -377,8 +377,8 @@
disruptor-handler (mk-disruptor-backpressure-handler executor-data)
_ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
- _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-WORKER-HIGH-WATERMARK))
- (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-WORKER-LOW-WATERMARK))
+ _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
+ (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
(.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
;; starting the batch-transfer->worker ensures that anything publishing to that queue
@@ -645,7 +645,6 @@
(log-message "Activating spout " component-id ":" (keys task-datas))
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
- ;; (log-message "Spout executor " (:executor-id executor-data) " found throttle-on, now suspends sending tuples")
(fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
(do
(when @last-active
http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index e19b2c2..f795daa 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -140,7 +140,7 @@
"When worker's queue is above highWaterMark, we set its backpressure flag"
(if (not @(:backpressure worker))
(do (reset! (:backpressure worker) true)
- (DisruptorQueue/notifyBackpressureChecker (:backpressure-trigger worker))))) ;; set backpressure no matter how the executors are
+ (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker))))) ;; set backpressure no matter how the executors are
(fn []
"If worker's queue is below low watermark, we do nothing since we want the
WorkerBackPressureThread to also check for all the executors' status"
@@ -493,8 +493,8 @@
disruptor-handler (mk-disruptor-backpressure-handler worker)
_ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler)
- _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-WORKER-HIGH-WATERMARK))
- (.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-WORKER-LOW-WATERMARK))
+ _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
+ (.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
(.setEnableBackpressure ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)))
backpressure-handler (mk-backpressure-handler @executors)
backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index 7ea9725..25e0050 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -88,10 +88,6 @@
[^DisruptorQueue q o]
(.tryPublish q o))
-(defn notify-backpressure-checker
- [trigger]
- (DisruptorQueue/notifyBackpressureChecker trigger))
-
(defn consume-batch
[^DisruptorQueue queue handler]
(.consumeBatch queue handler))
http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 52c1cc8..5bc31f4 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1053,40 +1053,21 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_BACKPRESSURE_ENABLE_SCHEMA = Boolean.class;
/**
- * This signifies the tuple congestion in a worker's out-going queue.
- * When the used ratio of a worker's outgoing queue is higher than the high watermark,
+ * This signifies the tuple congestion in a disruptor queue.
+ * When the used ratio of a disruptor queue is higher than the high watermark,
* the backpressure scheme, if enabled, should slow down the tuple sending speed of
* the spouts until reaching the low watermark.
*/
- public static final String BACKPRESSURE_WORKER_HIGH_WATERMARK="backpressure.worker.high.watermark";
- public static final Object BACKPRESSURE_WORKER_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
+ public static final String BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK="backpressure.disruptor.high.watermark";
+ public static final Object BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
/**
- * This signifies a state that a worker has left the congestion.
- * If the used ratio of a worker's outgoing queue is lower than the low watermark,
- * it notifies the worker to check whether all its executors have also left congestion,
- * if yes, it will unset the worker's backpressure flag on the Zookeeper
+ * This signifies a state that a disruptor queue has left the congestion.
+ * If the used ratio of a disruptor queue is lower than the low watermark,
+ * it will unset the backpressure flag.
*/
- public static final String BACKPRESSURE_WORKER_LOW_WATERMARK="backpressure.worker.low.watermark";
- public static final Object BACKPRESSURE_WORKER_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
-
- /**
- * This signifies the tuple congestion in an executor's receiving queue.
- * When the used ratio of an executor's receiving queue is higher than the high watermark,
- * the backpressure scheme, if enabled, should slow down the tuple sending speed of
- * the spouts until reaching the low watermark.
- */
- public static final String BACKPRESSURE_EXECUTOR_HIGH_WATERMARK="backpressure.executor.high.watermark";
- public static final Object BACKPRESSURE_EXECUTOR_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
-
- /**
- * This signifies a state that an executor has left the congestion.
- * If the used ratio of an execuotr's receive queue is lower than the low watermark,
- * it may notify the worker to check whether all its executors have also left congestion,
- * if yes, the worker's backpressure flag will be unset on the Zookeeper
- */
- public static final String BACKPRESSURE_EXECUTOR_LOW_WATERMARK="backpressure.executor.low.watermark";
- public static final Object BACKPRESSURE_EXECUTOR_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
+ public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
+ public static final Object BACKPRESSURE_DISRUPTOR_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
/**
* A list of users that are allowed to interact with the topology. To use this set
http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 54bc7c0..e0053e9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -37,7 +37,6 @@ import java.util.HashMap;
import java.util.Map;
import backtype.storm.metric.api.IStatefulObject;
-import org.jgrapht.graph.DirectedSubgraph;
/**
@@ -160,16 +159,6 @@ public class DisruptorQueue implements IStatefulObject {
this._cb = cb;
}
- static public void notifyBackpressureChecker(Object trigger) {
- try {
- synchronized (trigger) {
- trigger.notifyAll();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
/*
* Caches until consumerStarted is called, upon which the cache is flushed to the consumer
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java b/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java
index 3156d8a..dcc7328 100644
--- a/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java
+++ b/storm-core/src/jvm/backtype/storm/utils/WorkerBackpressureThread.java
@@ -33,6 +33,16 @@ public class WorkerBackpressureThread extends Thread {
this.callback = callback;
}
+ static public void notifyBackpressureChecker(Object trigger) {
+ try {
+ synchronized (trigger) {
+ trigger.notifyAll();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public void run() {
try {
while (true) {
http://git-wip-us.apache.org/repos/asf/storm/blob/5374036e/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java
index b2986fd..197744f 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java
@@ -94,7 +94,7 @@ public class DisruptorQueueBackpressureTest extends TestCase {
@Override
public void highWaterMark() throws Exception {
if (!throttleOn.get()) {
- highWaterMarkCalledPopulation = queue.population();
+ highWaterMarkCalledPopulation = queue.getMetrics().population();
throttleOn.set(true);
}
}
@@ -102,7 +102,7 @@ public class DisruptorQueueBackpressureTest extends TestCase {
@Override
public void lowWaterMark() throws Exception {
if (throttleOn.get()) {
- lowWaterMarkCalledPopulation = queue.writePos() - consumerCursor.get();
+ lowWaterMarkCalledPopulation = queue.getMetrics().writePos() - consumerCursor.get();
throttleOn.set(false);
}
}
@@ -112,4 +112,4 @@ public class DisruptorQueueBackpressureTest extends TestCase {
return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
queueSize), new BlockingWaitStrategy(), 10L);
}
-}
\ No newline at end of file
+}