You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/02 16:44:44 UTC
[1/8] storm git commit: STORM-756 Handle taskids response ASAP
Repository: storm
Updated Branches:
refs/heads/master a55b05032 -> 041fbe349
STORM-756 Handle taskids response ASAP
* create new queue which stores only taskids responses
** BoltReaderRunnable thread is no longer blocked by _pendingWrites.put()
* let BoltWriterRunnable send messages while respecting priorities
** heartbeat > taskids > tuple
* set sleep time in multilang_test long enough
** so that the topology is active and processes some tuples for a while
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a16eca7e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a16eca7e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a16eca7e
Branch: refs/heads/master
Commit: a16eca7e0b35c35be064a742f834add1e4f20879
Parents: a8d253a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Nov 22 18:25:51 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Nov 22 23:09:57 2015 +0900
----------------------------------------------------------------------
.../src/jvm/backtype/storm/task/ShellBolt.java | 32 +++++++++++---------
.../test/clj/backtype/storm/multilang_test.clj | 2 +-
2 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a16eca7e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index dda99ca..8baf2c6 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -37,8 +37,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
/**
* A bolt that shells out to another process to process tuples. ShellBolt
* communicates with that process over stdio using a special protocol. An ~100
@@ -77,7 +75,8 @@ public class ShellBolt implements IBolt {
private ShellProcess _process;
private volatile boolean _running = true;
private volatile Throwable _exception;
- private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
+ private LinkedBlockingQueue<BoltMsg> _pendingWrites = new LinkedBlockingQueue<>();
+ private LinkedBlockingQueue<List<Integer>> _pendingTaskIds = new LinkedBlockingQueue<>();
private Random _rand;
private Thread _readerThread;
@@ -107,7 +106,7 @@ public class ShellBolt implements IBolt {
final OutputCollector collector) {
Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
if (maxPending != null) {
- this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue());
+ this._pendingWrites = new LinkedBlockingQueue<>(((Number)maxPending).intValue());
}
_rand = new Random();
_collector = collector;
@@ -212,7 +211,7 @@ public class ShellBolt implements IBolt {
if(shellMsg.getTask() == 0) {
List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
if (shellMsg.areTaskIdsNeeded()) {
- _pendingWrites.put(outtasks);
+ _pendingTaskIds.put(outtasks);
}
} else {
_collector.emitDirect((int) shellMsg.getTask(),
@@ -318,8 +317,6 @@ public class ShellBolt implements IBolt {
sendHeartbeatFlag.compareAndSet(false, true);
}
-
-
}
private class BoltReaderRunnable implements Runnable {
@@ -376,15 +373,22 @@ public class ShellBolt implements IBolt {
sendHeartbeatFlag.compareAndSet(true, false);
}
- Object write = _pendingWrites.poll(1, SECONDS);
- if (write instanceof BoltMsg) {
- _process.writeBoltMsg((BoltMsg) write);
- } else if (write instanceof List<?>) {
- _process.writeTaskIds((List<Integer>)write);
- } else if (write != null) {
- throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
+ List<Integer> taskIds = _pendingTaskIds.peek();
+ if (taskIds != null) {
+ taskIds = _pendingTaskIds.poll();
+ _process.writeTaskIds(taskIds);
+ continue;
+ }
+
+ BoltMsg write = _pendingWrites.peek();
+ if (write != null) {
+ write = _pendingWrites.poll();
+ _process.writeBoltMsg(write);
}
+ /*
} catch (InterruptedException e) {
+ // NOOP
+ */
} catch (Throwable t) {
die(t);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a16eca7e/storm-core/test/clj/backtype/storm/multilang_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/multilang_test.clj b/storm-core/test/clj/backtype/storm/multilang_test.clj
index ff8f2f1..b42a56f 100644
--- a/storm-core/test/clj/backtype/storm/multilang_test.clj
+++ b/storm-core/test/clj/backtype/storm/multilang_test.clj
@@ -47,7 +47,7 @@
"test"
{TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
topology)
- (Thread/sleep 11000)
+ (Thread/sleep 31000)
(.killTopology nimbus "test")
(Thread/sleep 11000)
)))
[5/8] storm git commit: STORM-756 Add unit tests of
ShellBoltMessageQueue
Posted by bo...@apache.org.
STORM-756 Add unit tests of ShellBoltMessageQueue
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/117e8186
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/117e8186
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/117e8186
Branch: refs/heads/master
Commit: 117e81861485ab02d4b97de725309132fce3f2a1
Parents: 56dc7b9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Nov 24 19:14:05 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Nov 24 19:14:05 2015 +0900
----------------------------------------------------------------------
.../storm/utils/ShellBoltMessageQueueTest.java | 67 ++++++++++++++++++++
1 file changed, 67 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/117e8186/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
new file mode 100644
index 0000000..1dcbb03
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
@@ -0,0 +1,67 @@
+package backtype.storm.utils;
+
+import backtype.storm.multilang.BoltMsg;
+import com.google.common.collect.Lists;
+import junit.framework.TestCase;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class ShellBoltMessageQueueTest extends TestCase {
+ @Test
+ public void testPollTaskIdsFirst() throws InterruptedException {
+ ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+
+ // put bolt message first, then put task ids
+ queue.putBoltMsg(new BoltMsg());
+ ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);
+ queue.putTaskIds(taskIds);
+
+ Object msg = queue.poll(10, TimeUnit.SECONDS);
+
+ // task ids should be pulled first
+ assertTrue(msg instanceof List<?>);
+ assertEquals(msg, taskIds);
+ }
+
+ @Test
+ public void testPollWhileThereAreNoDataAvailable() throws InterruptedException {
+ ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+
+ long start = System.currentTimeMillis();
+ Object msg = queue.poll(1, TimeUnit.SECONDS);
+ long finish = System.currentTimeMillis();
+
+ assertNull(msg);
+ assertTrue(finish - start > 1000);
+ }
+
+ @Test
+ public void testPollShouldReturnASAPWhenDataAvailable() throws InterruptedException {
+ final ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+ final List<Integer> taskIds = Lists.newArrayList(1, 2, 3);
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // NOOP
+ }
+
+ queue.putTaskIds(taskIds);
+ }
+ });
+ t.start();
+
+ long start = System.currentTimeMillis();
+ Object msg = queue.poll(10, TimeUnit.SECONDS);
+ long finish = System.currentTimeMillis();
+
+ assertEquals(msg, taskIds);
+ assertTrue(finish - start < (10 * 1000));
+ }
+}
[3/8] storm git commit: Remove unneeded comment
Posted by bo...@apache.org.
Remove unneeded comment
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/124f664a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/124f664a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/124f664a
Branch: refs/heads/master
Commit: 124f664af6d8cdf995ed4061b1f1a90c84e7c18e
Parents: d91c393
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Nov 22 23:15:19 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Nov 22 23:15:19 2015 +0900
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/124f664a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 8baf2c6..1d97d53 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -385,10 +385,6 @@ public class ShellBolt implements IBolt {
write = _pendingWrites.poll();
_process.writeBoltMsg(write);
}
- /*
- } catch (InterruptedException e) {
- // NOOP
- */
} catch (Throwable t) {
die(t);
}
[4/8] storm git commit: STORM-756 Introduce ShellBoltMessageQueue
Posted by bo...@apache.org.
STORM-756 Introduce ShellBoltMessageQueue
* ShellBoltMessageQueue contains two different queues
** one is for taskids (unbounded)
** another one is for bolt msg (bounded)
* Poll priority between the two queues: task ids are higher than bolt msg
** poll() returns task ids whenever available, and returns bolt msg if
task ids are not available
* poll() behaves like LinkedBlockingQueue.poll() with a timeout
** waits while nothing is available, wakes up when an element becomes available or the timeout elapses
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/56dc7b9d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/56dc7b9d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/56dc7b9d
Branch: refs/heads/master
Commit: 56dc7b9d25f6b80856c541a71a036b2574d58ae8
Parents: 124f664
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Nov 24 14:56:43 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Nov 24 14:56:43 2015 +0900
----------------------------------------------------------------------
.../src/jvm/backtype/storm/task/ShellBolt.java | 31 +++--
.../storm/utils/ShellBoltMessageQueue.java | 121 +++++++++++++++++++
2 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/56dc7b9d/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 1d97d53..215094b 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -26,6 +26,7 @@ import backtype.storm.multilang.BoltMsg;
import backtype.storm.multilang.ShellMsg;
import backtype.storm.topology.ReportedFailedException;
import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.ShellBoltMessageQueue;
import backtype.storm.utils.ShellProcess;
import clojure.lang.RT;
import com.google.common.util.concurrent.MoreExecutors;
@@ -37,6 +38,8 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
/**
* A bolt that shells out to another process to process tuples. ShellBolt
* communicates with that process over stdio using a special protocol. An ~100
@@ -75,8 +78,7 @@ public class ShellBolt implements IBolt {
private ShellProcess _process;
private volatile boolean _running = true;
private volatile Throwable _exception;
- private LinkedBlockingQueue<BoltMsg> _pendingWrites = new LinkedBlockingQueue<>();
- private LinkedBlockingQueue<List<Integer>> _pendingTaskIds = new LinkedBlockingQueue<>();
+ private ShellBoltMessageQueue _pendingWrites = new ShellBoltMessageQueue();
private Random _rand;
private Thread _readerThread;
@@ -106,8 +108,9 @@ public class ShellBolt implements IBolt {
final OutputCollector collector) {
Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
if (maxPending != null) {
- this._pendingWrites = new LinkedBlockingQueue<>(((Number)maxPending).intValue());
+ this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue());
}
+
_rand = new Random();
_collector = collector;
@@ -149,7 +152,7 @@ public class ShellBolt implements IBolt {
try {
BoltMsg boltMsg = createBoltMessage(input, genId);
- _pendingWrites.put(boltMsg);
+ _pendingWrites.putBoltMsg(boltMsg);
} catch(InterruptedException e) {
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException("Error during multilang processing " + processInfo, e);
@@ -211,7 +214,7 @@ public class ShellBolt implements IBolt {
if(shellMsg.getTask() == 0) {
List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
if (shellMsg.areTaskIdsNeeded()) {
- _pendingTaskIds.put(outtasks);
+ _pendingWrites.putTaskIds(outtasks);
}
} else {
_collector.emitDirect((int) shellMsg.getTask(),
@@ -373,17 +376,13 @@ public class ShellBolt implements IBolt {
sendHeartbeatFlag.compareAndSet(true, false);
}
- List<Integer> taskIds = _pendingTaskIds.peek();
- if (taskIds != null) {
- taskIds = _pendingTaskIds.poll();
- _process.writeTaskIds(taskIds);
- continue;
- }
-
- BoltMsg write = _pendingWrites.peek();
- if (write != null) {
- write = _pendingWrites.poll();
- _process.writeBoltMsg(write);
+ Object write = _pendingWrites.poll(1, SECONDS);
+ if (write instanceof BoltMsg) {
+ _process.writeBoltMsg((BoltMsg) write);
+ } else if (write instanceof List<?>) {
+ _process.writeTaskIds((List<Integer>)write);
+ } else if (write != null) {
+ throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
}
} catch (Throwable t) {
die(t);
http://git-wip-us.apache.org/repos/asf/storm/blob/56dc7b9d/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
new file mode 100644
index 0000000..b633bc5
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import backtype.storm.multilang.BoltMsg;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A data structure for ShellBolt which includes two queues (FIFO),
+ * which one is for task ids (unbounded), another one is for bolt msg (bounded).
+ */
+public class ShellBoltMessageQueue implements Serializable {
+ private final LinkedList<List<Integer>> taskIdsQueue = new LinkedList<>();
+ private final LinkedBlockingQueue<BoltMsg> boltMsgQueue;
+
+ private final ReentrantLock takeLock = new ReentrantLock();
+ private final Condition notEmpty = takeLock.newCondition();
+
+ public ShellBoltMessageQueue(int boltMsgCapacity) {
+ if (boltMsgCapacity <= 0) {
+ throw new IllegalArgumentException();
+ }
+ this.boltMsgQueue = new LinkedBlockingQueue<>(boltMsgCapacity);
+ }
+
+ public ShellBoltMessageQueue() {
+ this(Integer.MAX_VALUE);
+ }
+
+ /**
+ * put list of task id to its queue
+ * @param taskIds task ids that received the tuples
+ */
+ public void putTaskIds(List<Integer> taskIds) {
+ taskIdsQueue.add(taskIds);
+ takeLock.lock();
+ try {
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
+ }
+ }
+
+ /**
+ * put bolt message to its queue
+ * @param boltMsg BoltMsg to pass to subprocess
+ * @throws InterruptedException
+ */
+ public void putBoltMsg(BoltMsg boltMsg) throws InterruptedException {
+ boltMsgQueue.put(boltMsg);
+ takeLock.lock();
+ try {
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
+ }
+ }
+
+ /**
+ * poll() is a core feature of ShellBoltMessageQueue.
+ * It retrieves and removes the head of one queues, waiting up to the
+ * specified wait time if necessary for an element to become available.
+ * There's priority that what queue it retrieves first, taskIds is higher than boltMsgQueue.
+ *
+ * @param timeout how long to wait before giving up, in units of unit
+ * @param unit a TimeUnit determining how to interpret the timeout parameter
+ * @return List\<Integer\> if task id is available,
+ * BoltMsg if task id is not available but bolt message is available,
+ * null if the specified waiting time elapses before an element is available.
+ * @throws InterruptedException
+ */
+ public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
+ takeLock.lockInterruptibly();
+ long nanos = unit.toNanos(timeout);
+ try {
+ // wait for available queue
+ while (taskIdsQueue.peek() == null && boltMsgQueue.peek() == null) {
+ if (nanos <= 0) {
+ return null;
+ }
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+
+ // taskIds first
+ List<Integer> taskIds = taskIdsQueue.peek();
+ if (taskIds != null) {
+ taskIds = taskIdsQueue.poll();
+ return taskIds;
+ }
+
+ // boltMsgQueue should have at least one entry at the moment
+ return boltMsgQueue.poll();
+ } finally {
+ takeLock.unlock();
+ }
+ }
+
+}
[7/8] storm git commit: Merge branch 'STORM-756-v2' of
https://github.com/HeartSaVioR/storm into STORM-756
Posted by bo...@apache.org.
Merge branch 'STORM-756-v2' of https://github.com/HeartSaVioR/storm into STORM-756
STORM-756: Handle taskids response as soon as possible
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd00dde3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd00dde3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd00dde3
Branch: refs/heads/master
Commit: cd00dde37d09ca9ff94bb7f3cad08f66cfe74b81
Parents: a55b050 ca02690
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Dec 2 09:30:16 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Dec 2 09:30:16 2015 -0600
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
storm-core/src/jvm/backtype/storm/Config.java | 1 +
.../src/jvm/backtype/storm/task/ShellBolt.java | 13 +-
.../storm/utils/ShellBoltMessageQueue.java | 121 +++++++++++++++++++
.../test/clj/backtype/storm/multilang_test.clj | 2 +-
.../storm/utils/ShellBoltMessageQueueTest.java | 84 +++++++++++++
6 files changed, 214 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cd00dde3/conf/defaults.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cd00dde3/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cd00dde3/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
[6/8] storm git commit: Forgot to add license comment
Posted by bo...@apache.org.
Forgot to add license comment
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca02690a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca02690a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca02690a
Branch: refs/heads/master
Commit: ca02690aa551586389e572d77b6e467abf2d1b10
Parents: 117e818
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Nov 24 19:17:33 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Nov 24 19:17:33 2015 +0900
----------------------------------------------------------------------
.../storm/utils/ShellBoltMessageQueueTest.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ca02690a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
index 1dcbb03..229efa1 100644
--- a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package backtype.storm.utils;
import backtype.storm.multilang.BoltMsg;
[2/8] storm git commit: STORM-756 set default value of
topology.shellbolt.max.pending
Posted by bo...@apache.org.
STORM-756 set default value of topology.shellbolt.max.pending
* Bounded _pendingWrites makes ShellBolt play well with ABP
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d91c393f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d91c393f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d91c393f
Branch: refs/heads/master
Commit: d91c393f092ad2e050f14bc3c6ab9e9658545c26
Parents: a16eca7
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Nov 22 18:56:50 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Nov 22 23:12:36 2015 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
storm-core/src/jvm/backtype/storm/Config.java | 1 +
2 files changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d91c393f/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b520745..34746d6 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -205,6 +205,7 @@ topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
+topology.shellbolt.max.pending: 100
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
http://git-wip-us.apache.org/repos/asf/storm/blob/d91c393f/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index a1da8fe..7b18595 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1652,6 +1652,7 @@ public class Config extends HashMap<String, Object> {
/**
* Max pending tuples in one ShellBolt
*/
+ @NotNull
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
[8/8] storm git commit: Added STORM-756 to Changelog
Posted by bo...@apache.org.
Added STORM-756 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/041fbe34
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/041fbe34
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/041fbe34
Branch: refs/heads/master
Commit: 041fbe34978b20cf98e740a4d914a4e839bb33e2
Parents: cd00dde
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Dec 2 09:30:43 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Dec 2 09:30:43 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/041fbe34/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a3a2fe6..261de38 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-756: Handle taskids response as soon as possible
* STORM-1218: Use markdown for JavaDoc.
* STORM-1075: Storm Cassandra connector.
* STORM-1341: Let topology have own heartbeat timeout for multilang subprocess