You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/02 16:44:48 UTC

[5/8] storm git commit: STORM-756 Add unit tests of ShellBoltMessageQueue

STORM-756 Add unit tests of ShellBoltMessageQueue


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/117e8186
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/117e8186
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/117e8186

Branch: refs/heads/master
Commit: 117e81861485ab02d4b97de725309132fce3f2a1
Parents: 56dc7b9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Nov 24 19:14:05 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Nov 24 19:14:05 2015 +0900

----------------------------------------------------------------------
 .../storm/utils/ShellBoltMessageQueueTest.java  | 67 ++++++++++++++++++++
 1 file changed, 67 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/117e8186/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
new file mode 100644
index 0000000..1dcbb03
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
@@ -0,0 +1,67 @@
+package backtype.storm.utils;
+
+import backtype.storm.multilang.BoltMsg;
+import com.google.common.collect.Lists;
+import junit.framework.TestCase;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ShellBoltMessageQueue}: poll ordering and poll timeout behavior. */
+public class ShellBoltMessageQueueTest extends TestCase {
+    /** Task ids queued after a bolt message must still be the first thing poll() returns. */
+    @Test
+    public void testPollTaskIdsFirst() throws InterruptedException {
+        ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+        // put bolt message first, then put task ids
+        queue.putBoltMsg(new BoltMsg());
+        List<Integer> taskIds = Lists.newArrayList(1, 2, 3);
+        queue.putTaskIds(taskIds);
+
+        Object msg = queue.poll(10, TimeUnit.SECONDS);
+
+        assertTrue(msg instanceof List<?>);
+        assertEquals(taskIds, msg); // JUnit order is (expected, actual)
+    }
+
+    /** poll() on an empty queue must return null, and not before the timeout has elapsed. */
+    @Test
+    public void testPollWhileThereAreNoDataAvailable() throws InterruptedException {
+        ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+
+        long start = System.nanoTime(); // monotonic: immune to wall-clock adjustments
+        Object msg = queue.poll(1, TimeUnit.SECONDS);
+        long elapsedNanos = System.nanoTime() - start;
+
+        assertNull(msg);
+        assertTrue(elapsedNanos >= TimeUnit.SECONDS.toNanos(1)); // returning exactly at the deadline is valid
+    }
+
+    /** poll() must hand back data supplied by another thread before its own timeout expires. */
+    @Test
+    public void testPollShouldReturnASAPWhenDataAvailable() throws InterruptedException {
+        final ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+        final List<Integer> taskIds = Lists.newArrayList(1, 2, 3);
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // NOOP: fall through and publish the task ids anyway
+                }
+                queue.putTaskIds(taskIds);
+            }
+        });
+        t.start();
+
+        long start = System.nanoTime();
+        Object msg = queue.poll(10, TimeUnit.SECONDS);
+        long elapsedNanos = System.nanoTime() - start;
+
+        assertEquals(taskIds, msg);
+        assertTrue(elapsedNanos < TimeUnit.SECONDS.toNanos(10));
+    }
+}