You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/02 16:44:48 UTC
[5/8] storm git commit: STORM-756 Add unit tests of ShellBoltMessageQueue
STORM-756 Add unit tests of ShellBoltMessageQueue
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/117e8186
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/117e8186
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/117e8186
Branch: refs/heads/master
Commit: 117e81861485ab02d4b97de725309132fce3f2a1
Parents: 56dc7b9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Nov 24 19:14:05 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Nov 24 19:14:05 2015 +0900
----------------------------------------------------------------------
.../storm/utils/ShellBoltMessageQueueTest.java | 67 ++++++++++++++++++++
1 file changed, 67 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/117e8186/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
new file mode 100644
index 0000000..1dcbb03
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java
@@ -0,0 +1,67 @@
+package backtype.storm.utils;
+
+import backtype.storm.multilang.BoltMsg;
+import com.google.common.collect.Lists;
+import junit.framework.TestCase;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class ShellBoltMessageQueueTest extends TestCase {
+ /** Task ids queued after a bolt message must still be the first thing poll returns. */
+ @Test
+ public void testPollTaskIdsFirst() throws InterruptedException {
+ ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+
+ // put bolt message first, then put task ids
+ queue.putBoltMsg(new BoltMsg());
+ ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);
+ queue.putTaskIds(taskIds);
+
+ Object msg = queue.poll(10, TimeUnit.SECONDS);
+ // task ids should be pulled first
+ assertTrue(msg instanceof List<?>);
+ assertEquals(taskIds, msg); // JUnit argument order is (expected, actual)
+ }
+
+ /** Polling an empty queue must wait out the whole timeout and then return null. */
+ @Test
+ public void testPollWhileThereAreNoDataAvailable() throws InterruptedException {
+ ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+ long start = System.nanoTime(); // monotonic clock: immune to wall-clock adjustments
+ Object msg = queue.poll(1, TimeUnit.SECONDS);
+ long elapsedNanos = System.nanoTime() - start;
+
+ assertNull(msg);
+ assertTrue(elapsedNanos >= TimeUnit.SECONDS.toNanos(1)); // expiring exactly on time is valid
+ }
+
+ /** A poll that is already waiting must return once another thread enqueues data. */
+ @Test
+ public void testPollShouldReturnASAPWhenDataAvailable() throws InterruptedException {
+ final ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
+ final List<Integer> taskIds = Lists.newArrayList(1, 2, 3);
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // nothing in this test interrupts the producer; if it happens, publish right away
+ }
+ queue.putTaskIds(taskIds);
+ }
+ });
+ t.start();
+
+ long start = System.nanoTime();
+ Object msg = queue.poll(10, TimeUnit.SECONDS);
+ long elapsedNanos = System.nanoTime() - start;
+ t.join(); // do not leak the producer thread past this test
+ assertEquals(taskIds, msg);
+ assertTrue(elapsedNanos < TimeUnit.SECONDS.toNanos(10)); // i.e. poll did not time out
+ }
+}