You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/03 17:27:55 UTC
[8/9] incubator-quarks git commit: change "combiner" from a stream
function to a tuple function
change "combiner" from a stream function to a tuple function
I think the stream function was an unnecessary complication/burden
on the caller.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/d4d1a1d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/d4d1a1d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/d4d1a1d0
Branch: refs/heads/master
Commit: d4d1a1d0f696c96e2c8d3303f2a46c70874edef8
Parents: 8fab576
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon May 2 08:53:47 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon May 2 08:53:47 2016 -0400
----------------------------------------------------------------------
.../java/quarks/topology/plumbing/PlumbingStreams.java | 12 ++++++------
.../test/java/quarks/test/topology/PlumbingTest.java | 3 +--
2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/d4d1a1d0/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index f799dbe..c816eee 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -256,7 +256,7 @@ public class PlumbingStreams {
* List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
* for (Function<T,U> mapper : mappers)
* pipelines.add(s -> s.map(mapper));
- * concurrent(stream, pipelines, s -> s.map(combiner));
+ * concurrent(stream, pipelines, combiner);
* }</pre>
* </P>
*
@@ -286,7 +286,7 @@ public class PlumbingStreams {
pipelines.add(s -> s.map(mapper));
}
- return concurrent(stream, pipelines, s -> s.map(combiner));
+ return concurrent(stream, pipelines, combiner);
}
/**
@@ -349,15 +349,15 @@ public class PlumbingStreams {
* For each input tuple, a pipeline MUST create exactly one output tuple.
* Tuple flow into the pipelines will cease if that requirement
* is not met.
- * @param combiner a function that creates a result stream from a stream
- * whose tuples are the list of each pipeline's result.
+ * @param combiner function to create a result tuple from the list of
+ * results from {@code pipelines}.
* The input tuple list's order is 1:1 with the {@code pipelines} list.
* I.e., list entry [0] is the result from pipelines[0],
* list entry [1] is the result from pipelines[1], etc.
* @return result stream
* @see #barrier(List, int) barrier
*/
- public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
+ public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
Objects.requireNonNull(stream, "stream");
Objects.requireNonNull(pipelines, "pipelines");
Objects.requireNonNull(combiner, "combiner");
@@ -381,7 +381,7 @@ public class PlumbingStreams {
TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
// Add the combiner
- return combiner.apply(barrier);
+ return barrier.map(combiner);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/d4d1a1d0/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 677467a..856f7ea 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -383,7 +383,7 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
- Function<List<JsonObject>,Integer> tupleCombiner = list -> {
+ Function<List<JsonObject>,Integer> combiner = list -> {
int sum = 0;
int cnt = 0;
System.out.println("combiner: "+list);
@@ -393,7 +393,6 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
}
return sum;
};
- Function<TStream<List<JsonObject>>,TStream<Integer>> combiner = stream -> stream.map(tupleCombiner);
TStream<Integer> values = top.of(1, 2, 3);
Integer[] resultTuples = new Integer[]{