You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/03 17:27:48 UTC

[1/9] incubator-quarks git commit: Initial (exploratory) impl and tests

Repository: incubator-quarks
Updated Branches:
  refs/heads/master c0e59a14e -> 6f26b5b4e


Initial (exploratory) impl and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/ee28532a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/ee28532a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/ee28532a

Branch: refs/heads/master
Commit: ee28532a50779536c114d1b315c23ab80baba6a7
Parents: a929f0d
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Thu Apr 28 16:47:30 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 12:48:10 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/oplet/plumbing/Isolate.java     |  19 +-
 .../topology/plumbing/PlumbingStreams.java      | 262 ++++++++++++++++++-
 .../java/quarks/test/topology/PlumbingTest.java | 133 ++++++++++
 3 files changed, 411 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ee28532a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java b/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
index 37d7466..b62f7b4 100644
--- a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
@@ -37,7 +37,24 @@ public class Isolate<T> extends Pipe<T,T> implements Runnable {
     private static final long serialVersionUID = 1L;
     
     private Thread thread;
-    private LinkedBlockingQueue<T> tuples = new LinkedBlockingQueue<>();
+    private final LinkedBlockingQueue<T> tuples;
+    
+    /**
+     * Create a new Isolate oplet.
+     * <BR>
+     * Same as Isolate(Integer.MAX_VALUE).
+     */
+    public Isolate() {
+      this(Integer.MAX_VALUE);
+    }
+    
+    /**
+     * Create a new Isolate oplet.
+     * @param queueCapacity {@link #accept()} blocks when this capacity is reached
+     */
+    public Isolate(int queueCapacity) {
+      tuples = new LinkedBlockingQueue<>(queueCapacity);
+    }
     
     @Override
     public void initialize(OpletContext<T, T> context) {

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ee28532a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index a18411c..d03c0d4 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -18,10 +18,21 @@ under the License.
 */
 package quarks.topology.plumbing;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import quarks.function.Function;
+import quarks.function.UnaryOperator;
 import quarks.oplet.plumbing.Isolate;
 import quarks.oplet.plumbing.PressureReliever;
 import quarks.oplet.plumbing.UnorderedIsolate;
@@ -207,6 +218,7 @@ public class PlumbingStreams {
      * 
      * @param <T> Tuple type.
      * @param <K> Key type.
+     * @see #isolate(TStream, int) isolate
      */
     public static <T,K> TStream<T> pressureReliever(TStream<T> stream, Function<T,K> keyFunction, int count) {
         return stream.pipe(new PressureReliever<>(count, keyFunction));
@@ -230,6 +242,27 @@ public class PlumbingStreams {
     }
     
     /**
+     * Isolate upstream processing from downstream processing.
+     * <P>
+     * If the processing against the returned stream cannot keep up
+     * with the arrival rate of tuples on {@code stream}, upstream
+     * processing will block until there is space in the queue between
+     * the streams.
+     * </P><P>
+     * Processing of tuples occurs in the order they were received.
+     * </P>
+     *  
+     * @param stream Stream to be isolated from downstream processing.
+     * @param queueCapacity size of the queue between {@code stream} and
+     *        the returned stream.
+     * @return Stream that is isolated from {@code stream}.
+     * @see #pressureReliever(TStream, Function, int) pressureReliever
+     */
+    public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
+      return stream.pipe(new Isolate<T>(queueCapacity));
+    }
+    
+    /**
      * Perform analytics concurrently.
      * <P>
      * Process input tuples one at a time, invoking the specified
@@ -286,7 +319,79 @@ public class PlumbingStreams {
      * @return result stream
      */
     public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
-      throw new IllegalStateException("NYI / TODO");
+      Objects.requireNonNull(stream, "stream");
+      Objects.requireNonNull(mappers, "mappers");
+      Objects.requireNonNull(combiner, "combiner");
+      
+      List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
+      for (Function<T,U> mapper : mappers) {
+        pipelines.add(s -> s.map(mapper));
+      }
+      
+      return concurrent(stream, pipelines, s -> s.map(combiner));
+    }
+    
+    // Q: is there any value to this implementation approach?  Or just dispose of it?
+    @SuppressWarnings("unused")
+    private static <T,U,R> TStream<R> concurrentMapSingleOp(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
+      Objects.requireNonNull(stream, "stream");
+      Objects.requireNonNull(mappers, "mappers");
+      Objects.requireNonNull(combiner, "combiner");
+      
+      // INITIAL IMPL TO GET STARTED - validate interface and test
+      // explore an impl with no new oplets
+      //
+      // This is the most lightweight impl possible wrt no intermediate streams
+      // i.e., all of the processing is handled within a single injected map()
+      // 
+      // TODO: want to create ExecutorService using provider's ThreadFactory service.
+      //       Can't get RuntimeServicesSupplier from a stream.
+      //
+      // Note, we impose this "non-null mapper result" requirement so as
+      // to enable alternative implementations that might be burdened if
+      // null results were allowed.
+      // The implementation below could easily handle null results w/o
+      // losing synchronization, with the combiner needing to deal with
+      // a null result in the list it's given.
+      
+      AtomicReference<ExecutorService> executorRef = new AtomicReference<>();
+      
+      return stream.map(tuple -> {
+        if (executorRef.get() == null) {
+          executorRef.compareAndSet(null, Executors.newFixedThreadPool(Math.min(mappers.size(), 20)));
+        }
+        ExecutorService executor = executorRef.get();
+        List<U> results = new ArrayList<>(Collections.nCopies(mappers.size(), null));
+        List<Future<?>> futures = new ArrayList<>(mappers.size());
+
+        // Submit a task for each mapper invocation
+        int ch = 0;
+        for (Function<T,U> mapper : mappers) {
+          final int resultIndx = ch++;
+          Future<?> future = executor.submit(() -> {
+            U result = mapper.apply(tuple);
+            if (result == null)
+              throw new IllegalStateException("mapper index "+resultIndx+" returned null");
+            results.set(resultIndx, result); 
+          });
+          futures.add(future);
+        }
+        // Await completion of all
+        for (Future<?> future : futures) {
+          try {
+            future.get();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("mapper interrupted", e);
+          } catch (Exception e) {
+            throw new RuntimeException("mapper threw", e);
+          }
+        }
+        // Run the combiner
+        R result = combiner.apply(results);
+        return result;
+      });
+      
     }
 
     /**
@@ -348,6 +453,159 @@ public class PlumbingStreams {
      * @return result stream
      */
     public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
-      throw new IllegalStateException("NYI / TODO");
+      Objects.requireNonNull(stream, "stream");
+      Objects.requireNonNull(pipelines, "pipelines");
+      Objects.requireNonNull(combiner, "combiner");
+      
+      // INITIAL IMPL TO GET STARTED - validate interface and test
+      // explore an impl with no new oplets
+      //
+      // A few impl options exist.  Some with feedback loop to control stepping,
+      // some without.  Feedback is OK for single JVM case, less so for
+      // multi-JVM/distributed case.
+      // 
+      // Some impls with special oplets some that avoid them.
+      //
+      // Summary of what's below:
+      // feedback loop and no new oplets:
+      //                      |-> isolate -> p1 -> map.toPair |
+      // stream -> map.gate =>|-> isolate -> p2 -> map.toPair |-> union -> map.Collector -> combiner 
+      //                      |-> isolate -> p3 -> map.toPair |
+      //                                      . . .
+      //
+      
+      // Add a gate.  This keeps all pipelines working lock-step.
+      // It also provides the guarantee needed by gatedBarrier below.
+      Semaphore gateSemaphore = new Semaphore(1);
+      stream = gate(stream, gateSemaphore).tag("concurrent.gate");
+      
+      // Add parallel fanout - with the gate the queue size really doesn't matter
+      List<TStream<T>> fanouts = parallelFanout(stream, pipelines.size(), 1);
+      for (int i = 0; i < fanouts.size(); i++) 
+        fanouts.get(i).tag("concurrent.isolated-ch"+i);
+      
+      // Add pipelines
+      List<TStream<U>> results = new ArrayList<>(pipelines.size());
+      int ch = 0;
+      for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
+        results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
+        ch++;
+      }
+      
+      // Add the barrier
+      TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
+      
+      // barrier = barrier.peek(tuple -> System.out.println("concurrent.barrier List<U> "+tuple));
+      
+      // Add peek() to signal ok to begin next tuple
+      barrier = barrier.peek(tuple -> { gateSemaphore.release(); }).tag("concurrent.gateRelease");
+      
+      // Add the combiner to the topology
+      return combiner.apply(barrier);
+    }
+    
+    private static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
+      return stream.map(tuple -> { 
+          try {
+            semaphore.acquire();
+            return tuple;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("interrupted", e);
+          }});
+    }
+    
+    private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, int queueCapacity) {
+      // TODO want an ExecutorService to enable nThreads for mPipelines.
+      // i.e., not use "isolate", or need a way to create a collection of
+      // related "isolate" that use an executor
+      return parallelFanout(stream, numFanouts, inStream -> PlumbingStreams.isolate(inStream, queueCapacity));
+    }
+    
+    private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, UnaryOperator<TStream<T>> isolator) {
+      List<TStream<T>> fanouts = new ArrayList<>(numFanouts);
+      for (int i = 0; i < numFanouts; i++) {
+        fanouts.add(isolator.apply(stream));
+      }
+      return fanouts;
+    }
+    
+    /**
+     * Add a barrier that collects corresponding tuples from each input stream.
+     * <P>
+     * "GatedBarrier" is a special implementation that only works because
+     * its caller guarantees that one tuple will be received from each stream
+     * before a second tuple is received from any of the streams.
+     * </P><P>
+     * The result tuple is a list of input tuples, one from each
+     * input stream, at the same index as its input stream.  i.e., result[0]
+     * is the tuple from streams[0], result[1] is the tuple from streams[1],
+     * and so on.
+     * </P><P>
+     * The operation waits indefinitely for a tuple on each input stream
+     * to be received.
+     * </P><P>
+     * TODO remove this when we have a barrier oplet.
+     * </P>  
+     * 
+     * @param <T> Tuple type
+     * @param streams streams to perform the barrier on
+     * @return stream whose tuples are each a list of tuples.
+     */
+    private static <T> TStream<List<T>> gatedBarrier(List<TStream<T>> streams) {
+      // TODO really want a multi-iport oplet for the fanin/barrier.
+      // Hack for now by using union but adding per-pipeline map() 
+      // that creates a Pair containing the inputPortId and result,
+      // so a following map can collect them into List<U> tuple.
+      //
+      // streams[0] -> map.toPair |
+      // streams[1] -> map.toPair |-> union -> map.Collector
+      // streams[2] -> map.toPair |
+      //  ...
+      
+      // Add the barrier per-pipeline "to-pair" map()
+      List<TStream<Pair<Integer,T>>> chPairStreams = new ArrayList<>(streams.size());
+      int ch = 0;
+      for (TStream<T> stream : streams) {
+        final int finalCh = ch;
+        chPairStreams.add(stream.map(u -> new Pair<Integer,T>(finalCh, u)).tag("barrier.toPair-ch"+finalCh));
+        ch++;
+      }
+      
+      // Add the barrier "fanin" union()
+      TStream<Pair<Integer,T>> union = chPairStreams.get(0).union(new HashSet<>(chPairStreams)).tag("barrier.union");
+      
+      // union = union.peek(pair -> System.out.println("concurrent.barrier.union pair<ch,U> "+pair));
+      
+      // Add the barrier collector map()
+      AtomicInteger barrierCnt = new AtomicInteger();
+      AtomicReference<List<T>> barrierChResults = new AtomicReference<>();
+      
+      TStream<List<T>> barrier = union.map(pair -> {
+          List<T> chResults = barrierChResults.get();
+          if (chResults == null) {
+            chResults = new ArrayList<>(Collections.nCopies(streams.size(), null));
+            barrierChResults.set(chResults);
+          }
+          if (chResults.get(pair.k) != null)
+              throw new IllegalStateException("caller violation: barrier port "+pair.k+" already has a tuple");
+          chResults.set(pair.k, pair.v);
+          
+          if (barrierCnt.incrementAndGet() < chResults.size())
+            return null;
+          
+          barrierCnt.set(0);
+          barrierChResults.set(null);
+          
+          return chResults;
+        });
+      
+      // keep the threads associated with the supply streams isolated
+      // from any downstream processing.  this is needed here because this impl
+      // doesn't have queuing/isolation on its feeding streams.
+      barrier = PlumbingStreams.isolate(barrier, 1);
+     
+      return barrier;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ee28532a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 86c10de..f147685 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -32,6 +34,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import com.google.gson.JsonObject;
+
+import quarks.function.Function;
 import quarks.function.Functions;
 import quarks.topology.TStream;
 import quarks.topology.Topology;
@@ -289,5 +294,133 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         complete(top, count);
         assertTrue(contents.getResult().toString(), contents.valid());
     }
