You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/10 12:13:57 UTC
[1/4] incubator-quarks git commit: Add Gate plumbing
Repository: incubator-quarks
Updated Branches:
refs/heads/master 87cff0e43 -> edd17a331
Add Gate plumbing
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/b41664db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/b41664db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/b41664db
Branch: refs/heads/master
Commit: b41664db2b561e101b16211a063e74f7671475c2
Parents: fa48c5f
Author: cazen.lee <ca...@samsung.com>
Authored: Sun May 8 14:47:32 2016 +0900
Committer: cazen.lee <ca...@samsung.com>
Committed: Sun May 8 14:47:32 2016 +0900
----------------------------------------------------------------------
.../topology/plumbing/PlumbingStreams.java | 57 +++++++++++++++++++-
1 file changed, 55 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/b41664db/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index ad6367a..a9f4c1a 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -532,5 +533,57 @@ public class PlumbingStreams {
AtomicInteger cnt = new AtomicInteger();
return tuple -> cnt.getAndIncrement() % width;
}
-
-}
+ /**
+ * Control the flow of tuples to an output stream.
+ * <P>
+ * A {@link Semaphore} is used to control the flow of tuples
+ * through the {@code gate}. The gate acquires a permit from the
+ * semaphore to pass the tuple through, blocking until a permit is
+ * acquired (and applying backpressure upstream while blocked).
+ * Elsewhere, some code calls {@link Semaphore#release(int)}
+ * to make permits available.
+ * </P><P>
+ * If the thread is interrupted while waiting for a permit, its
+ * interrupt status is restored and a {@link RuntimeException}
+ * wrapping the {@link InterruptedException} is thrown from the
+ * gate's mapping function.
+ * </P><P>
+ * If a TopologyProvider is used that can distribute a topology's
+ * streams to different JVMs, the gate and the code releasing the
+ * permits must be in the same JVM.
+ * </P><P>
+ * Sample use:
+ * <BR>
+ * Suppose you wanted to control processing such that concurrent
+ * pipelines processed each tuple in lock-step.
+ * I.e., you want all of the pipelines to start processing a tuple
+ * at the same time and not start a new tuple until the current
+ * tuple has been fully processed by each of them:
+ * <pre>{@code
+ * TStream<Integer> readings = ...;
+ * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
+ * TStream<Integer> gated = gate(readings, gateControl);
+ * // Create the concurrent pipeline combiner and have it
+ * // signal that concurrent processing of the tuple has completed.
+ * // In this sample the combiner just returns the received list of
+ * // each pipeline result.
+ * Function<TStream<List<Integer>>,TStream<List<Integer>>> combiner =
+ * stream -> stream.map(
+ * list -> {
+ * gateControl.release();
+ * return list;
+ * }
+ * );
+ * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
+ * }</pre>
+ * </P>
+ * @param <T> tuple type
+ * @param stream the input stream
+ * @param semaphore gate control
+ * @return gated stream
+ */
+ public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
+ return stream.map(tuple -> {
+ try {
+ semaphore.acquire(); // blocks until a permit is available; one permit is consumed per tuple
+ return tuple;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt status before rethrowing unchecked
+ throw new RuntimeException("interrupted", e);
+ }
+ });
+ }
+}
\ No newline at end of file
[2/4] incubator-quarks git commit: Add testcase
Posted by dl...@apache.org.
Add testcase
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/f38780ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/f38780ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/f38780ce
Branch: refs/heads/master
Commit: f38780ce2bedb6296cf8643be169c43f0b3a5d25
Parents: b41664d
Author: cazen.lee <ca...@samsung.com>
Authored: Sun May 8 14:48:56 2016 +0900
Committer: cazen.lee <ca...@samsung.com>
Committed: Sun May 8 14:48:56 2016 +0900
----------------------------------------------------------------------
.../java/quarks/test/topology/PlumbingTest.java | 65 +++++++++++++++++++-
1 file changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/f38780ce/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 92885de..8064263 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -601,4 +602,66 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
// actDuration < 0.5 * expMinSerialDuration);
// }
-}
+ @Test
+ public void testGate() throws Exception {
+ // Timing variances on shared machines can cause this test to fail
+ assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
+ Topology topology = newTopology("testGate");
+
+ TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+ Semaphore semaphore = new Semaphore(1); // a single permit; the sink below hands it back after each tuple
+ raw = PlumbingStreams.gate(raw, semaphore);
+
+ ArrayList<Integer> resultAvailablePermits = new ArrayList<>(); // permit counts observed by the sink
+ ArrayList<Integer> arrayResult = new ArrayList<>(); // expected counts: 0 then 1, once per tuple
+ for (int i = 0; i < 5; i++) {
+ arrayResult.add(0);
+ arrayResult.add(1);
+ }
+
+ raw.sink(t -> {
+ // Expect 0 here: gate() acquired the only permit before this tuple reached the sink
+ resultAvailablePermits.add(semaphore.availablePermits());
+ semaphore.release();
+ // Expect 1 here: release() has just returned the permit
+ resultAvailablePermits.add(semaphore.availablePermits());
+ });
+
+ Condition<List<String>> contents = topology.getTester()
+ .streamContents(raw, "a", "b", "c", "d", "e");
+ complete(topology, contents);
+
+ assertTrue("valid:" + contents.getResult(), contents.valid());
+ assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
+ }
+
+ @Test
+ public void testGateWithLocking() throws Exception {
+ // Timing variances on shared machines can cause this test to fail
+ assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
+ Topology topology = newTopology("testGateWithLocking");
+
+ TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+ Semaphore semaphore = new Semaphore(3); // three permits; nothing in this test releases any
+ raw = PlumbingStreams.gate(raw, semaphore);
+
+ ArrayList<Integer> resultAvailablePermits = new ArrayList<>(); // permit counts observed by the sink
+ ArrayList<Integer> arrayResult = new ArrayList<>(); // expected counts: 2, 1, 0
+ arrayResult.add(2);
+ arrayResult.add(1);
+ arrayResult.add(0);
+
+ raw.sink(t -> {
+ // Record the permits left after gate() consumed one for this tuple; the sink never releases
+ resultAvailablePermits.add(semaphore.availablePermits());
+ });
+
+ Condition<List<String>> contents = topology.getTester().streamContents(raw, "a", "b", "c"); // only three of the five tuples are expected past the gate
+ complete(topology, contents, 1000, TimeUnit.MILLISECONDS); // NOTE(review): presumably bounds the wait because "d" and "e" stay blocked at the gate - confirm
+
+ assertTrue("valid:" + contents.getResult(), contents.valid());
+ assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
+ }
+}
\ No newline at end of file
[4/4] incubator-quarks git commit: Merge pull request #109
Posted by dl...@apache.org.
Merge pull request #109
This closes #109
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/edd17a33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/edd17a33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/edd17a33
Branch: refs/heads/master
Commit: edd17a331f27d342b468e62cb96aae39da830c4c
Parents: 87cff0e 8389ad7
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 10 08:13:44 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 10 08:13:44 2016 -0400
----------------------------------------------------------------------
.../topology/plumbing/PlumbingStreams.java | 57 +++++++++++++++++-
.../java/quarks/test/topology/PlumbingTest.java | 61 +++++++++++++++++++-
2 files changed, 115 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/edd17a33/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/edd17a33/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
[3/4] incubator-quarks git commit: remove condition check for timing
variances failure
Posted by dl...@apache.org.
remove condition check for timing variances failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/8389ad73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/8389ad73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/8389ad73
Branch: refs/heads/master
Commit: 8389ad73cbea27670e4e9ade96a56ed24236decb
Parents: f38780c
Author: Cazen Lee <Ca...@korea.com>
Authored: Mon May 9 23:29:34 2016 +0900
Committer: Cazen Lee <Ca...@korea.com>
Committed: Mon May 9 23:29:34 2016 +0900
----------------------------------------------------------------------
.../src/test/java/quarks/test/topology/PlumbingTest.java | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/8389ad73/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 8064263..e12e3ed 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -604,8 +604,6 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
@Test
public void testGate() throws Exception {
- // Timing variances on shared machines can cause this test to fail
- assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
Topology topology = newTopology("testGate");
TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
@@ -638,8 +636,6 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
@Test
public void testGateWithLocking() throws Exception {
- // Timing variances on shared machines can cause this test to fail
- assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
Topology topology = newTopology("testGateWithLocking");
TStream<String> raw = topology.strings("a", "b", "c", "d", "e");