You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/07/17 01:12:20 UTC
[1/7] storm git commit: fix issue of high cpu usage when bolt idle
Repository: storm
Updated Branches:
refs/heads/0.10.x-branch 56de46c3e -> 864a9cda7
fix issue of high cpu usage when bolt idle
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/72e24346
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/72e24346
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/72e24346
Branch: refs/heads/0.10.x-branch
Commit: 72e243464787f8dccd1aa32f5eb40087d6f23d02
Parents: 56de46c
Author: errordaiwa <xi...@outlook.com>
Authored: Thu Jul 9 10:40:20 2015 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 17 07:47:16 2015 +0900
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/72e24346/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 932af16..1f2110d 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -94,7 +94,7 @@ public class DisruptorQueue implements IStatefulObject {
public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
try {
final long nextSequence = _consumer.get() + 1;
- final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
+ final long availableSequence = _barrier.waitFor(nextSequence, 1000, TimeUnit.MILLISECONDS);
if(availableSequence >= nextSequence) {
consumeBatchToCursor(availableSequence, handler);
}
[3/7] storm git commit: use PositiveIntegerValidator to verify
disruptor queue wait timeout config
Posted by ka...@apache.org.
use PositiveIntegerValidator to verify disruptor queue wait timeout config
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b97a3835
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b97a3835
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b97a3835
Branch: refs/heads/0.10.x-branch
Commit: b97a38352fb4c0ed20ad16d0d77d7fb52325749c
Parents: 8b2dd5f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 17 07:54:58 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 17 07:54:58 2015 +0900
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/Config.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b97a3835/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index b472ef7..808572f 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1398,7 +1398,7 @@ public class Config extends HashMap<String, Object> {
* vs. CPU usage
*/
public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
- public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = Number.class;
+ public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.PositiveIntegerValidator;
public static void setClasspath(Map conf, String cp) {
conf.put(Config.TOPOLOGY_CLASSPATH, cp);
[7/7] storm git commit: add STORM-935 and STORM-503 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-935 and STORM-503 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/864a9cda
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/864a9cda
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/864a9cda
Branch: refs/heads/0.10.x-branch
Commit: 864a9cda71b3f8749ecf75e493931bc5329f8e7f
Parents: c6cd6e7
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jul 17 08:11:55 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 17 08:11:55 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/864a9cda/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1ebd80d..511fd19 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -171,6 +171,8 @@
## 0.9.6
* STORM-763: nimbus reassigned worker A to another machine, but other worker's netty client can't connect to the new worker A
+ * STORM-935: Update Disruptor queue version to 2.10.4
+ * STORM-503: Short disruptor queue wait time leads to high CPU usage when idle
## 0.9.5
* STORM-790: Log "task is null" instead of let worker died when task is null in transfer-fn
[6/7] storm git commit: change default disruptor queue timeout to
1000ms
Posted by ka...@apache.org.
change default disruptor queue timeout to 1000ms
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c6cd6e78
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c6cd6e78
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c6cd6e78
Branch: refs/heads/0.10.x-branch
Commit: c6cd6e782ea63784c774858b2948cedda17074d4
Parents: 0594825
Author: errordaiwa <xi...@outlook.com>
Authored: Thu Jul 16 09:02:17 2015 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 17 08:01:16 2015 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c6cd6e78/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 1064e21..c3fa372 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -198,6 +198,6 @@ topology.testing.always.try.serialize: false
topology.classpath: null
topology.environment: null
topology.bolts.outgoing.overflow.buffer.enable: false
-topology.disruptor.wait.timeout.millis: 10
+topology.disruptor.wait.timeout.millis: 1000
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
[2/7] storm git commit: make disruptor queue wait timeout configurable
Posted by ka...@apache.org.
make disruptor queue wait timeout configurable
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8b2dd5f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8b2dd5f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8b2dd5f8
Branch: refs/heads/0.10.x-branch
Commit: 8b2dd5f8f8647bcaa50a895c19f3a880e08eda1d
Parents: 72e2434
Author: errordaiwa <xi...@outlook.com>
Authored: Fri Jul 10 11:00:28 2015 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 17 07:47:21 2015 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
storm-core/src/clj/backtype/storm/daemon/executor.clj | 1 +
storm-core/src/clj/backtype/storm/daemon/worker.clj | 2 ++
storm-core/src/clj/backtype/storm/disruptor.clj | 4 ++--
storm-core/src/jvm/backtype/storm/Config.java | 7 +++++++
storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 8 ++++++--
.../test/jvm/backtype/storm/utils/DisruptorQueueTest.java | 2 +-
7 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8b2dd5f8/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 03f0ce2..1064e21 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -198,5 +198,6 @@ topology.testing.always.try.serialize: false
topology.classpath: null
topology.environment: null
topology.bolts.outgoing.overflow.buffer.enable: false
+topology.disruptor.wait.timeout.millis: 10
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
http://git-wip-us.apache.org/repos/asf/storm/blob/8b2dd5f8/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 454fd0d..75414d0 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -223,6 +223,7 @@
batch-transfer->worker (disruptor/disruptor-queue
(str "executor" executor-id "-send-queue")
(storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
+ (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
:claim-strategy :single-threaded
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
]
http://git-wip-us.apache.org/repos/asf/storm/blob/8b2dd5f8/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 7157cf7..2e64fb4 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -153,6 +153,7 @@
;; TODO: this depends on the type of executor
(map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
(storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
+ (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))]))
(into {})
))
@@ -194,6 +195,7 @@
(let [assignment-versions (atom {})
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
+ (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
http://git-wip-us.apache.org/repos/asf/storm/blob/8b2dd5f8/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index a723601..e96e49d 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -45,10 +45,10 @@
;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue,
;; unblocking the consumer
(defnk disruptor-queue
- [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
+ [^String queue-name buffer-size timeout :claim-strategy :multi-threaded :wait-strategy :block]
(DisruptorQueue. queue-name
((CLAIM-STRATEGY claim-strategy) buffer-size)
- (mk-wait-strategy wait-strategy)))
+ (mk-wait-strategy wait-strategy) timeout))
(defn clojure-handler
[afn]
http://git-wip-us.apache.org/repos/asf/storm/blob/8b2dd5f8/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 4e15352..b472ef7 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1393,6 +1393,13 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
+ /**
+ * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
+ * vs. CPU usage
+ */
+ public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+ public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = Number.class;
+
public static void setClasspath(Map conf, String cp) {
conf.put(Config.TOPOLOGY_CLASSPATH, cp);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8b2dd5f8/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 1f2110d..195c2f7 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -60,8 +60,10 @@ public class DisruptorQueue implements IStatefulObject {
private static String PREFIX = "disruptor-";
private String _queueName = "";
+
+ private long _waitTimeout;
- public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
+ public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait, long timeout) {
this._queueName = PREFIX + queueName;
_buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
_consumer = new Sequence();
@@ -77,6 +79,8 @@ public class DisruptorQueue implements IStatefulObject {
throw new RuntimeException("This code should be unreachable!", e);
}
}
+
+ _waitTimeout = timeout;
}
public String getName() {
@@ -94,7 +98,7 @@ public class DisruptorQueue implements IStatefulObject {
public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
try {
final long nextSequence = _consumer.get() + 1;
- final long availableSequence = _barrier.waitFor(nextSequence, 1000, TimeUnit.MILLISECONDS);
+ final long availableSequence = _barrier.waitFor(nextSequence, _waitTimeout, TimeUnit.MILLISECONDS);
if(availableSequence >= nextSequence) {
consumeBatchToCursor(availableSequence, handler);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8b2dd5f8/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
index 61af50d..ddc0982 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -154,6 +154,6 @@ public class DisruptorQueueTest extends TestCase {
private static DisruptorQueue createQueue(String name, int queueSize) {
return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
- queueSize), new BlockingWaitStrategy());
+ queueSize), new BlockingWaitStrategy(), 10L);
}
}
[5/7] storm git commit: Merge branch 'STORM-929' into STORM-935
Posted by ka...@apache.org.
Merge branch 'STORM-929' into STORM-935
Conflicts:
storm-core/src/jvm/backtype/storm/Config.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05948253
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05948253
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05948253
Branch: refs/heads/0.10.x-branch
Commit: 0594825303397289eee258915b05d0c0a697a717
Parents: da76088
Author: errordaiwa <xi...@outlook.com>
Authored: Tue Jul 14 12:40:45 2015 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 17 08:01:01 2015 +0900
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/05948253/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index c4c936a..4d3f18b 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -98,8 +98,11 @@ public class DisruptorQueue implements IStatefulObject {
public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
try {
final long nextSequence = _consumer.get() + 1;
- final long availableSequence = _barrier.waitFor(nextSequence);
- if(availableSequence >= nextSequence) {
+ final long availableSequence =
+ _waitTimeout == 0L ? _barrier.waitFor(nextSequence) : _barrier.waitFor(nextSequence, _waitTimeout,
+ TimeUnit.MILLISECONDS);
+
+ if (availableSequence >= nextSequence) {
consumeBatchToCursor(availableSequence, handler);
}
} catch (AlertException e) {
[4/7] storm git commit: update version of Disruptor queue to 2.10.4,
consumer of queue will use waitfor method without timeout
Posted by ka...@apache.org.
update version of Disruptor queue to 2.10.4, consumer of queue will use waitfor method without timeout
Conflicts:
storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da76088a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da76088a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da76088a
Branch: refs/heads/0.10.x-branch
Commit: da76088a9bb6d6f223bf03ba4bc7c0ebd98defd8
Parents: b97a383
Author: errordaiwa <xi...@outlook.com>
Authored: Mon Jul 13 17:46:16 2015 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jul 17 07:58:29 2015 +0900
----------------------------------------------------------------------
pom.xml | 2 +-
storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/da76088a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5d9182c..da361f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -206,7 +206,7 @@
<snakeyaml.version>1.11</snakeyaml.version>
<httpclient.version>4.3.3</httpclient.version>
<clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
- <disruptor.version>2.10.1</disruptor.version>
+ <disruptor.version>2.10.4</disruptor.version>
<jgrapht.version>0.9.0</jgrapht.version>
<guava.version>16.0.1</guava.version>
<netty.version>3.9.0.Final</netty.version>
http://git-wip-us.apache.org/repos/asf/storm/blob/da76088a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 195c2f7..c4c936a 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -98,7 +98,7 @@ public class DisruptorQueue implements IStatefulObject {
public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
try {
final long nextSequence = _consumer.get() + 1;
- final long availableSequence = _barrier.waitFor(nextSequence, _waitTimeout, TimeUnit.MILLISECONDS);
+ final long availableSequence = _barrier.waitFor(nextSequence);
if(availableSequence >= nextSequence) {
consumeBatchToCursor(availableSequence, handler);
}