You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/03 17:27:55 UTC

[8/9] incubator-quarks git commit: change "combiner" from a stream function to a tuple function

Change the "combiner" parameter of PlumbingStreams.concurrent() from a stream function to a tuple function

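I think the stream function (Function<TStream<List<U>>,TStream<R>>) was an
unnecessary complication/burden on the caller, who had to wrap a plain tuple
function as "s -> s.map(combiner)". concurrent() now takes the tuple function
(Function<List<U>,R>) directly and applies it to the barrier stream via map().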

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/d4d1a1d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/d4d1a1d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/d4d1a1d0

Branch: refs/heads/master
Commit: d4d1a1d0f696c96e2c8d3303f2a46c70874edef8
Parents: 8fab576
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon May 2 08:53:47 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon May 2 08:53:47 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/topology/plumbing/PlumbingStreams.java  | 12 ++++++------
 .../test/java/quarks/test/topology/PlumbingTest.java    |  3 +--
 2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/d4d1a1d0/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index f799dbe..c816eee 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -256,7 +256,7 @@ public class PlumbingStreams {
      * List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
      * for (Function<T,U> mapper : mappers)
      *   pipelines.add(s -> s.map(mapper));
-     * concurrent(stream, pipelines, s -> s.map(combiner));
+     * concurrent(stream, pipelines, combiner);
      * }</pre>
      * </P>
      * 
@@ -286,7 +286,7 @@ public class PlumbingStreams {
         pipelines.add(s -> s.map(mapper));
       }
       
-      return concurrent(stream, pipelines, s -> s.map(combiner));
+      return concurrent(stream, pipelines, combiner);
     }
 
     /**
@@ -349,15 +349,15 @@ public class PlumbingStreams {
      *                 For each input tuple, a pipeline MUST create exactly one output tuple.
      *                 Tuple flow into the pipelines will cease if that requirement
      *                 is not met.
-     * @param combiner a function that creates a result stream from a stream
-     *                 whose tuples are the list of each pipeline's result.
+     * @param combiner function to create a result tuple from the list of
+     *                 results from {@code pipelines}.
      *                 The input tuple list's order is 1:1 with the {@code pipelines} list.
      *                 I.e., list entry [0] is the result from pipelines[0],
      *                 list entry [1] is the result from pipelines[1], etc.
      * @return result stream
      * @see #barrier(List, int) barrier
      */
-    public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
+    public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
       Objects.requireNonNull(stream, "stream");
       Objects.requireNonNull(pipelines, "pipelines");
       Objects.requireNonNull(combiner, "combiner");
@@ -381,7 +381,7 @@ public class PlumbingStreams {
       TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
       
       // Add the combiner
-      return combiner.apply(barrier);
+      return barrier.map(combiner);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/d4d1a1d0/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 677467a..856f7ea 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -383,7 +383,7 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
         pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
         
-        Function<List<JsonObject>,Integer> tupleCombiner = list -> {
+        Function<List<JsonObject>,Integer> combiner = list -> {
             int sum = 0;
             int cnt = 0;
             System.out.println("combiner: "+list);
@@ -393,7 +393,6 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
             }
             return sum;
         };
-        Function<TStream<List<JsonObject>>,TStream<Integer>> combiner = stream -> stream.map(tupleCombiner);
         
         TStream<Integer> values = top.of(1, 2, 3);
         Integer[] resultTuples = new Integer[]{