You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/02 16:44:47 UTC

[4/8] storm git commit: STORM-756 Introduce ShellBoltMessageQueue

STORM-756 Introduce ShellBoltMessageQueue

* ShellBoltMessageQueue contains two different queues
** one is for taskids (unbounded)
** another one is for bolt msg (bounded)
* Poll priority between the two queues: task ids are higher than bolt msg
** poll() returns task ids whenever available, and returns a bolt msg if
no task ids are available
* poll() behaves like LinkedBlockingQueue.poll() with a timeout
** waits while nothing is available, wakes up when an entry becomes available or on timeout


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/56dc7b9d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/56dc7b9d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/56dc7b9d

Branch: refs/heads/master
Commit: 56dc7b9d25f6b80856c541a71a036b2574d58ae8
Parents: 124f664
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Nov 24 14:56:43 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Nov 24 14:56:43 2015 +0900

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  31 +++--
 .../storm/utils/ShellBoltMessageQueue.java      | 121 +++++++++++++++++++
 2 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/56dc7b9d/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 1d97d53..215094b 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -26,6 +26,7 @@ import backtype.storm.multilang.BoltMsg;
 import backtype.storm.multilang.ShellMsg;
 import backtype.storm.topology.ReportedFailedException;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.ShellBoltMessageQueue;
 import backtype.storm.utils.ShellProcess;
 import clojure.lang.RT;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -37,6 +38,8 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 /**
  * A bolt that shells out to another process to process tuples. ShellBolt
  * communicates with that process over stdio using a special protocol. An ~100
@@ -75,8 +78,7 @@ public class ShellBolt implements IBolt {
     private ShellProcess _process;
     private volatile boolean _running = true;
     private volatile Throwable _exception;
-    private LinkedBlockingQueue<BoltMsg> _pendingWrites = new LinkedBlockingQueue<>();
-    private LinkedBlockingQueue<List<Integer>> _pendingTaskIds = new LinkedBlockingQueue<>();
+    private ShellBoltMessageQueue _pendingWrites = new ShellBoltMessageQueue();
     private Random _rand;
 
     private Thread _readerThread;
@@ -106,8 +108,9 @@ public class ShellBolt implements IBolt {
                         final OutputCollector collector) {
         Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
         if (maxPending != null) {
-           this._pendingWrites = new LinkedBlockingQueue<>(((Number)maxPending).intValue());
+            this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue());
         }
+
         _rand = new Random();
         _collector = collector;
 
@@ -149,7 +152,7 @@ public class ShellBolt implements IBolt {
         try {
             BoltMsg boltMsg = createBoltMessage(input, genId);
 
-            _pendingWrites.put(boltMsg);
+            _pendingWrites.putBoltMsg(boltMsg);
         } catch(InterruptedException e) {
             String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
             throw new RuntimeException("Error during multilang processing " + processInfo, e);
@@ -211,7 +214,7 @@ public class ShellBolt implements IBolt {
         if(shellMsg.getTask() == 0) {
             List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
             if (shellMsg.areTaskIdsNeeded()) {
-                _pendingTaskIds.put(outtasks);
+                _pendingWrites.putTaskIds(outtasks);
             }
         } else {
             _collector.emitDirect((int) shellMsg.getTask(),
@@ -373,17 +376,13 @@ public class ShellBolt implements IBolt {
                         sendHeartbeatFlag.compareAndSet(true, false);
                     }
 
-                    List<Integer> taskIds = _pendingTaskIds.peek();
-                    if (taskIds != null) {
-                        taskIds = _pendingTaskIds.poll();
-                        _process.writeTaskIds(taskIds);
-                        continue;
-                    }
-
-                    BoltMsg write = _pendingWrites.peek();
-                    if (write != null) {
-                        write = _pendingWrites.poll();
-                        _process.writeBoltMsg(write);
+                    Object write = _pendingWrites.poll(1, SECONDS);
+                    if (write instanceof BoltMsg) {
+                        _process.writeBoltMsg((BoltMsg) write);
+                    } else if (write instanceof List<?>) {
+                        _process.writeTaskIds((List<Integer>)write);
+                    } else if (write != null) {
+                        throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
                     }
                 } catch (Throwable t) {
                     die(t);

http://git-wip-us.apache.org/repos/asf/storm/blob/56dc7b9d/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
new file mode 100644
index 0000000..b633bc5
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import backtype.storm.multilang.BoltMsg;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A data structure for ShellBolt which holds two FIFO queues:
+ * one for task ids (unbounded) and another one for bolt msgs (bounded).
+ */
+public class ShellBoltMessageQueue implements Serializable {
+    // not thread-safe by itself: every access must hold takeLock
+    private final LinkedList<List<Integer>> taskIdsQueue = new LinkedList<>();
+    private final LinkedBlockingQueue<BoltMsg> boltMsgQueue;
+
+    private final ReentrantLock takeLock = new ReentrantLock();
+    private final Condition notEmpty = takeLock.newCondition();
+
+    public ShellBoltMessageQueue(int boltMsgCapacity) {
+        if (boltMsgCapacity <= 0) {
+            throw new IllegalArgumentException("boltMsgCapacity must be positive: " + boltMsgCapacity);
+        }
+        this.boltMsgQueue = new LinkedBlockingQueue<>(boltMsgCapacity);
+    }
+
+    public ShellBoltMessageQueue() {
+        this(Integer.MAX_VALUE);
+    }
+
+    /**
+     * Puts a list of task ids into its queue. Never waits for capacity.
+     * @param taskIds task ids that received the tuples, must not be null
+     * @throws NullPointerException if taskIds is null (a null entry would look like an empty queue)
+     */
+    public void putTaskIds(List<Integer> taskIds) {
+        if (taskIds == null) {
+            throw new NullPointerException("taskIds");
+        }
+        takeLock.lock();
+        try {
+            // add under the lock: poll() reads this LinkedList under the same lock
+            taskIdsQueue.add(taskIds);
+            notEmpty.signal();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+    /**
+     * Puts a bolt message into its queue, waiting for space if the queue is full.
+     * @param boltMsg BoltMsg to pass to subprocess
+     * @throws InterruptedException if interrupted while waiting for space
+     */
+    public void putBoltMsg(BoltMsg boltMsg) throws InterruptedException {
+        boltMsgQueue.put(boltMsg);
+        takeLock.lock();
+        try {
+            notEmpty.signal();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of one of the queues, waiting up to the
+     * specified wait time if necessary for an element to become available.
+     * The task ids queue has priority over the bolt msg queue.
+     *
+     * @param timeout how long to wait before giving up, in units of unit
+     * @param unit a TimeUnit determining how to interpret the timeout parameter
+     * @return {@code List<Integer>} if task ids are available,
+     * BoltMsg if task ids are not available but a bolt message is available,
+     * null if the specified waiting time elapses before an element is available.
+     * @throws InterruptedException if interrupted while acquiring the lock or waiting
+     */
+    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
+        long nanos = unit.toNanos(timeout); // before locking: an NPE on a null unit must not leak the lock
+        takeLock.lockInterruptibly();
+        try {
+            // wait until one of the queues has an entry
+            while (taskIdsQueue.isEmpty() && boltMsgQueue.isEmpty()) {
+                if (nanos <= 0) {
+                    return null;
+                }
+                nanos = notEmpty.awaitNanos(nanos);
+            }
+            // taskIds first; null here means only boltMsgQueue has an entry
+            List<Integer> taskIds = taskIdsQueue.poll();
+            if (taskIds != null) {
+                return taskIds;
+            }
+            return boltMsgQueue.poll();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+}