You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/04 23:44:25 UTC

[1/3] storm git commit: Revert "Merge branch 'revert-storm-756'"

Repository: storm
Updated Branches:
  refs/heads/master 19b8b7d13 -> 79a2a2a58


Revert "Merge branch 'revert-storm-756'"

This reverts commit ce2d49b924b6690d7704eccbb91ebbbea0a601bb, reversing
changes made to 18f68f7a967f7173b9c7c7444bedd53b13ad65fb.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87f8fa1c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87f8fa1c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87f8fa1c

Branch: refs/heads/master
Commit: 87f8fa1c559b8764715fd6f82f8524565d8398aa
Parents: ce2d49b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Dec 4 08:03:46 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Dec 4 08:03:46 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 conf/defaults.yaml                              |   1 +
 storm-core/src/jvm/backtype/storm/Config.java   |   1 +
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  13 +-
 .../storm/utils/ShellBoltMessageQueue.java      | 121 +++++++++++++++++++
 .../test/clj/backtype/storm/multilang_test.clj  |   2 +-
 .../storm/utils/ShellBoltMessageQueueTest.java  |  84 +++++++++++++
 7 files changed, 215 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8420f1a..6684d7e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## 0.11.0
  * STORM-1361: Apache License missing from two Cassandra files
+ * STORM-756: Handle taskids response as soon as possible
  * STORM-1218: Use markdown for JavaDoc.
  * STORM-1075: Storm Cassandra connector.
  * STORM-965: excessive logging in storm when non-kerberos client tries to connect

http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 295ac7c..c1124bd 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -206,6 +206,7 @@ topology.tasks: null
 # maximum amount of time a message has to complete before it's considered failed
 topology.message.timeout.secs: 30
 topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
+topology.shellbolt.max.pending: 100
 topology.skip.missing.kryo.registrations: false
 topology.max.task.parallelism: null
 topology.max.spout.pending: null