+    
+    private Function<Integer,JsonObject> fakeAnalytic(int channel, long period, TimeUnit unit) {
+      return value -> { 
+        try {
+          Thread.sleep(unit.toMillis(period));
+          JsonObject jo = new JsonObject();
+          jo.addProperty("channel", channel);
+          jo.addProperty("result", value);
+          return jo;
+        } catch (InterruptedException e) {
+          throw new RuntimeException("channel="+channel+" interrupted", e);
+        }
+      };
+    }
+
+    private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
+      return stream -> stream.map(fakeAnalytic(channel, period, unit)).filter(t->true).tag("pipeline-ch"+channel);
+    }
+    
+    @Test
+    public void testConcurrentMap() throws Exception {
+        Topology top = newTopology("testConcurrentMap");
+        
+        Function<Integer,JsonObject> a1 = fakeAnalytic(0, 100, TimeUnit.MILLISECONDS);
+        Function<Integer,JsonObject> a2 = fakeAnalytic(1, 100, TimeUnit.MILLISECONDS);
+        Function<Integer,JsonObject> a3 = fakeAnalytic(2, 100, TimeUnit.MILLISECONDS);
+        Function<Integer,JsonObject> a4 = fakeAnalytic(3, 100, TimeUnit.MILLISECONDS);
+        Function<Integer,JsonObject> a5 = fakeAnalytic(4, 100, TimeUnit.MILLISECONDS);
+        
+        List<Function<Integer,JsonObject>> mappers = new ArrayList<>(Arrays.asList(a1, a2, a3, a4, a5));
+        Function<List<JsonObject>,Integer> combiner = list -> {
+            int sum = 0;
+            int cnt = 0;
+            System.out.println("combiner: "+list);
+            for(JsonObject jo : list) {
+              assertEquals(cnt++, jo.getAsJsonPrimitive("channel").getAsInt());
+              sum += jo.getAsJsonPrimitive("result").getAsInt();
+            }
+            return sum;
+        };
+
+        TStream<Integer> values = top.of(1, 2, 3);
+        Integer[] resultTuples = new Integer[]{
+            1*mappers.size(),
+            2*mappers.size(),
+            3*mappers.size(),
+        };
+        
+        TStream<Integer> result = PlumbingStreams.concurrentMap(values, mappers, combiner);
+        
+        Condition<Long> count = top.getTester().tupleCount(result, 3);
+        Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
+        assertTrue(contents.getResult().toString(), contents.valid());
+        
+        long actDuration = end - begin;
+        
+        long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
+        long expMaxDuration = resultTuples.length * (100 + 75/*slop and more tuple overhead*/);
+        
+        System.out.println("expMaxDuration="+expMaxDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check
+        assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+            actDuration < 0.5 * expMinSerialDuration);
+        
+        // a tighter performance check
+        assertTrue("expMaxDuration="+expMaxDuration+" actDuration="+actDuration, 
+            actDuration <= expMaxDuration);
+    }
+    
+    @Test
+    public void testConcurrent() throws Exception {
+        Topology top = newTopology("testConcurrent");
+        
+        int ch = 0;
+        Function<TStream<Integer>,TStream<JsonObject>> p1 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+        Function<TStream<Integer>,TStream<JsonObject>> p2 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+        Function<TStream<Integer>,TStream<JsonObject>> p3 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+        Function<TStream<Integer>,TStream<JsonObject>> p4 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+        Function<TStream<Integer>,TStream<JsonObject>> p5 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+        
+        List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>(Arrays.asList(p1, p2, p3, p4, p5));
+        Function<List<JsonObject>,Integer> tupleCombiner = list -> {
+            int sum = 0;
+            int cnt = 0;
+            System.out.println("combiner: "+list);
+            for(JsonObject jo : list) {
+              assertEquals(cnt++, jo.getAsJsonPrimitive("channel").getAsInt());
+              sum += jo.getAsJsonPrimitive("result").getAsInt();
+            }
+            return sum;
+        };
+        Function<TStream<List<JsonObject>>,TStream<Integer>> combiner = stream -> stream.map(tupleCombiner);
+        
+        TStream<Integer> values = top.of(1, 2, 3);
+        Integer[] resultTuples = new Integer[]{
+            1*pipelines.size(),
+            2*pipelines.size(),
+            3*pipelines.size(),
+        };
+        
+        TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");
+        
+        Condition<Long> count = top.getTester().tupleCount(result, 3);
+        Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
+        assertTrue(contents.getResult().toString(), contents.valid());
+        
+        long actDuration = end - begin;
+        
+        long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
+        long expMaxDuration = resultTuples.length * (100 + 75/*slop and more tuple overhead*/);
+        
+        System.out.println("expMaxDuration="+expMaxDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check
+        assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+            actDuration < 0.5 * expMinSerialDuration);
+        
+        // a tighter performance check
+        assertTrue("expMaxDuration="+expMaxDuration+" actDuration="+actDuration, 
+            actDuration <= expMaxDuration);
+    }
 
 }


