You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/10 12:13:58 UTC

[2/4] incubator-quarks git commit: Add testcase

Add testcase


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/f38780ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/f38780ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/f38780ce

Branch: refs/heads/master
Commit: f38780ce2bedb6296cf8643be169c43f0b3a5d25
Parents: b41664d
Author: cazen.lee <ca...@samsung.com>
Authored: Sun May 8 14:48:56 2016 +0900
Committer: cazen.lee <ca...@samsung.com>
Committed: Sun May 8 14:48:56 2016 +0900

----------------------------------------------------------------------
 .../java/quarks/test/topology/PlumbingTest.java | 65 +++++++++++++++++++-
 1 file changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/f38780ce/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 92885de..8064263 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -601,4 +602,66 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
 //            actDuration < 0.5 * expMinSerialDuration);
 //    }
 
-}
+    @Test
+    public void testGate() throws Exception {
+        // Not run when system property quarks.build.ci is true: timing variances on shared machines can cause this test to fail
+        assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
+        Topology topology = newTopology("testGate");
+
+        TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+        Semaphore semaphore = new Semaphore(1);
+        raw = PlumbingStreams.gate(raw, semaphore);
+
+        ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
+        ArrayList<Integer> arrayResult = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            arrayResult.add(0);
+            arrayResult.add(1);
+        }
+
+        raw.sink(t -> {
+            // Record permits before releasing; 0 is expected, presumably because gate() acquired the only permit for this tuple
+            resultAvailablePermits.add(semaphore.availablePermits());
+            semaphore.release();
+            // Record permits again; 1 is expected now that release() has returned the permit
+            resultAvailablePermits.add(semaphore.availablePermits());
+        });
+
+        Condition<List<String>> contents = topology.getTester()
+            .streamContents(raw, "a", "b", "c", "d", "e");
+        complete(topology, contents);
+
+        assertTrue("valid:" + contents.getResult(), contents.valid());
+        assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
+    }
+
+    @Test
+    public void testGateWithLocking() throws Exception {
+        // Not run when system property quarks.build.ci is true: timing variances on shared machines can cause this test to fail
+        assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
+        Topology topology = newTopology("testGateWithLocking");
+
+        TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+        Semaphore semaphore = new Semaphore(3);
+        raw = PlumbingStreams.gate(raw, semaphore);
+
+        ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
+        ArrayList<Integer> arrayResult = new ArrayList<>();
+        arrayResult.add(2);
+        arrayResult.add(1);
+        arrayResult.add(0);
+
+        raw.sink(t -> {
+            // Record the permits still available; this sink never releases one, so 2, 1, 0 is expected from the 3 initial permits
+            resultAvailablePermits.add(semaphore.availablePermits());
+        });
+
+        Condition<List<String>> contents = topology.getTester().streamContents(raw, "a", "b", "c");
+        complete(topology, contents, 1000, TimeUnit.MILLISECONDS);
+
+        assertTrue("valid:" + contents.getResult(), contents.valid());
+        assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
+    }
+}
\ No newline at end of file