http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index c30ffff..01317ee 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1707,6 +1707,7 @@ public class Config extends HashMap<String, Object> {
     /**
      * Max pending tuples in one ShellBolt
      */
+    @NotNull
     @isInteger
     @isPositiveNumber
     public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";

http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index cf6a330..84a2b8a 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -26,6 +26,7 @@ import backtype.storm.multilang.BoltMsg;
 import backtype.storm.multilang.ShellMsg;
 import backtype.storm.topology.ReportedFailedException;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.ShellBoltMessageQueue;
 import backtype.storm.utils.ShellProcess;
 import clojure.lang.RT;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -77,7 +78,7 @@ public class ShellBolt implements IBolt {
     private ShellProcess _process;
     private volatile boolean _running = true;
     private volatile Throwable _exception;
-    private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
+    private ShellBoltMessageQueue _pendingWrites = new ShellBoltMessageQueue();
     private Random _rand;
 
     private Thread _readerThread;
@@ -107,8 +108,9 @@ public class ShellBolt implements IBolt {
                         final OutputCollector collector) {
         Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
         if (maxPending != null) {
-           this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue());
+            this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue());
         }
+
         _rand = new Random();
         _collector = collector;
 
@@ -154,7 +156,7 @@ public class ShellBolt implements IBolt {
         try {
             BoltMsg boltMsg = createBoltMessage(input, genId);
 
-            _pendingWrites.put(boltMsg);
+            _pendingWrites.putBoltMsg(boltMsg);
         } catch(InterruptedException e) {
             String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
             throw new RuntimeException("Error during multilang processing " + processInfo, e);
@@ -216,7 +218,7 @@ public class ShellBolt implements IBolt {
         if(shellMsg.getTask() == 0) {
             List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
             if (shellMsg.areTaskIdsNeeded()) {
-                _pendingWrites.put(outtasks);
+                _pendingWrites.putTaskIds(outtasks);
             }
         } else {
             _collector.emitDirect((int) shellMsg.getTask(),
@@ -322,8 +324,6 @@ public class ShellBolt implements IBolt {
 
             sendHeartbeatFlag.compareAndSet(false, true);
         }
-
-
     }
 
     private class BoltReaderRunnable implements Runnable {
@@ -388,7 +388,6 @@ public class ShellBolt implements IBolt {
                     } else if (write != null) {
                         throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
                     }
-                } catch (InterruptedException e) {
                 } catch (Throwable t) {
                     die(t);
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
new file mode 100644
index 0000000..b633bc5
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import backtype.storm.multilang.BoltMsg;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A data structure for ShellBolt that holds two FIFO queues:
+ * an unbounded one for task ids and a bounded one for bolt messages.
+ */
+public class ShellBoltMessageQueue implements Serializable {
+    private final LinkedList<List<Integer>> taskIdsQueue = new LinkedList<>(); // not thread-safe: access only under takeLock
+    private final LinkedBlockingQueue<BoltMsg> boltMsgQueue;
+
+    private final ReentrantLock takeLock = new ReentrantLock();
+    private final Condition notEmpty = takeLock.newCondition();
+
+    public ShellBoltMessageQueue(int boltMsgCapacity) { // boltMsgCapacity bounds the bolt message queue only
+        if (boltMsgCapacity <= 0) {
+            throw new IllegalArgumentException("boltMsgCapacity must be positive: " + boltMsgCapacity);
+        }
+        this.boltMsgQueue = new LinkedBlockingQueue<>(boltMsgCapacity);
+    }
+
+    public ShellBoltMessageQueue() { // bolt message queue capped at Integer.MAX_VALUE entries
+        this(Integer.MAX_VALUE);
+    }
+
+    /**
+     * Appends a list of task ids to the task id queue and wakes up one thread waiting in poll().
+     * @param taskIds task ids that received the tuples
+     */
+    public void putTaskIds(List<Integer> taskIds) {
+        takeLock.lock();
+        try {
+            taskIdsQueue.add(taskIds); // mutate under the same lock poll() holds while reading the list
+            notEmpty.signal();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+    /**
+     * Appends a bolt message to its queue, blocking while that queue is at capacity.
+     * @param boltMsg BoltMsg to pass to subprocess
+     * @throws InterruptedException if interrupted while waiting for space in the bolt message queue
+     */
+    public void putBoltMsg(BoltMsg boltMsg) throws InterruptedException {
+        boltMsgQueue.put(boltMsg); // may block, so it stays outside takeLock; otherwise poll() could never drain
+        takeLock.lock();
+        try {
+            notEmpty.signal();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of one of the two queues, waiting up to the
+     * specified wait time if necessary for an element to become available.
+     * The task id queue has priority: a bolt message is only returned
+     * when no task ids are pending.
+     *
+     * @param timeout how long to wait before giving up, in units of unit
+     * @param unit a TimeUnit determining how to interpret the timeout parameter
+     * @return {@code List<Integer>} if task ids are available,
+     * BoltMsg if no task ids are available but a bolt message is,
+     * null if the specified waiting time elapses before an element is available.
+     * @throws InterruptedException if interrupted while acquiring the lock or while waiting
+     */
+    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
+        takeLock.lockInterruptibly();
+        long nanos = unit.toNanos(timeout);
+        try {
+            // wait until either queue has an element or the timeout is used up
+            while (taskIdsQueue.peek() == null && boltMsgQueue.peek() == null) {
+                if (nanos <= 0) {
+                    return null;
+                }
+                nanos = notEmpty.awaitNanos(nanos);
+            }
+
+            // taskIds first
+            List<Integer> taskIds = taskIdsQueue.peek();
+            if (taskIds != null) {
+                taskIds = taskIdsQueue.poll();
+                return taskIds;
+            }
+
+            // boltMsgQueue should have at least one entry at the moment
+            return boltMsgQueue.poll();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/test/clj/backtype/storm/multilang_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/multilang_test.clj b/storm-core/test/clj/backtype/storm/multilang_test.clj
index ff8f2f1..b42a56f 100644
--- a/storm-core/test/clj/backtype/storm/multilang_test.clj
+++ b/storm-core/test/clj/backtype/storm/multilang_test.clj
@@ -47,7 +47,7 @@
                "test"
                {TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
                topology)
-       (Thread/sleep 11000)
+       (Thread/sleep 31000)
        (.killTopology nimbus "test")
        (Thread/sleep 11000)
        )))

http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
new file mode 100644
index 0000000..229efa1
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import backtype.storm.multilang.BoltMsg;
+import com.google.common.collect.Lists;
+import junit.framework.TestCase;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class ShellBoltMessageQueueTest extends TestCase {
+    @Test
+    public void testPollTaskIdsFirst() throws InterruptedException {
+        ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+
+        // enqueue a bolt message first, then task ids, so insertion order alone would favor the bolt message
+        queue.putBoltMsg(new BoltMsg());
+        ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);
+        queue.putTaskIds(taskIds);
+
+        Object msg = queue.poll(10, TimeUnit.SECONDS);
+
+        // task ids have priority over bolt messages regardless of insertion order
+        assertTrue(msg instanceof List<?>);
+        assertEquals(msg, taskIds);
+    }
+
+    @Test
+    public void testPollWhileThereAreNoDataAvailable() throws InterruptedException {
+        ShellBoltMessageQueue queue = new ShellBoltMessageQueue(); // left empty so poll() has nothing to return
+
+        long start = System.currentTimeMillis();
+        Object msg = queue.poll(1, TimeUnit.SECONDS);
+        long finish = System.currentTimeMillis();
+
+        assertNull(msg);
+        assertTrue(finish - start > 1000);
+    }
+
+    @Test
+    public void testPollShouldReturnASAPWhenDataAvailable() throws InterruptedException {
+        final ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+        final List<Integer> taskIds = Lists.newArrayList(1, 2, 3);
+
+        Thread t = new Thread(new Runnable() { // producer: puts the task ids after a delay
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000); // delay the put so poll() most likely starts waiting before data arrives
+                } catch (InterruptedException e) {
+                    // NOOP: if interrupted, the task ids are simply put earlier than planned
+                }
+
+                queue.putTaskIds(taskIds);
+            }
+        });
+        t.start();
+
+        long start = System.currentTimeMillis();
+        Object msg = queue.poll(10, TimeUnit.SECONDS);
+        long finish = System.currentTimeMillis();
+
+        assertEquals(msg, taskIds);
+        assertTrue(finish - start < (10 * 1000)); // poll() must return before its full 10 s timeout elapses
+    }
+}


[3/3] storm git commit: Merge branch 'STORM-756-v2-retry-1' of https://github.com/HeartSaVioR/storm into storm-756-remerge

Posted by da...@apache.org.
Merge branch 'STORM-756-v2-retry-1' of https://github.com/HeartSaVioR/storm into storm-756-remerge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/79a2a2a5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/79a2a2a5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/79a2a2a5

Branch: refs/heads/master
Commit: 79a2a2a58888ca5c7dad379e7316aacfe2c465aa
Parents: 19b8b7d f989081
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Fri Dec 4 16:33:09 2015 -0600
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Fri Dec 4 16:33:09 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 conf/defaults.yaml                              |   1 +
 storm-core/src/jvm/backtype/storm/Config.java   |   1 +
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  13 +-
 .../storm/utils/ShellBoltMessageQueue.java      | 121 +++++++++++++++++++
 .../test/clj/backtype/storm/multilang_test.clj  |   2 +-
 .../storm/utils/ShellBoltMessageQueueTest.java  |  85 +++++++++++++
 7 files changed, 216 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/79a2a2a5/CHANGELOG.md
----------------------------------------------------------------------
diff --cc CHANGELOG.md
index 2185747,6684d7e..5254cbb
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@@ -1,6 -1,6 +1,7 @@@
  ## 0.11.0
 + * STORM-876: Blobstore/DistCache Support
   * STORM-1361: Apache License missing from two Cassandra files
+  * STORM-756: Handle taskids response as soon as possible
   * STORM-1218: Use markdown for JavaDoc.
   * STORM-1075: Storm Cassandra connector.
   * STORM-965: excessive logging in storm when non-kerberos client tries to connect

http://git-wip-us.apache.org/repos/asf/storm/blob/79a2a2a5/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/79a2a2a5/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------


[2/3] storm git commit: fix test: wait duration could be exactly 1000 ms

Posted by da...@apache.org.
fix test: wait duration could be exactly 1000 ms


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f9890817
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f9890817
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f9890817

Branch: refs/heads/master
Commit: f9890817884fa3c3592ee09e085c0cc58dc52149
Parents: 87f8fa1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Dec 4 08:16:31 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Dec 4 08:16:31 2015 +0900

----------------------------------------------------------------------
 .../test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f9890817/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
index 229efa1..f77877f 100644
--- a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
@@ -50,9 +50,10 @@ public class ShellBoltMessageQueueTest extends TestCase {
         long start = System.currentTimeMillis();
         Object msg = queue.poll(1, TimeUnit.SECONDS);
         long finish = System.currentTimeMillis();
+        long waitDuration = finish - start;
 
         assertNull(msg);
-        assertTrue(finish - start > 1000);
+        assertTrue("wait duration should be equal or greater than 1000, current: " + waitDuration, waitDuration >= 1000);
     }
 
     @Test