[3/9] incubator-quarks git commit: proposed API for comment

Posted by dl...@apache.org.
proposed API for comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/95c72478
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/95c72478
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/95c72478

Branch: refs/heads/master
Commit: 95c724789e2a3b399dfce6a2c97b30cbee8a15fa
Parents: 410b68c
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue Apr 26 13:50:27 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 12:48:10 2016 -0400

----------------------------------------------------------------------
 .../topology/plumbing/PlumbingStreams.java      | 59 ++++++++++++++++++++
 1 file changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/95c72478/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index 02cba06..0e4e5e9 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -18,6 +18,7 @@ under the License.
 */
 package quarks.topology.plumbing;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import quarks.function.Function;
@@ -25,6 +26,7 @@ import quarks.oplet.plumbing.Isolate;
 import quarks.oplet.plumbing.PressureReliever;
 import quarks.oplet.plumbing.UnorderedIsolate;
 import quarks.topology.TStream;
+import quarks.topology.TopologyProvider;
 
 /**
  * Plumbing utilities for {@link TStream}.
@@ -215,5 +217,62 @@ public class PlumbingStreams {
         return stream.pipe(
                 ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
     }
+    
+    /**
+     * Perform analytics concurrently.
+     * <P>
+     * Process input tuples one at a time, invoking the specified
+     * analytics ({@code mappers}) concurrently, combine the results,
+     * and then process the next input tuple in the same manner.
+     * </P><P>
+     * Logically, instead of doing this:
+     * <pre>{@code
+     * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+     * }</pre>
+     * create a graph that's logically like this:
+     * <pre>{@code
+     * - 
+     *                       /->  A1  ->\
+     * sensorReadings<T> ->  |->  A2   ->| -> result<R>
+     *                       \->  A3  ->/
+     * }</pre>
+     * </P><P>
+     * The typical use case for this is when an application has a collection
+     * of independent analytics to perform on each tuple and the analytics
+     * are sufficiently long running such that performing them concurrently
+     * is desired.
+     * </P><P>
+     * Note, this is in contrast to "parallel" stream processing,
+     * which in Java8 Streams and other contexts means processing multiple
+     * tuples in parallel, each on a replicated processing pipeline.
+     * </P><P>
+     * Threadsafety - one of the following must be true:
+     * <ul>
+     * <li>the tuples from {@code stream} are threadsafe</li>
+     * <li>the {@code mappers} do not modify the input tuples</li>
+     * <li>the {@code mappers} provide their own synchronization controls
+     *     to protect concurrent modifications of the input tuples</li>
+     * </ul>
+     * </P><P>
+     * Logically, a thread is allocated for each of the {@code mappers}.
+     * The actual degree of concurrency may be {@link TopologyProvider} dependent.
+     * </P>
+     * 
+     * @param <T> Tuple type on input stream.
+     * @param <U> Tuple type generated by mappers.
+     * @param <R> Tuple type of the result.
+     * 
+     * @param stream input stream
+     * @param mappers functions to be run concurrently
+     * @param combiner function to create a result tuple from the list of
+     *                 results from {@code mappers}.  
+     *                 The input list order is 1:1 with the {@code mappers} list.
+     *                 I.e., list entry [0] is the result from mappers[0],
+     *                 list entry [1] is the result from mappers[1], etc.
+     * @return result stream
+     */
+    public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
+      throw new IllegalStateException("NYI / TODO");
+    }
 
 }


[8/9] incubator-quarks git commit: change "combiner" from a stream function to a tuple function

Posted by dl...@apache.org.
change "combiner" from a stream function to a tuple function

I think the stream function was an unnecessary complication/burden
on the caller.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/d4d1a1d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/d4d1a1d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/d4d1a1d0

Branch: refs/heads/master
Commit: d4d1a1d0f696c96e2c8d3303f2a46c70874edef8
Parents: 8fab576
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon May 2 08:53:47 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon May 2 08:53:47 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/topology/plumbing/PlumbingStreams.java  | 12 ++++++------
 .../test/java/quarks/test/topology/PlumbingTest.java    |  3 +--
 2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/d4d1a1d0/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index f799dbe..c816eee 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -256,7 +256,7 @@ public class PlumbingStreams {
      * List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
      * for (Function<T,U> mapper : mappers)
      *   pipelines.add(s -> s.map(mapper));
-     * concurrent(stream, pipelines, s -> s.map(combiner));
+     * concurrent(stream, pipelines, combiner);
      * }</pre>
      * </P>
      * 
@@ -286,7 +286,7 @@ public class PlumbingStreams {
         pipelines.add(s -> s.map(mapper));
       }
       
-      return concurrent(stream, pipelines, s -> s.map(combiner));
+      return concurrent(stream, pipelines, combiner);
     }
 
     /**
@@ -349,15 +349,15 @@ public class PlumbingStreams {
      *                 For each input tuple, a pipeline MUST create exactly one output tuple.
      *                 Tuple flow into the pipelines will cease if that requirement
      *                 is not met.
-     * @param combiner a function that creates a result stream from a stream
-     *                 whose tuples are the list of each pipeline's result.
+     * @param combiner function to create a result tuple from the list of
+     *                 results from {@code pipelines}.
      *                 The input tuple list's order is 1:1 with the {@code pipelines} list.
      *                 I.e., list entry [0] is the result from pipelines[0],
      *                 list entry [1] is the result from pipelines[1], etc.
      * @return result stream
      * @see #barrier(List, int) barrier
      */
-    public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
+    public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
       Objects.requireNonNull(stream, "stream");
       Objects.requireNonNull(pipelines, "pipelines");
       Objects.requireNonNull(combiner, "combiner");
