You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/10 12:13:57 UTC

[1/4] incubator-quarks git commit: Add Gate plumbing

Repository: incubator-quarks
Updated Branches:
  refs/heads/master 87cff0e43 -> edd17a331


Add Gate plumbing


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/b41664db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/b41664db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/b41664db

Branch: refs/heads/master
Commit: b41664db2b561e101b16211a063e74f7671475c2
Parents: fa48c5f
Author: cazen.lee <ca...@samsung.com>
Authored: Sun May 8 14:47:32 2016 +0900
Committer: cazen.lee <ca...@samsung.com>
Committed: Sun May 8 14:47:32 2016 +0900

----------------------------------------------------------------------
 .../topology/plumbing/PlumbingStreams.java      | 57 +++++++++++++++++++-
 1 file changed, 55 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/b41664db/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index ad6367a..a9f4c1a 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -532,5 +533,57 @@ public class PlumbingStreams {
       AtomicInteger cnt = new AtomicInteger();
       return tuple -> cnt.getAndIncrement() % width;
     }
- 
-}
+    /**
+     * Control the flow of tuples to an output stream.
+     * <P>
+     * A {@link Semaphore}
+     * is used to control the flow of tuples
+     * through the {@code gate}.
+     * The gate acquires a permit from the
+     * semaphore to pass the tuple through, blocking until a permit is
+     * acquired (and applying backpressure upstream while blocked).
+     * Elsewhere, some code calls {@link Semaphore#release(int)}
+     * to make permits available.
+     * </P><P>
+     * If a TopologyProvider is used that can distribute a topology's
+     * streams to different JVM's the gate and the code releasing the
+     * permits must be in the same JVM.
+     * </P><P>
+     * Sample use:
+     * <BR>
+     * Suppose you wanted to control processing such that concurrent
+     * pipelines processed each tuple in lock-step.
+     * I.e., You want all of the pipelines to start processing a tuple
+     * at the same time and not start a new tuple until the current
+     * tuple had been fully processed by each of them:
+     * <pre>{@code
+     * TStream<Integer> readings = ...;
+     * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
+     * TStream<Integer> gated = gate(readings, gateControl);
+     * // Create the concurrent pipeline combiner and have it
+     * // signal that concurrent processing of the tuple has completed.
+     * // In this sample the combiner just returns the received list of
+     * // each pipeline result.
+     * Function<TStream<List<Integer>>,TStream<List<Integer>>> combiner =
+     * stream -> stream.map(
+     * list -> { gateControl.release(); return list; }
+     * );
+     * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
+     * }</pre>
+     * </P>
+     * @param stream the input stream
+     * @param semaphore gate control
+     * @return gated stream
+     */
+    public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
+        return stream.map(tuple -> {
+            try {
+                semaphore.acquire();
+                return tuple;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("interrupted", e);
+            }
+        });
+    }
+}
\ No newline at end of file


[2/4] incubator-quarks git commit: Add testcase

Posted by dl...@apache.org.
Add testcase


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/f38780ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/f38780ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/f38780ce

Branch: refs/heads/master
Commit: f38780ce2bedb6296cf8643be169c43f0b3a5d25
Parents: b41664d
Author: cazen.lee <ca...@samsung.com>
Authored: Sun May 8 14:48:56 2016 +0900
Committer: cazen.lee <ca...@samsung.com>
Committed: Sun May 8 14:48:56 2016 +0900

----------------------------------------------------------------------
 .../java/quarks/test/topology/PlumbingTest.java | 65 +++++++++++++++++++-
 1 file changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/f38780ce/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 92885de..8064263 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -601,4 +602,66 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
 //            actDuration < 0.5 * expMinSerialDuration);
 //    }
 
-}
+    @Test
+    public void testGate() throws Exception {
+        // Timing variances on shared machines can cause this test to fail
+        assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
+        Topology topology = newTopology("testGate");
+
+        TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+        Semaphore semaphore = new Semaphore(1);
+        raw = PlumbingStreams.gate(raw, semaphore);
+
+        ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
+        ArrayList<Integer> arrayResult = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            arrayResult.add(0);
+            arrayResult.add(1);
+        }
+
+        raw.sink(t -> {
+            // Expect 0 here: the gate's semaphore.acquire() has already taken the only permit
+            resultAvailablePermits.add(semaphore.availablePermits());
+            semaphore.release();
+            //Add 1 to list because semaphore.release() has executed
+            resultAvailablePermits.add(semaphore.availablePermits());
+        });
+
+        Condition<List<String>> contents = topology.getTester()
+            .streamContents(raw, "a", "b", "c", "d", "e");
+        complete(topology, contents);
+
+        assertTrue("valid:" + contents.getResult(), contents.valid());
+        assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
+    }
+
+    @Test
+    public void testGateWithLocking() throws Exception {
+        // Timing variances on shared machines can cause this test to fail
+        assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
+        Topology topology = newTopology("testGateWithLocking");
+
+        TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
+
+        Semaphore semaphore = new Semaphore(3);
+        raw = PlumbingStreams.gate(raw, semaphore);
+
+        ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
+        ArrayList<Integer> arrayResult = new ArrayList<>();
+        arrayResult.add(2);
+        arrayResult.add(1);
+        arrayResult.add(0);
+
+        raw.sink(t -> {
+            //Add number of availablePermits
+            resultAvailablePermits.add(semaphore.availablePermits());
+        });
+
+        Condition<List<String>> contents = topology.getTester().streamContents(raw, "a", "b", "c");
+        complete(topology, contents, 1000, TimeUnit.MILLISECONDS);
+
+        assertTrue("valid:" + contents.getResult(), contents.valid());
+        assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
+    }
+}
\ No newline at end of file


[4/4] incubator-quarks git commit: Merge pull request #109

Posted by dl...@apache.org.
Merge pull request #109

This closes #109


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/edd17a33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/edd17a33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/edd17a33

Branch: refs/heads/master
Commit: edd17a331f27d342b468e62cb96aae39da830c4c
Parents: 87cff0e 8389ad7
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 10 08:13:44 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 10 08:13:44 2016 -0400

----------------------------------------------------------------------
 .../topology/plumbing/PlumbingStreams.java      | 57 +++++++++++++++++-
 .../java/quarks/test/topology/PlumbingTest.java | 61 +++++++++++++++++++-
 2 files changed, 115 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/edd17a33/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/edd17a33/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------


[3/4] incubator-quarks git commit: remove condition check for timing variances failure

Posted by dl...@apache.org.
remove condition check for timing variances failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/8389ad73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/8389ad73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/8389ad73

Branch: refs/heads/master
Commit: 8389ad73cbea27670e4e9ade96a56ed24236decb
Parents: f38780c
Author: Cazen Lee <Ca...@korea.com>
Authored: Mon May 9 23:29:34 2016 +0900
Committer: Cazen Lee <Ca...@korea.com>
Committed: Mon May 9 23:29:34 2016 +0900

----------------------------------------------------------------------
 .../src/test/java/quarks/test/topology/PlumbingTest.java         | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/8389ad73/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 8064263..e12e3ed 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -604,8 +604,6 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
 
     @Test
     public void testGate() throws Exception {
-        // Timing variances on shared machines can cause this test to fail
-        assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
         Topology topology = newTopology("testGate");
 
         TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
@@ -638,8 +636,6 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
 
     @Test
     public void testGateWithLocking() throws Exception {
-        // Timing variances on shared machines can cause this test to fail
-        assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
         Topology topology = newTopology("testGateWithLocking");
 
         TStream<String> raw = topology.strings("a", "b", "c", "d", "e");