You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/10 12:13:58 UTC
[2/4] incubator-quarks git commit: Add testcase
Add testcase
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/f38780ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/f38780ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/f38780ce
Branch: refs/heads/master
Commit: f38780ce2bedb6296cf8643be169c43f0b3a5d25
Parents: b41664d
Author: cazen.lee <ca...@samsung.com>
Authored: Sun May 8 14:48:56 2016 +0900
Committer: cazen.lee <ca...@samsung.com>
Committed: Sun May 8 14:48:56 2016 +0900
----------------------------------------------------------------------
.../java/quarks/test/topology/PlumbingTest.java | 65 +++++++++++++++++++-
1 file changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/f38780ce/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 92885de..8064263 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -601,4 +602,66 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
// actDuration < 0.5 * expMinSerialDuration);
// }
-}
+ @Test
+ public void testGate() throws Exception { // Gate with a single-permit semaphore that the sink releases after every tuple: all five tuples are expected downstream.
+ // Assumption holds only when the quarks.build.ci system property is not "true"; timing variances on shared machines can cause this test to fail
+ assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
+ Topology topology = newTopology("testGate");
+
+ TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+ Semaphore semaphore = new Semaphore(1);
+ raw = PlumbingStreams.gate(raw, semaphore); // from here on, raw refers to the stream returned by gate()
+
+ ArrayList<Integer> resultAvailablePermits = new ArrayList<>(); // permit counts actually observed inside the sink
+ ArrayList<Integer> arrayResult = new ArrayList<>(); // expected observations
+ for (int i = 0; i < 5; i++) { // one (0, 1) pair per source tuple
+ arrayResult.add(0);
+ arrayResult.add(1);
+ }
+
+ raw.sink(t -> {
+ // Expect 0 available permits here: the gate is presumed to have acquired the only permit before this tuple reached the sink (confirm against PlumbingStreams.gate)
+ resultAvailablePermits.add(semaphore.availablePermits());
+ semaphore.release();
+ // Expect 1 available permit here: the release() above has just returned the permit
+ resultAvailablePermits.add(semaphore.availablePermits());
+ });
+
+ Condition<List<String>> contents = topology.getTester()
+ .streamContents(raw, "a", "b", "c", "d", "e");
+ complete(topology, contents);
+
+ assertTrue("valid:" + contents.getResult(), contents.valid());
+ assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult)); // observed permit counts must be exactly 0,1 repeated five times
+ }
+
+ @Test
+ public void testGateWithLocking() throws Exception { // Gate with three permits that the sink never releases: only the first three of the five tuples are expected downstream.
+ // Assumption holds only when the quarks.build.ci system property is not "true"; timing variances on shared machines can cause this test to fail
+ assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
+ Topology topology = newTopology("testGateWithLocking");
+
+ TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+ Semaphore semaphore = new Semaphore(3);
+ raw = PlumbingStreams.gate(raw, semaphore); // from here on, raw refers to the stream returned by gate()
+
+ ArrayList<Integer> resultAvailablePermits = new ArrayList<>(); // permit counts actually observed inside the sink
+ ArrayList<Integer> arrayResult = new ArrayList<>(); // expected observations
+ arrayResult.add(2); // expected availablePermits() seen by the sink for "a", "b", "c" in turn: 2, 1, 0
+ arrayResult.add(1);
+ arrayResult.add(0);
+
+ raw.sink(t -> {
+ // Record availablePermits() without releasing, so this sink never returns a permit to the semaphore
+ resultAvailablePermits.add(semaphore.availablePermits());
+ });
+
+ Condition<List<String>> contents = topology.getTester().streamContents(raw, "a", "b", "c"); // "d" and "e" are not expected to arrive
+ complete(topology, contents, 1000, TimeUnit.MILLISECONDS); // NOTE(review): presumably waits at most 1000 ms for the condition; confirm against complete()
+
+ assertTrue("valid:" + contents.getResult(), contents.valid());
+ assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult)); // observed permit counts must be exactly 2, 1, 0
+ }
+}
\ No newline at end of file