@@ -381,7 +381,7 @@ public class PlumbingStreams {
       TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
       
       // Add the combiner
-      return combiner.apply(barrier);
+      return barrier.map(combiner);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/d4d1a1d0/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 677467a..856f7ea 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -383,7 +383,7 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
         pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
         
-        Function<List<JsonObject>,Integer> tupleCombiner = list -> {
+        Function<List<JsonObject>,Integer> combiner = list -> {
             int sum = 0;
             int cnt = 0;
             System.out.println("combiner: "+list);
@@ -393,7 +393,6 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
             }
             return sum;
         };
-        Function<TStream<List<JsonObject>>,TStream<Integer>> combiner = stream -> stream.map(tupleCombiner);
         
         TStream<Integer> values = top.of(1, 2, 3);
         Integer[] resultTuples = new Integer[]{


[7/9] incubator-quarks git commit: doc tweak

Posted by dl...@apache.org.
doc tweak

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/8fab5768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/8fab5768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/8fab5768

Branch: refs/heads/master
Commit: 8fab5768c01780599aad5208633de0e25893dbe4
Parents: 5a520e4
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Sun May 1 11:21:04 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Sun May 1 11:21:04 2016 -0400

----------------------------------------------------------------------
 .../quarks/topology/plumbing/PlumbingStreams.java    | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/8fab5768/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index a267d00..f799dbe 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -306,6 +306,15 @@ public class PlumbingStreams {
      *                      |-> A1pipeline ->|
      * sensorReadings<T> -> |-> A2pipeline ->| -> result<R>
      *                      |-> A3pipeline ->|
+     * 
+     * }</pre>
+     * more specifically a graph like this:
+     * <pre>{@code
+     * -
+     *           |-> isolate(1) -> pipeline1 -> |
+     * stream -> |-> isolate(1) -> pipeline2 -> |-> barrier(10) -> combiner 
+     *           |-> isolate(1) -> pipeline3 -> |
+     *                . . .
      * }</pre>
      * </P><P>
      * The typical use case for this is when an application has a collection
@@ -353,12 +362,6 @@ public class PlumbingStreams {
       Objects.requireNonNull(pipelines, "pipelines");
       Objects.requireNonNull(combiner, "combiner");
       
-      // Summary of what's below:
-      //           |-> isolate(1) -> p1 -> |
-      // stream -> |-> isolate(1) -> p2 -> |-> barrier(10) -> combiner 
-      //           |-> isolate(1) -> p3 -> |
-      //                . . .
-
       int barrierQueueCapacity = 10;  // don't preclude pipelines from getting ahead some.
       
       // Add concurrent (isolated) fanouts


[9/9] incubator-quarks git commit: Merge pull request #99

Posted by dl...@apache.org.
Merge pull request #99

This closes #99


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/6f26b5b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/6f26b5b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/6f26b5b4

Branch: refs/heads/master
Commit: 6f26b5b4e715d794dba13c65b876eed49f34cc64
Parents: c0e59a1 d4d1a1d
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 3 11:27:33 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 3 11:27:33 2016 -0400

----------------------------------------------------------------------
 .../src/main/java/quarks/oplet/core/FanIn.java  | 114 ++++++++++
 .../src/main/java/quarks/oplet/core/Union.java  |  19 +-
 .../java/quarks/oplet/plumbing/Barrier.java     | 113 ++++++++++
 .../java/quarks/oplet/plumbing/Isolate.java     |  28 ++-
 .../src/main/java/quarks/topology/TStream.java  |  17 ++
 .../topology/plumbing/PlumbingStreams.java      | 213 ++++++++++++++++++-
 .../java/quarks/test/topology/PlumbingTest.java | 136 ++++++++++++
 .../topology/spi/graph/ConnectorStream.java     |  30 ++-
 8 files changed, 651 insertions(+), 19 deletions(-)
----------------------------------------------------------------------



[2/9] incubator-quarks git commit: add stream-based concurrent(), clarify non-null tuple result requirement

Posted by dl...@apache.org.
add stream-based concurrent(), clarify non-null tuple result requirement


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/a929f0d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/a929f0d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/a929f0d6

Branch: refs/heads/master
Commit: a929f0d6861fc0f8336616ed8217a174dc23c2b4
Parents: 95c7247
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed Apr 27 15:07:46 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 12:48:10 2016 -0400

----------------------------------------------------------------------
 .../topology/plumbing/PlumbingStreams.java      | 87 ++++++++++++++++++--
 1 file changed, 81 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a929f0d6/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index 0e4e5e9..a18411c 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -34,7 +34,18 @@ import quarks.topology.TopologyProvider;
  * but are not part of the logic of the application.
  */
 public class PlumbingStreams {
-    
+  
+    // Use apache.math3.Pair ?
+    private static class Pair<K,V> { 
+      K k;
+      V v;
+      Pair(K k, V v) {
+        this.k = k;
+        this.v = v;
+      }
+      public String toString() { return "k="+k+" v="+v; };
+    };
+  
     /**
      * Insert a blocking delay between tuples.
      * Returned stream is the input stream delayed by {@code delay}.
@@ -232,9 +243,9 @@ public class PlumbingStreams {
      * create a graph that's logically like this:
      * <pre>{@code
      * - 
-     *                       /->  A1  ->\
-     * sensorReadings<T> ->  |->  A2   ->| -> result<R>
-     *                       \->  A3  ->/
+     *                      |->  A1  ->|
+     * sensorReadings<T> -> |->  A2  ->| -> result<R>
+     *                      |->  A3  ->|
      * }</pre>
      * </P><P>
      * The typical use case for this is when an application has a collection
@@ -263,9 +274,12 @@ public class PlumbingStreams {
      * @param <R> Tuple type of the result.
      * 
      * @param stream input stream
-     * @param mappers functions to be run concurrently
+     * @param mappers functions to be run concurrently.  Each mapper MUST
+     *                 return a non-null result.
+     *                 A runtime error will be generated if a null result
+     *                 is returned.
      * @param combiner function to create a result tuple from the list of
-     *                 results from {@code mappers}.  
+     *                 results from {@code mappers}.
      *                 The input list order is 1:1 with the {@code mappers} list.
      *                 I.e., list entry [0] is the result from mappers[0],
      *                 list entry [1] is the result from mappers[1], etc.
@@ -275,4 +289,65 @@ public class PlumbingStreams {
       throw new IllegalStateException("NYI / TODO");
     }
 
+    /**
+     * Perform analytics concurrently.
+     * <P>
+     * Process input tuples one at a time, invoking the specified
+     * analytics ({@code pipelines}) concurrently, combine the results,
+     * and then process the next input tuple in the same manner.
+     * </P><P>
+     * Logically, instead of doing this:
+     * <pre>{@code
+     * sensorReadings<T> -> A1pipeline -> A2pipeline -> A3pipeline -> results<R>
+     * }</pre>
+     * create a graph that's logically like this:
+     * <pre>{@code
+     * - 
+     *                      |->  A1pipeline  ->|
+     * sensorReadings<T> -> |->  A2pipeline  ->| -> result<R>
+     *                      |->  A3pipeline  ->|
+     * }</pre>
+     * </P><P>
+     * The typical use case for this is when an application has a collection
+     * of independent analytics to perform on each tuple and the analytics
+     * are sufficiently long running such that performing them concurrently
+     * is desired.
+     * </P><P>
+     * Note, this is in contrast to "parallel" stream processing,
+     * which in Java8 Streams and other contexts means processing multiple
+     * tuples in parallel, each on a replicated processing pipeline.
+     * </P><P>
+     * Threadsafety - one of the following must be true:
+     * <ul>
+     * <li>the tuples from {@code stream} are threadsafe</li>
+     * <li>the {@code pipelines} do not modify the input tuples</li>
+     * <li>the {@code pipelines} provide their own synchronization controls
+     *     to protect concurrent modifications of the input tuples</li>
+     * </ul>
+     * </P><P>
+     * Logically, a thread is allocated for each of the {@code pipelines}.
+     * The actual degree of concurrency may be {@link TopologyProvider} dependent.
+     * </P>
+     * 
+     * @param <T> Tuple type on input stream.
+     * @param <U> Tuple type generated by pipelines.
+     * @param <R> Tuple type of the result.
+     * 
+     * @param stream input stream
+     * @param pipelines a list of functions to add a pipeline to the topology.
+     *                 Each {@code pipeline.apply()} is called with {@code stream}
+     *                 as the input, yielding the pipeline's result stream.
+     *                 For each input tuple, a pipeline MUST create exactly one output tuple.
+     *                 Tuple flow into the pipelines will cease if that requirement
+     *                 is not met.
+     * @param combiner a function that creates a result stream from a stream
+     *                 whose tuples are the list of each pipeline's result.
+     *                 The input tuple list's order is 1:1 with the {@code pipelines} list.
+     *                 I.e., list entry [0] is the result from pipelines[0],
+     *                 list entry [1] is the result from pipelines[1], etc.
+     * @return result stream
+     */
+    public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
+      throw new IllegalStateException("NYI / TODO");
+    }
 }


[4/9] incubator-quarks git commit: Add TStream.fanin(), FanIn and Barrier oplets

Posted by dl...@apache.org.
Add TStream.fanin(), FanIn and Barrier oplets

Expose Barrier via PlumbingStreams.barrier().
Update concurrent() to use barrier.
Remove Runnable from Isolate's interface (it's an implementation detail)

FanIn behavior is driven by a specified tuple receiver BiFunction.
Barrier is a type of FanIn.
Union is a type of FanIn with a trivial tuple receiver function.

Chose to add the more constrained TStream.fanin(FanIn,others) instead of
a ~wide-open op(Oplet, others).  Those can be added later as needed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/734f1463
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/734f1463
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/734f1463

Branch: refs/heads/master
Commit: 734f146376120f0b4b30811ff66caf182c22493e
Parents: ee28532
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Apr 29 12:39:36 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 12:50:29 2016 -0400

----------------------------------------------------------------------
 .../src/main/java/quarks/oplet/core/FanIn.java  | 114 +++++++++++++++++
 .../src/main/java/quarks/oplet/core/Union.java  |  19 ++-
 .../java/quarks/oplet/plumbing/Barrier.java     | 113 +++++++++++++++++
 .../java/quarks/oplet/plumbing/Isolate.java     |  11 +-
 .../src/main/java/quarks/topology/TStream.java  |  17 +++
 .../topology/plumbing/PlumbingStreams.java      | 124 ++++++-------------
 .../topology/spi/graph/ConnectorStream.java     |  30 ++++-
 7 files changed, 325 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/oplet/src/main/java/quarks/oplet/core/FanIn.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/core/FanIn.java b/api/oplet/src/main/java/quarks/oplet/core/FanIn.java
new file mode 100644
index 0000000..b1c5f48
--- /dev/null
+++ b/api/oplet/src/main/java/quarks/oplet/core/FanIn.java
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.oplet.core;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import quarks.function.BiFunction;
+import quarks.function.Consumer;
+import quarks.oplet.OpletContext;
+
+/**
+ * FanIn oplet, merges multiple input ports into a single output port.
+ * <P>
+ * For each tuple received, {@code receiver.apply(T tuple, Integer index)}
+ * is called. {@code index} is the tuple's input stream's index, where
+ * {@code this} is index 0 followed by {@code others} in their order.
+ * {@code receiver} either returns a tuple to emit on the output
+ * stream or null.
+ * </P> 
+ */
+public class FanIn<T,U> extends AbstractOplet<T, U> {
+    private BiFunction<T, Integer, U> receiver;
+    private List<Consumer<T>> iportConsumers;
+    private Consumer<U> destination;
+    
+    public FanIn() {
+    }
+    
+    public FanIn(BiFunction<T, Integer, U> receiver) {
+      this.receiver = receiver;
+    }
+
+    @Override
+    public void initialize(OpletContext<T, U> context) {
+        super.initialize(context);
+        destination = context.getOutputs().get(0);
+       
+        // Create a consumer for each input port.
+        int numIports = getOpletContext().getInputCount();
+        if (iportConsumers == null) {
+          // each iport invokes the receiver
+          iportConsumers = new ArrayList<>(numIports);
+          for (int i = 0; i < numIports; i++)
+            iportConsumers.add(consumer(i));
+          iportConsumers = Collections.unmodifiableList(iportConsumers);
+        }
+    }
+    
+    /**
+     * Set the receiver function.  Must be called no later than as part
+     * of {@link #initialize(OpletContext)}.
+     * @param receiver
+     */
+    protected void setReceiver(BiFunction<T, Integer, U> receiver) {
+      this.receiver = receiver;
+    }
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public List<? extends Consumer<T>> getInputs() {
+      return iportConsumers;
+    }
+
+    /**
+     * Create a Consumer for the input port that invokes the
+     * receiver and submits a generated tuple, if any, to the output.
+     * @param iportIndex
+     * @return the Consumer
+     */
+    protected Consumer<T> consumer(int iportIndex) {
+      return tuple -> { 
+        U result = receiver.apply(tuple, iportIndex);
+        if (result != null)
+          submit(result);
+      };
+    }
+
+    protected Consumer<U> getDestination() {
+        return destination;
+    }
+    
+    /**
+     * Submit a tuple to single output.
+     * @param tuple Tuple to be submitted.
+     */
+    protected void submit(U tuple) {
+        getDestination().accept(tuple);
+    }
+
+    @Override
+    public void close() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/oplet/src/main/java/quarks/oplet/core/Union.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/core/Union.java b/api/oplet/src/main/java/quarks/oplet/core/Union.java
index c4ba423..e33257a 100644
--- a/api/oplet/src/main/java/quarks/oplet/core/Union.java
+++ b/api/oplet/src/main/java/quarks/oplet/core/Union.java
@@ -18,10 +18,7 @@ under the License.
 */
 package quarks.oplet.core;
 
-import java.util.Collections;
-import java.util.List;
-
-import quarks.function.Consumer;
+import quarks.oplet.OpletContext;
 
 /**
  * Union oplet, merges multiple input ports
@@ -30,19 +27,19 @@ import quarks.function.Consumer;
  * Processing for each input is identical
  * and just submits the tuple to the single output.
  */
-public final class Union<T> extends AbstractOplet<T, T> {
+public final class Union<T> extends FanIn<T, T> {
 
     @Override
     public void start() {
     }
 
-    /**
-     * For each input set the output directly to the only output.
-     */
+    
     @Override
-    public List<? extends Consumer<T>> getInputs() {
-        Consumer<T> output = getOpletContext().getOutputs().get(0);
-        return Collections.nCopies(getOpletContext().getInputCount(), output);
+    public void initialize(OpletContext<T, T> context) {
+      super.initialize(context);
+      
+      // forward every tuple to the output port
+      setReceiver((tuple, iportIndex) -> tuple);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/oplet/src/main/java/quarks/oplet/plumbing/Barrier.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/Barrier.java b/api/oplet/src/main/java/quarks/oplet/plumbing/Barrier.java
new file mode 100644
index 0000000..d6f67b9
--- /dev/null
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/Barrier.java
@@ -0,0 +1,113 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.oplet.plumbing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+import quarks.function.BiFunction;
+import quarks.oplet.OpletContext;
+import quarks.oplet.core.FanIn;
+
+/**
+ * A tuple synchronization barrier.
+ * <P>
+ * {@code Barrier} has n input ports with tuple type {@code T}
+ * and one output port with tuple type {@code List<T>}.
+ * Once the oplet receives one tuple on each of its input ports,
+ * it generates an output tuple containing one tuple from each input port.
+ * It then awaits receiving the next collection of tuples.
+ * Input port 0's tuple is in list[0], port 1's tuple in list[1], and so on.
+ * </P><P>
+ * Each input port has an associated queue of size {@code queueCapacity}.
+ * The input port's {@code Consumer<T>.accept()} will block if its queue is full.
+ * </P>
+ *
+ * @param <T> Type of the tuple.
+ */
+public class Barrier<T> extends FanIn<T, List<T>> {
+    
+    private final int queueCapacity;
+    private Thread thread;
+    private List<LinkedBlockingQueue<T>> iportQueues;
+    
+    /**
+     * Create a new instance.
+     * @param queueCapacity size of each input port's blocking queue
+     */
+    public Barrier(int queueCapacity) {
+      this.queueCapacity = queueCapacity;
+    }
+    
+    @Override
+    public void initialize(OpletContext<T, List<T>> context) {
+        super.initialize(context);
+        
+        thread = context.getService(ThreadFactory.class).newThread(() -> run());
+        
+        int numIports = getOpletContext().getInputCount();
+        iportQueues = new ArrayList<>(numIports);
+        for (int i = 0; i < numIports; i++)
+          iportQueues.add(new LinkedBlockingQueue<>(queueCapacity));
+        
+        setReceiver(receiver());
+    }
+   
+    @Override
+    public void start() {
+        thread.start();
+    }
+    
+    protected BiFunction<T,Integer,List<T>> receiver() {
+      return (tuple, iportIndex) -> {
+        accept(tuple, iportIndex);
+        return null;
+      };
+    }
+    
+    protected void accept(T tuple, int iportIndex) {
+      try {
+        iportQueues.get(iportIndex).put(tuple);
+      } catch(InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    private void run() {
+        while (!Thread.interrupted()) {
+            try {
+              List<T> list = new ArrayList<>(iportQueues.size());
+              for (LinkedBlockingQueue<T> iport : iportQueues) {
+                list.add(iport.take());
+              }
+              submit(list);
+            } catch (InterruptedException e) {
+                break;
+            }
+        }
+    }
+    
+    @Override
+    public void close() {
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java b/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
index b62f7b4..b32ab61 100644
--- a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
@@ -33,7 +33,7 @@ import quarks.oplet.core.Pipe;
  *
  * @param <T> Type of the tuple.
  */
-public class Isolate<T> extends Pipe<T,T> implements Runnable {
+public class Isolate<T> extends Pipe<T,T> {
     private static final long serialVersionUID = 1L;
     
     private Thread thread;
@@ -50,7 +50,9 @@ public class Isolate<T> extends Pipe<T,T> implements Runnable {
     
     /**
      * Create a new Isolate oplet.
-     * @param queueCapacity {@link #accept()} blocks when this capacity is reached
+     * @param queueCapacity size of the queue between the input stream
+     *          and the output stream.
+     *          {@link #accept(Object) accept} blocks when the queue is full.
      */
     public Isolate(int queueCapacity) {
       tuples = new LinkedBlockingQueue<>(queueCapacity);
@@ -59,7 +61,7 @@ public class Isolate<T> extends Pipe<T,T> implements Runnable {
     @Override
     public void initialize(OpletContext<T, T> context) {
         super.initialize(context);
-        thread = context.getService(ThreadFactory.class).newThread(this);
+        thread = context.getService(ThreadFactory.class).newThread(() -> run());
     }
    
     @Override
@@ -78,8 +80,7 @@ public class Isolate<T> extends Pipe<T,T> implements Runnable {
         }      
     }
 
-    @Override
-    public void run() {
+    private void run() {
         while (!Thread.interrupted()) {
             try {
                 submit(tuples.take());

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/topology/src/main/java/quarks/topology/TStream.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/TStream.java b/api/topology/src/main/java/quarks/topology/TStream.java
index b230475..f4d1a44 100644
--- a/api/topology/src/main/java/quarks/topology/TStream.java
+++ b/api/topology/src/main/java/quarks/topology/TStream.java
@@ -30,6 +30,7 @@ import quarks.function.Function;
 import quarks.function.Predicate;
 import quarks.function.ToIntFunction;
 import quarks.function.UnaryOperator;
+import quarks.oplet.core.FanIn;
 import quarks.oplet.core.Pipe;
 import quarks.oplet.core.Sink;
 
@@ -300,6 +301,22 @@ public interface TStream<T> extends TopologyElement {
     <U> TStream<U> pipe(Pipe<T, U> pipe);
 
     /**
+     * Declare a stream that contains the output of the specified 
+     * {@link FanIn} oplet applied to this stream and {@code others}.
+     * 
+     * @param <U> Tuple type of the returned stream.
+     * @param fanin The {@link FanIn} oplet.
+     * @param others The other input streams. 
+     *        Must not be empty or contain duplicates or {@code this}
+     * 
+     * @return a stream that contains the tuples emitted by the oplet.
+     * @see #union(Set)
+     * @see #pipe(Pipe)
+     * @see #sink(Sink)
+     */
+    <U> TStream<U> fanin(FanIn<T,U> fanin, List<TStream<T>> others);
+
+    /**
      * Declare a new stream that modifies each tuple from this stream into one
      * (or zero) tuple of the same type {@code T}. For each tuple {@code t}
      * on this stream, the returned stream will contain a tuple that is the

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index d03c0d4..554fd39 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -20,7 +20,6 @@ package quarks.topology.plumbing;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
@@ -28,11 +27,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import quarks.function.Function;
 import quarks.function.UnaryOperator;
+import quarks.oplet.plumbing.Barrier;
 import quarks.oplet.plumbing.Isolate;
 import quarks.oplet.plumbing.PressureReliever;
 import quarks.oplet.plumbing.UnorderedIsolate;
@@ -46,17 +45,6 @@ import quarks.topology.TopologyProvider;
  */
 public class PlumbingStreams {
   
-    // Use apache.math3.Pair ?
-    private static class Pair<K,V> { 
-      K k;
-      V v;
-      Pair(K k, V v) {
-        this.k = k;
-        this.v = v;
-      }
-      public String toString() { return "k="+k+" v="+v; };
-    };
-  
     /**
      * Insert a blocking delay between tuples.
      * Returned stream is the input stream delayed by {@code delay}.
@@ -493,7 +481,8 @@ public class PlumbingStreams {
       }
       
       // Add the barrier
-      TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
+      // TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
+      TStream<List<U>> barrier = barrier(results).tag("concurrent.barrier");
       
       // barrier = barrier.peek(tuple -> System.out.println("concurrent.barrier List<U> "+tuple));
       
@@ -529,83 +518,48 @@ public class PlumbingStreams {
       }
       return fanouts;
     }
-    
+
+    /**
+     * A tuple synchronization barrier.
+     * <P>
+     * Same as {@code barrier(streams, 1)}
+     * </P>
+     * @see #barrier(List, int)
+     */
+    public static <T> TStream<List<T>> barrier(List<TStream<T>> streams) {
+      return barrier(streams, 1);
+    }
+
     /**
-     * Add a barrier that collects corresponding tuples from each input stream.
+     * A tuple synchronization barrier.
      * <P>
-     * "GatedBarrier" is a special implementation that only works because
-     * its caller guarantees that one tuple will be received from each stream
-     * before a second tuple is received from any of the streams.
+     * A barrier has n input streams with tuple type {@code T}
+     * and one output stream with tuple type {@code List<T>}.
+     * Once the barrier receives one tuple on each of its input streams,
+     * it generates an output tuple containing one tuple from each input stream.
+     * It then waits until it has received another tuple from each input stream.
      * </P><P>
-     * The result tuple is a list of input tuples, one from each
-     * input stream, at the same index as it's input stream.  i.e., result[0]
-     * is the tuple from streams[0], result[1] is the tuple from streams[1],
-     * and so on.
+     * Input stream 0's tuple is in the output tuple's list[0],
+     * stream 1's tuple in list[1], and so on.
      * </P><P>
-     * The operation waits indefinitely for a tuple on each input stream
-     * to be received.
+     * The barrier's output stream is isolated from the input streams.
      * </P><P>
-     * TODO remove this when we have a barrier oplet.
-     * </P>  
+     * The barrier has a queue of size {@code queueCapacity} for each
+     * input stream.  When a tuple for an input stream is received it is
+     * added to its queue.  The stream will block if the queue is full.
+     * </P>
+     *
+     * @param <T> Type of the tuple.
      * 
-     * @param <T> Tuple type
-     * @param streams streams to perform the barrier on
-     * @return stream whose tuples are each a list of tuples.
+     * @param streams the list of input streams
+     * @param queueCapacity the size of each input stream's queue
+     * @return the output stream
+     * @see Barrier
      */
-    private static <T> TStream<List<T>> gatedBarrier(List<TStream<T>> streams) {
-      // TODO really want a multi-iport oplet for the fanin/barrier.
-      // Hack for now by using union but adding per-pipeline map() 
-      // that creates a Pair containing the inputPortId and result,
-      // so a following map can collect them into List<U> tuple.
-      //
-      // streams[0] -> map.toPair |
-      // streams[1] -> map.toPair |-> union -> map.Collector
-      // streams[2] -> map.toPair |
-      //  ...
-      
-      // Add the barrier per-pipeline "to-pair" map()
-      List<TStream<Pair<Integer,T>>> chPairStreams = new ArrayList<>(streams.size());
-      int ch = 0;
-      for (TStream<T> stream : streams) {
-        final int finalCh = ch;
-        chPairStreams.add(stream.map(u -> new Pair<Integer,T>(finalCh, u)).tag("barrier.toPair-ch"+finalCh));
-        ch++;
-      }
-      
-      // Add the barrier "fanin" union()
-      TStream<Pair<Integer,T>> union = chPairStreams.get(0).union(new HashSet<>(chPairStreams)).tag("barrier.union");
-      
-      // union = union.peek(pair -> System.out.println("concurrent.barrier.union pair<ch,U> "+pair));
-      
-      // Add the barrier collector map()
-      AtomicInteger barrierCnt = new AtomicInteger();
-      AtomicReference<List<T>> barrierChResults = new AtomicReference<>();
-      
-      TStream<List<T>> barrier = union.map(pair -> {
-          List<T> chResults = barrierChResults.get();
-          if (chResults == null) {
-            chResults = new ArrayList<>(Collections.nCopies(streams.size(), null));
-            barrierChResults.set(chResults);
-          }
-          if (chResults.get(pair.k) != null)
-              throw new IllegalStateException("caller violation: barrier port "+pair.k+" already has a tuple");
-          chResults.set(pair.k, pair.v);
-          
-          if (barrierCnt.incrementAndGet() < chResults.size())
-            return null;
-          
-          barrierCnt.set(0);
-          barrierChResults.set(null);
-          
-          return chResults;
-        });
-      
-      // keep the threads associated with the supply streams isolated
-      // from any downstream processing.  this is needed here because this impl
-      // doesn't have queuing/isolation on its feeding streams.
-      barrier = PlumbingStreams.isolate(barrier, 1);
-     
-      return barrier;
+    public static <T> TStream<List<T>> barrier(List<TStream<T>> streams, int queueCapacity) {
+      List<TStream<T>> others = new ArrayList<>(streams);
+      TStream<T> s1 = others.remove(0);
+      return s1.fanin(new Barrier<T>(queueCapacity), others);
     }
-
+ 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
----------------------------------------------------------------------
diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
index 7a71004..51068ff 100644
--- a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
+++ b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
@@ -41,6 +41,8 @@ import quarks.function.ToIntFunction;
 import quarks.graph.Connector;
 import quarks.graph.Graph;
 import quarks.graph.Vertex;
+import quarks.oplet.Oplet;
+import quarks.oplet.core.FanIn;
 import quarks.oplet.core.Pipe;
 import quarks.oplet.core.Sink;
 import quarks.oplet.core.Split;
@@ -63,8 +65,8 @@ import quarks.window.Windows;
 /**
  * A stream that directly adds oplets to the graph.
  *
- * @param <G>
- * @param <T>
+ * @param <G> topology type
+ * @param <T> tuple type
  */
 public class ConnectorStream<G extends Topology, T> extends AbstractTStream<G, T> {
 
@@ -160,6 +162,30 @@ public class ConnectorStream<G extends Topology, T> extends AbstractTStream<G, T
     }
 
     @Override
+    public <U> TStream<U> fanin(FanIn<T,U> fanin, List<TStream<T>> others) {
+      if (others.isEmpty() || others.size() == 1 && others.contains(this)) 
+        throw new IllegalArgumentException("others");  // use pipe()
+      if (new HashSet<>(others).size() != others.size())
+        throw new IllegalArgumentException("others has dups");
+      
+      for (TStream<T> other : others)
+          verify(other);
+      
+      others = new ArrayList<>(others);
+      others.add(0, this);
+      
+      Vertex<Oplet<T,U>, T, U> fanInVertex = graph().insert(fanin, others.size(), 1);
+      int inputPort = 0;
+      for (TStream<T> other : others) {
+          @SuppressWarnings("unchecked")
+          ConnectorStream<G,T> cs = (ConnectorStream<G, T>) other;
+          cs.connector.connect(fanInVertex, inputPort++);
+      }
+          
+      return derived(fanInVertex.getConnectors().get(0));
+    }
+
+    @Override
     public <K> TWindow<T, K> last(int count, Function<T, K> keyFunction) {
         TWindowImpl<T, K> window = new TWindowImpl<T, K>(count, this, keyFunction);
         return window;


[6/9] incubator-quarks git commit: tidy up

Posted by dl...@apache.org.
tidy up

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/5a520e49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/5a520e49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/5a520e49

Branch: refs/heads/master
Commit: 5a520e49909f0371d53d2dec9ef3357a98b53911
Parents: cde2678
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Apr 29 17:06:30 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 17:06:30 2016 -0400

----------------------------------------------------------------------
 .../topology/plumbing/PlumbingStreams.java      | 116 +++----------------
 1 file changed, 13 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/5a520e49/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index ac75103..a267d00 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -19,14 +19,9 @@ under the License.
 package quarks.topology.plumbing;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import quarks.function.Function;
 import quarks.oplet.plumbing.Barrier;
@@ -251,41 +246,18 @@ public class PlumbingStreams {
     /**
      * Perform analytics concurrently.
      * <P>
-     * Process input tuples one at at time, invoking the specified
-     * analytics ({@code mappers}) concurrently, combine the results,
-     * and then process the next input tuple in the same manner.
+     * This is a convenience function that calls
+     * {@link #concurrent(TStream, List, Function)} after
+     * creating {@code pipeline} and {@code combiner} functions
+     * from the supplied {@code mappers} and {@code combiner} arguments.
      * </P><P>
-     * Logically, instead of doing this:
+     * That is, it is logically, if not exactly, the same as:
      * <pre>{@code
-     * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+     * List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
+     * for (Function<T,U> mapper : mappers)
+     *   pipelines.add(s -> s.map(mapper));
+     * concurrent(stream, pipelines, s -> s.map(combiner));
      * }</pre>
-     * create a graph that's logically like this:
-     * <pre>{@code
-     * - 
-     *                      |->  A1  ->|
-     * sensorReadings<T> -> |->  A2  ->| -> result<R>
-     *                      |->  A3  ->|
-     * }</pre>
-     * </P><P>
-     * The typical use case for this is when an application has a collection
-     * of independent analytics to perform on each tuple and the analytics
-     * are sufficiently long running such that performing them concurrently
-     * is desired.
-     * </P><P>
-     * Note, this is in contrast to "parallel" stream processing,
-     * which in Java8 Streams and other contexts means processing multiple
-     * tuples in parallel, each on a replicated processing pipeline.
-     * </P><P>
-     * Threadsafety - one of the following must be true:
-     * <ul>
-     * <li>the tuples from {@code stream} are threadsafe</li>
-     * <li>the {@code mappers} do not modify the input tuples</li>
-     * <li>the {@code mappers} provide their own synchronization controls
-     *     to protect concurrent modifications of the input tuples</li>
-     * </ul>
-     * </P><P>
-     * Logically, a thread is allocated for each of the {@code mappers}.
-     * The actual degree of concurrency may be {@link TopologyProvider} dependent.
      * </P>
      * 
      * @param <T> Tuple type on input stream.
@@ -316,69 +288,6 @@ public class PlumbingStreams {
       
       return concurrent(stream, pipelines, s -> s.map(combiner));
     }
-    
-    // Q: is there any value to this implementation approach?  Or just dispose of it?
-    @SuppressWarnings("unused")
-    private static <T,U,R> TStream<R> concurrentMapSingleOp(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
-      Objects.requireNonNull(stream, "stream");
-      Objects.requireNonNull(mappers, "mappers");
-      Objects.requireNonNull(combiner, "combiner");
-      
-      // INITIAL IMPL TO GET STARTED - validate interface and test
-      // explore an impl with no new oplets
-      //
-      // This is the most lightweight impl possible wrt no intermediate streams
-      // i.e., all of the processing is handled within a single injected map()
-      // 
-      // TODO: want to create ExecutorService using provider's ThreadFactory service.
-      //       Can't get RuntimeServicesSupplier from a stream.
-      //
-      // Note, we impose this "non-null mapper result" requirement so as
-      // to enable alternative implementations that might be burdened if
-      // null results were allowed.
-      // The implementation below could easily handle null results w/o
-      // losing synchronization, with the combiner needing to deal with
-      // a null result in the list it's given.
-      
-      AtomicReference<ExecutorService> executorRef = new AtomicReference<>();
-      
-      return stream.map(tuple -> {
-        if (executorRef.get() == null) {
-          executorRef.compareAndSet(null, Executors.newFixedThreadPool(Math.min(mappers.size(), 20)));
-        }
-        ExecutorService executor = executorRef.get();
-        List<U> results = new ArrayList<>(Collections.nCopies(mappers.size(), null));
-        List<Future<?>> futures = new ArrayList<>(mappers.size());
-
-        // Submit a task for each mapper invocation
-        int ch = 0;
-        for (Function<T,U> mapper : mappers) {
-          final int resultIndx = ch++;
-          Future<?> future = executor.submit(() -> {
-            U result = mapper.apply(tuple);
-            if (result == null)
-              throw new IllegalStateException("mapper index "+resultIndx+" returned null");
-            results.set(resultIndx, result); 
-          });
-          futures.add(future);
-        }
-        // Await completion of all
-        for (Future<?> future : futures) {
-          try {
-            future.get();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("mapper interrupted", e);
-          } catch (Exception e) {
-            throw new RuntimeException("mapper threw", e);
-          }
-        }
-        // Run the combiner
-        R result = combiner.apply(results);
-        return result;
-      });
-      
-    }
 
     /**
      * Perform analytics concurrently.
@@ -394,9 +303,9 @@ public class PlumbingStreams {
      * create a graph that's logically like this:
      * <pre>{@code
      * - 
-     *                      |->  A1pipeline  ->|
-     * sensorReadings<T> -> |->  A2pipeline  ->| -> result<R>
-     *                      |->  A3pipeline  ->|
+     *                      |-> A1pipeline ->|
+     * sensorReadings<T> -> |-> A2pipeline ->| -> result<R>
+     *                      |-> A3pipeline ->|
      * }</pre>
      * </P><P>
      * The typical use case for this is when an application has a collection
@@ -437,6 +346,7 @@ public class PlumbingStreams {
      *                 I.e., list entry [0] is the result from pipelines[0],
      *                 list entry [1] is the result from pipelines[1], etc.
      * @return result stream
+     * @see #barrier(List, int) barrier
      */
     public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
       Objects.requireNonNull(stream, "stream");


[5/9] incubator-quarks git commit: tidy up

Posted by dl...@apache.org.
tidy up

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/cde26788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/cde26788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/cde26788

Branch: refs/heads/master
Commit: cde26788a61a707bc9eea845cf909d75c5c9905f
Parents: 734f146
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Apr 29 15:47:35 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 15:47:35 2016 -0400

----------------------------------------------------------------------
 .../topology/plumbing/PlumbingStreams.java      | 72 ++++----------------
 .../java/quarks/test/topology/PlumbingTest.java | 30 ++++----
 2 files changed, 29 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/cde26788/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index 554fd39..ac75103 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -25,12 +25,10 @@ import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import quarks.function.Function;
-import quarks.function.UnaryOperator;
 import quarks.oplet.plumbing.Barrier;
 import quarks.oplet.plumbing.Isolate;
 import quarks.oplet.plumbing.PressureReliever;
@@ -445,32 +443,18 @@ public class PlumbingStreams {
       Objects.requireNonNull(pipelines, "pipelines");
       Objects.requireNonNull(combiner, "combiner");
       
-      // INITIAL IMPL TO GET STARTED - validate interface and test
-      // explore an impl with no new oplets
-      //
-      // A few impl options exist.  Some with feedback loop to control stepping,
-      // some without.  Feedback is OK for single JVM case, less so for
-      // multi-JVM/distributed case.
-      // 
-      // Some impls with special oplets some that avoid them.
-      //
       // Summary of what's below:
-      // feedback loop and no new oplets:
-      //                      |-> isolate -> p1 -> map.toPair |
-      // stream -> map.gate =>|-> isolate -> p2 -> map.toPair |-> union -> map.Collector -> combiner 
-      //                      |-> isolate -> p3 -> map.toPair |
-      //                                      . . .
-      //
-      
-      // Add a gate.  This keeps all pipelines working lock-step.
-      // It also provides the guarantee needed by gatedBarrier below.
-      Semaphore gateSemaphore = new Semaphore(1);
-      stream = gate(stream, gateSemaphore).tag("concurrent.gate");
+      //           |-> isolate(1) -> p1 -> |
+      // stream -> |-> isolate(1) -> p2 -> |-> barrier(10) -> combiner 
+      //           |-> isolate(1) -> p3 -> |
+      //                . . .
+
+      int barrierQueueCapacity = 10;  // don't preclude pipelines from getting ahead some.
       
-      // Add parallel fanout - with the gate the queue size really doesn't matter
-      List<TStream<T>> fanouts = parallelFanout(stream, pipelines.size(), 1);
-      for (int i = 0; i < fanouts.size(); i++) 
-        fanouts.get(i).tag("concurrent.isolated-ch"+i);
+      // Add concurrent (isolated) fanouts
+      List<TStream<T>> fanouts = new ArrayList<>(pipelines.size());
+      for (int i = 0; i < pipelines.size(); i++)
+        fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
       
       // Add pipelines
       List<TStream<U>> results = new ArrayList<>(pipelines.size());
@@ -481,43 +465,11 @@ public class PlumbingStreams {
       }
       
       // Add the barrier
-      // TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
-      TStream<List<U>> barrier = barrier(results).tag("concurrent.barrier");
-      
-      // barrier = barrier.peek(tuple -> System.out.println("concurrent.barrier List<U> "+tuple));
-      
-      // Add peek() to signal ok to begin next tuple
-      barrier = barrier.peek(tuple -> { gateSemaphore.release(); }).tag("concurrent.gateRelease");
+      TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
       
-      // Add the combiner to the topology
+      // Add the combiner
       return combiner.apply(barrier);
     }
-    
-    private static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
-      return stream.map(tuple -> { 
-          try {
-            semaphore.acquire();
-            return tuple;
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("interrupted", e);
-          }});
-    }
-    
-    private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, int queueCapacity) {
-      // TODO want an ExecutorService to enable nThreads for mPipelines.
-      // i.e., not use "isolate", or need a way to create a collection of
-      // related "isolate" that use an executor
-      return parallelFanout(stream, numFanouts, inStream -> PlumbingStreams.isolate(inStream, queueCapacity));
-    }
-    
-    private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, UnaryOperator<TStream<T>> isolator) {
-      List<TStream<T>> fanouts = new ArrayList<>(numFanouts);
-      for (int i = 0; i < numFanouts; i++) {
-        fanouts.add(isolator.apply(stream));
-      }
-      return fanouts;
-    }
 
     /**
      * A tuple synchronization barrier.

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/cde26788/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index f147685..677467a 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -317,13 +316,18 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
     public void testConcurrentMap() throws Exception {
         Topology top = newTopology("testConcurrentMap");
         
-        Function<Integer,JsonObject> a1 = fakeAnalytic(0, 100, TimeUnit.MILLISECONDS);
-        Function<Integer,JsonObject> a2 = fakeAnalytic(1, 100, TimeUnit.MILLISECONDS);
-        Function<Integer,JsonObject> a3 = fakeAnalytic(2, 100, TimeUnit.MILLISECONDS);
-        Function<Integer,JsonObject> a4 = fakeAnalytic(3, 100, TimeUnit.MILLISECONDS);
-        Function<Integer,JsonObject> a5 = fakeAnalytic(4, 100, TimeUnit.MILLISECONDS);
+        int ch = 0;
+        List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+        // a couple much faster just in case something's amiss with queues
+        mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
+        mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
         
-        List<Function<Integer,JsonObject>> mappers = new ArrayList<>(Arrays.asList(a1, a2, a3, a4, a5));
         Function<List<JsonObject>,Integer> combiner = list -> {
             int sum = 0;
             int cnt = 0;
@@ -372,13 +376,13 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         Topology top = newTopology("testConcurrent");
         
         int ch = 0;
-        Function<TStream<Integer>,TStream<JsonObject>> p1 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
-        Function<TStream<Integer>,TStream<JsonObject>> p2 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
-        Function<TStream<Integer>,TStream<JsonObject>> p3 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
-        Function<TStream<Integer>,TStream<JsonObject>> p4 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
-        Function<TStream<Integer>,TStream<JsonObject>> p5 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+        List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+        pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
         
-        List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>(Arrays.asList(p1, p2, p3, p4, p5));
         Function<List<JsonObject>,Integer> tupleCombiner = list -> {
             int sum = 0;
             int cnt = 0;