You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/03 20:21:29 UTC
[1/3] storm git commit: Revert "Merge branch 'STORM-756-v2' of
https://github.com/HeartSaVioR/storm into STORM-756"
Repository: storm
Updated Branches:
refs/heads/master 18f68f7a9 -> ce2d49b92
Revert "Merge branch 'STORM-756-v2' of https://github.com/HeartSaVioR/storm into STORM-756"
This reverts commit cd00dde37d09ca9ff94bb7f3cad08f66cfe74b81, reversing
changes made to a55b0503297f7cd7a4ba3f67ef1a6b2e2b33e4c2.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d2090d79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d2090d79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d2090d79
Branch: refs/heads/master
Commit: d2090d799f8b5bb06a17094545d39c43f12ba6c7
Parents: 18f68f7
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Dec 3 13:07:33 2015 -0600
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Dec 3 13:07:33 2015 -0600
----------------------------------------------------------------------
conf/defaults.yaml | 1 -
storm-core/src/jvm/backtype/storm/Config.java | 1 -
.../src/jvm/backtype/storm/task/ShellBolt.java | 13 +-
.../storm/utils/ShellBoltMessageQueue.java | 121 -------------------
.../test/clj/backtype/storm/multilang_test.clj | 2 +-
.../storm/utils/ShellBoltMessageQueueTest.java | 84 -------------
6 files changed, 8 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c1124bd..295ac7c 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -206,7 +206,6 @@ topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
-topology.shellbolt.max.pending: 100
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 01317ee..c30ffff 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1707,7 +1707,6 @@ public class Config extends HashMap<String, Object> {
/**
* Max pending tuples in one ShellBolt
*/
- @NotNull
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 84a2b8a..cf6a330 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -26,7 +26,6 @@ import backtype.storm.multilang.BoltMsg;
import backtype.storm.multilang.ShellMsg;
import backtype.storm.topology.ReportedFailedException;
import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.ShellBoltMessageQueue;
import backtype.storm.utils.ShellProcess;
import clojure.lang.RT;
import com.google.common.util.concurrent.MoreExecutors;
@@ -78,7 +77,7 @@ public class ShellBolt implements IBolt {
private ShellProcess _process;
private volatile boolean _running = true;
private volatile Throwable _exception;
- private ShellBoltMessageQueue _pendingWrites = new ShellBoltMessageQueue();
+ private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
private Random _rand;
private Thread _readerThread;
@@ -108,9 +107,8 @@ public class ShellBolt implements IBolt {
final OutputCollector collector) {
Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
if (maxPending != null) {
- this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue());
+ this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue());
}
-
_rand = new Random();
_collector = collector;
@@ -156,7 +154,7 @@ public class ShellBolt implements IBolt {
try {
BoltMsg boltMsg = createBoltMessage(input, genId);
- _pendingWrites.putBoltMsg(boltMsg);
+ _pendingWrites.put(boltMsg);
} catch(InterruptedException e) {
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException("Error during multilang processing " + processInfo, e);
@@ -218,7 +216,7 @@ public class ShellBolt implements IBolt {
if(shellMsg.getTask() == 0) {
List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
if (shellMsg.areTaskIdsNeeded()) {
- _pendingWrites.putTaskIds(outtasks);
+ _pendingWrites.put(outtasks);
}
} else {
_collector.emitDirect((int) shellMsg.getTask(),
@@ -324,6 +322,8 @@ public class ShellBolt implements IBolt {
sendHeartbeatFlag.compareAndSet(false, true);
}
+
+
}
private class BoltReaderRunnable implements Runnable {
@@ -388,6 +388,7 @@ public class ShellBolt implements IBolt {
} else if (write != null) {
throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
}
+ } catch (InterruptedException e) {
} catch (Throwable t) {
die(t);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
deleted file mode 100644
index b633bc5..0000000
--- a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.utils;
-
-import backtype.storm.multilang.BoltMsg;
-
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A data structure for ShellBolt which includes two queues (FIFO),
- * which one is for task ids (unbounded), another one is for bolt msg (bounded).
- */
-public class ShellBoltMessageQueue implements Serializable {
- private final LinkedList<List<Integer>> taskIdsQueue = new LinkedList<>();
- private final LinkedBlockingQueue<BoltMsg> boltMsgQueue;
-
- private final ReentrantLock takeLock = new ReentrantLock();
- private final Condition notEmpty = takeLock.newCondition();
-
- public ShellBoltMessageQueue(int boltMsgCapacity) {
- if (boltMsgCapacity <= 0) {
- throw new IllegalArgumentException();
- }
- this.boltMsgQueue = new LinkedBlockingQueue<>(boltMsgCapacity);
- }
-
- public ShellBoltMessageQueue() {
- this(Integer.MAX_VALUE);
- }
-
- /**
- * put list of task id to its queue
- * @param taskIds task ids that received the tuples
- */
- public void putTaskIds(List<Integer> taskIds) {
- taskIdsQueue.add(taskIds);
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
-
- /**
- * put bolt message to its queue
- * @param boltMsg BoltMsg to pass to subprocess
- * @throws InterruptedException
- */
- public void putBoltMsg(BoltMsg boltMsg) throws InterruptedException {
- boltMsgQueue.put(boltMsg);
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
-
- /**
- * poll() is a core feature of ShellBoltMessageQueue.
- * It retrieves and removes the head of one queues, waiting up to the
- * specified wait time if necessary for an element to become available.
- * There's priority that what queue it retrieves first, taskIds is higher than boltMsgQueue.
- *
- * @param timeout how long to wait before giving up, in units of unit
- * @param unit a TimeUnit determining how to interpret the timeout parameter
- * @return List\<Integer\> if task id is available,
- * BoltMsg if task id is not available but bolt message is available,
- * null if the specified waiting time elapses before an element is available.
- * @throws InterruptedException
- */
- public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
- takeLock.lockInterruptibly();
- long nanos = unit.toNanos(timeout);
- try {
- // wait for available queue
- while (taskIdsQueue.peek() == null && boltMsgQueue.peek() == null) {
- if (nanos <= 0) {
- return null;
- }
- nanos = notEmpty.awaitNanos(nanos);
- }
-
- // taskIds first
- List<Integer> taskIds = taskIdsQueue.peek();
- if (taskIds != null) {
- taskIds = taskIdsQueue.poll();
- return taskIds;
- }
-
- // boltMsgQueue should have at least one entry at the moment
- return boltMsgQueue.poll();
- } finally {
- takeLock.unlock();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/test/clj/backtype/storm/multilang_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/multilang_test.clj b/storm-core/test/clj/backtype/storm/multilang_test.clj
index b42a56f..ff8f2f1 100644
--- a/storm-core/test/clj/backtype/storm/multilang_test.clj
+++ b/storm-core/test/clj/backtype/storm/multilang_test.clj
@@ -47,7 +47,7 @@
"test"
{TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
topology)
- (Thread/sleep 31000)
+ (Thread/sleep 11000)
(.killTopology nimbus "test")
(Thread/sleep 11000)
)))
http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
deleted file mode 100644
index 229efa1..0000000
--- a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.utils;
-
-import backtype.storm.multilang.BoltMsg;
-import com.google.common.collect.Lists;
-import junit.framework.TestCase;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class ShellBoltMessageQueueTest extends TestCase {
- @Test
- public void testPollTaskIdsFirst() throws InterruptedException {
- ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
-
- // put bolt message first, then put task ids
- queue.putBoltMsg(new BoltMsg());
- ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);
- queue.putTaskIds(taskIds);
-
- Object msg = queue.poll(10, TimeUnit.SECONDS);
-
- // task ids should be pulled first
- assertTrue(msg instanceof List<?>);
- assertEquals(msg, taskIds);
- }
-
- @Test
- public void testPollWhileThereAreNoDataAvailable() throws InterruptedException {
- ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
-
- long start = System.currentTimeMillis();
- Object msg = queue.poll(1, TimeUnit.SECONDS);
- long finish = System.currentTimeMillis();
-
- assertNull(msg);
- assertTrue(finish - start > 1000);
- }
-
- @Test
- public void testPollShouldReturnASAPWhenDataAvailable() throws InterruptedException {
- final ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
- final List<Integer> taskIds = Lists.newArrayList(1, 2, 3);
-
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // NOOP
- }
-
- queue.putTaskIds(taskIds);
- }
- });
- t.start();
-
- long start = System.currentTimeMillis();
- Object msg = queue.poll(10, TimeUnit.SECONDS);
- long finish = System.currentTimeMillis();
-
- assertEquals(msg, taskIds);
- assertTrue(finish - start < (10 * 1000));
- }
-}
[2/3] storm git commit: Revert "Added STORM-756 to Changelog"
Posted by da...@apache.org.
Revert "Added STORM-756 to Changelog"
This reverts commit 041fbe34978b20cf98e740a4d914a4e839bb33e2.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f6d84e9f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f6d84e9f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f6d84e9f
Branch: refs/heads/master
Commit: f6d84e9fdf1fe21f2229f144a08dceae6259e7e2
Parents: d2090d7
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Dec 3 13:08:59 2015 -0600
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Dec 3 13:08:59 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f6d84e9f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6684d7e..8420f1a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,5 @@
## 0.11.0
* STORM-1361: Apache License missing from two Cassandra files
- * STORM-756: Handle taskids response as soon as possible
* STORM-1218: Use markdown for JavaDoc.
* STORM-1075: Storm Cassandra connector.
* STORM-965: excessive logging in storm when non-kerberos client tries to connect
[3/3] storm git commit: Merge branch 'revert-storm-756'
Posted by da...@apache.org.
Merge branch 'revert-storm-756'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce2d49b9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce2d49b9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce2d49b9
Branch: refs/heads/master
Commit: ce2d49b924b6690d7704eccbb91ebbbea0a601bb
Parents: 18f68f7 f6d84e9
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Dec 3 13:20:48 2015 -0600
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Thu Dec 3 13:20:48 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 -
conf/defaults.yaml | 1 -
storm-core/src/jvm/backtype/storm/Config.java | 1 -
.../src/jvm/backtype/storm/task/ShellBolt.java | 13 +-
.../storm/utils/ShellBoltMessageQueue.java | 121 -------------------
.../test/clj/backtype/storm/multilang_test.clj | 2 +-
.../storm/utils/ShellBoltMessageQueueTest.java | 84 -------------
7 files changed, 8 insertions(+), 215 deletions(-)
----------------------------------------------------------------------