You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/03 17:27:52 UTC

[5/9] incubator-quarks git commit: tidy up

tidy up

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/cde26788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/cde26788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/cde26788

Branch: refs/heads/master
Commit: cde26788a61a707bc9eea845cf909d75c5c9905f
Parents: 734f146
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Apr 29 15:47:35 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 15:47:35 2016 -0400

----------------------------------------------------------------------
 .../topology/plumbing/PlumbingStreams.java      | 72 ++++----------------
 .../java/quarks/test/topology/PlumbingTest.java | 30 ++++----
 2 files changed, 29 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/cde26788/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index 554fd39..ac75103 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -25,12 +25,10 @@ import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import quarks.function.Function;
-import quarks.function.UnaryOperator;
 import quarks.oplet.plumbing.Barrier;
 import quarks.oplet.plumbing.Isolate;
 import quarks.oplet.plumbing.PressureReliever;
@@ -445,32 +443,18 @@ public class PlumbingStreams {
       Objects.requireNonNull(pipelines, "pipelines");
       Objects.requireNonNull(combiner, "combiner");
       
-      // INITIAL IMPL TO GET STARTED - validate interface and test
-      // explore an impl with no new oplets
-      //
-      // A few impl options exist.  Some with feedback loop to control stepping,
-      // some without.  Feedback is OK for single JVM case, less so for
-      // multi-JVM/distributed case.
-      // 
-      // Some impls with special oplets some that avoid them.
-      //
       // Summary of what's below:
-      // feedback loop and no new oplets:
-      //                      |-> isolate -> p1 -> map.toPair |
-      // stream -> map.gate =>|-> isolate -> p2 -> map.toPair |-> union -> map.Collector -> combiner 
-      //                      |-> isolate -> p3 -> map.toPair |
-      //                                      . . .
-      //
-      
-      // Add a gate.  This keeps all pipelines working lock-step.
-      // It also provides the guarantee needed by gatedBarrier below.
-      Semaphore gateSemaphore = new Semaphore(1);
-      stream = gate(stream, gateSemaphore).tag("concurrent.gate");
+      //           |-> isolate(1) -> p1 -> |
+      // stream -> |-> isolate(1) -> p2 -> |-> barrier(10) -> combiner 
+      //           |-> isolate(1) -> p3 -> |
+      //                . . .
+
+      int barrierQueueCapacity = 10;  // let faster pipelines get somewhat ahead of slower ones.
       
-      // Add parallel fanout - with the gate the queue size really doesn't matter
-      List<TStream<T>> fanouts = parallelFanout(stream, pipelines.size(), 1);
-      for (int i = 0; i < fanouts.size(); i++) 
-        fanouts.get(i).tag("concurrent.isolated-ch"+i);
+      // Add concurrent (isolated) fanouts
+      List<TStream<T>> fanouts = new ArrayList<>(pipelines.size());
+      for (int i = 0; i < pipelines.size(); i++)
+        fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
       
       // Add pipelines
       List<TStream<U>> results = new ArrayList<>(pipelines.size());
@@ -481,43 +465,11 @@ public class PlumbingStreams {
       }
       
       // Add the barrier
-      // TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
-      TStream<List<U>> barrier = barrier(results).tag("concurrent.barrier");
-      
-      // barrier = barrier.peek(tuple -> System.out.println("concurrent.barrier List<U> "+tuple));
-      
-      // Add peek() to signal ok to begin next tuple
-      barrier = barrier.peek(tuple -> { gateSemaphore.release(); }).tag("concurrent.gateRelease");
+      TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
       
-      // Add the combiner to the topology
+      // Add the combiner
       return combiner.apply(barrier);
     }
-    
-    private static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
-      return stream.map(tuple -> { 
-          try {
-            semaphore.acquire();
-            return tuple;
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("interrupted", e);
-          }});
-    }
-    
-    private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, int queueCapacity) {
-      // TODO want an ExecutorService to enable nThreads for mPipelines.
-      // i.e., not use "isolate", or need a way to create a collection of
-      // related "isolate" that use an executor
-      return parallelFanout(stream, numFanouts, inStream -> PlumbingStreams.isolate(inStream, queueCapacity));
-    }
-    
-    private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, UnaryOperator<TStream<T>> isolator) {
-      List<TStream<T>> fanouts = new ArrayList<>(numFanouts);
-      for (int i = 0; i < numFanouts; i++) {
-        fanouts.add(isolator.apply(stream));
-      }
-      return fanouts;
-    }
 
     /**
      * A tuple synchronization barrier.

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/cde26788/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index f147685..677467a 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -317,13 +316,18 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
     public void testConcurrentMap() throws Exception {
         Topology top = newTopology("testConcurrentMap");
         
-        Function<Integer,JsonObject> a1 = fakeAnalytic(0, 100, TimeUnit.MILLISECONDS);
-        Function<Integer,JsonObject> a2 = fakeAnalytic(1, 100, TimeUnit.MILLISECONDS);
-        Function<Integer,JsonObject> a3 = fakeAnalytic(2, 100, TimeUnit.MILLISECONDS);
-        Function<Integer,JsonObject> a4 = fakeAnalytic(3, 100, TimeUnit.MILLISECONDS);
-        Function<Integer,JsonObject> a5 = fakeAnalytic(4, 100, TimeUnit.MILLISECONDS);
+        int ch = 0;
+        List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        // a couple of much faster analytics, in case something is amiss with the queues
+        mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
         
-        List<Function<Integer,JsonObject>> mappers = new ArrayList<>(Arrays.asList(a1, a2, a3, a4, a5));
         Function<List<JsonObject>,Integer> combiner = list -> {
             int sum = 0;
             int cnt = 0;
@@ -372,13 +376,13 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         Topology top = newTopology("testConcurrent");
         
         int ch = 0;
-        Function<TStream<Integer>,TStream<JsonObject>> p1 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
-        Function<TStream<Integer>,TStream<JsonObject>> p2 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
-        Function<TStream<Integer>,TStream<JsonObject>> p3 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
-        Function<TStream<Integer>,TStream<JsonObject>> p4 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
-        Function<TStream<Integer>,TStream<JsonObject>> p5 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+        List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
         
-        List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>(Arrays.asList(p1, p2, p3, p4, p5));
         Function<List<JsonObject>,Integer> tupleCombiner = list -> {
             int sum = 0;
             int cnt = 0;