You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/03 17:27:48 UTC
[1/9] incubator-quarks git commit: Initial (exploratory) impl and tests
Repository: incubator-quarks
Updated Branches:
refs/heads/master c0e59a14e -> 6f26b5b4e
Initial (exploratory) impl and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/ee28532a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/ee28532a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/ee28532a
Branch: refs/heads/master
Commit: ee28532a50779536c114d1b315c23ab80baba6a7
Parents: a929f0d
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Thu Apr 28 16:47:30 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 12:48:10 2016 -0400
----------------------------------------------------------------------
.../java/quarks/oplet/plumbing/Isolate.java | 19 +-
.../topology/plumbing/PlumbingStreams.java | 262 ++++++++++++++++++-
.../java/quarks/test/topology/PlumbingTest.java | 133 ++++++++++
3 files changed, 411 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ee28532a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java b/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
index 37d7466..b62f7b4 100644
--- a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
@@ -37,7 +37,24 @@ public class Isolate<T> extends Pipe<T,T> implements Runnable {
private static final long serialVersionUID = 1L;
private Thread thread;
- private LinkedBlockingQueue<T> tuples = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue<T> tuples;
+
+ /**
+ * Create a new Isolate oplet.
+ * <BR>
+ * Same as Isolate(Integer.MAX_VALUE).
+ */
+ public Isolate() {
+ this(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Create a new Isolate oplet.
+ * @param queueCapacity {@link #accept()} blocks when this capacity is reached
+ */
+ public Isolate(int queueCapacity) {
+ tuples = new LinkedBlockingQueue<>(queueCapacity);
+ }
@Override
public void initialize(OpletContext<T, T> context) {
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ee28532a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index a18411c..d03c0d4 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -18,10 +18,21 @@ under the License.
*/
package quarks.topology.plumbing;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import quarks.function.Function;
+import quarks.function.UnaryOperator;
import quarks.oplet.plumbing.Isolate;
import quarks.oplet.plumbing.PressureReliever;
import quarks.oplet.plumbing.UnorderedIsolate;
@@ -207,6 +218,7 @@ public class PlumbingStreams {
*
* @param <T> Tuple type.
* @param <K> Key type.
+ * @see #isolate(TStream, int) isolate
*/
public static <T,K> TStream<T> pressureReliever(TStream<T> stream, Function<T,K> keyFunction, int count) {
return stream.pipe(new PressureReliever<>(count, keyFunction));
@@ -230,6 +242,27 @@ public class PlumbingStreams {
}
/**
+ * Isolate upstream processing from downstream processing.
+ * <P>
+ * If the processing against the returned stream cannot keep up
+ * with the arrival rate of tuples on {@code stream}, upstream
+ * processing will block until there is space in the queue between
+ * the streams.
+ * </P><P>
+ * Processing of tuples occurs in the order they were received.
+ * </P>
+ *
+ * @param stream Stream to be isolated from downstream processing.
+ * @param queueCapacity size of the queue between {@code stream} and
+ * the returned stream.
+ * @return Stream that is isolated from {@code stream}.
+ * @see #pressureReliever(TStream, Function, int) pressureReliever
+ */
+ public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
+ return stream.pipe(new Isolate<T>(queueCapacity));
+ }
+
+ /**
* Perform analytics concurrently.
* <P>
* Process input tuples one at at time, invoking the specified
@@ -286,7 +319,79 @@ public class PlumbingStreams {
* @return result stream
*/
public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
- throw new IllegalStateException("NYI / TODO");
+ Objects.requireNonNull(stream, "stream");
+ Objects.requireNonNull(mappers, "mappers");
+ Objects.requireNonNull(combiner, "combiner");
+
+ List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
+ for (Function<T,U> mapper : mappers) {
+ pipelines.add(s -> s.map(mapper));
+ }
+
+ return concurrent(stream, pipelines, s -> s.map(combiner));
+ }
+
+ // Q: is there any value to this implementation approach? Or just dispose of it?
+ @SuppressWarnings("unused")
+ private static <T,U,R> TStream<R> concurrentMapSingleOp(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
+ Objects.requireNonNull(stream, "stream");
+ Objects.requireNonNull(mappers, "mappers");
+ Objects.requireNonNull(combiner, "combiner");
+
+ // INITIAL IMPL TO GET STARTED - validate interface and test
+ // explore an impl with no new oplets
+ //
+ // This is the most lightweight impl possible wrt no intermediate streams
+ // i.e., all of the processing is handled within a single injected map()
+ //
+ // TODO: want to create ExecutorService using provider's ThreadFactory service.
+ // Can't get RuntimeServicesSupplier from a stream.
+ //
+ // Note, we impose this "non-null mapper result" requirement so as
+ // to enable alternative implementations that might be burdened if
+ // null results were allowed.
+ // The implementation below could easily handle null results w/o
+ // losing synchronization, with the combiner needing to deal with
+ // a null result in the list it's given.
+
+ AtomicReference<ExecutorService> executorRef = new AtomicReference<>();
+
+ return stream.map(tuple -> {
+ if (executorRef.get() == null) {
+ executorRef.compareAndSet(null, Executors.newFixedThreadPool(Math.min(mappers.size(), 20)));
+ }
+ ExecutorService executor = executorRef.get();
+ List<U> results = new ArrayList<>(Collections.nCopies(mappers.size(), null));
+ List<Future<?>> futures = new ArrayList<>(mappers.size());
+
+ // Submit a task for each mapper invocation
+ int ch = 0;
+ for (Function<T,U> mapper : mappers) {
+ final int resultIndx = ch++;
+ Future<?> future = executor.submit(() -> {
+ U result = mapper.apply(tuple);
+ if (result == null)
+ throw new IllegalStateException("mapper index "+resultIndx+" returned null");
+ results.set(resultIndx, result);
+ });
+ futures.add(future);
+ }
+ // Await completion of all
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("mapper interrupted", e);
+ } catch (Exception e) {
+ throw new RuntimeException("mapper threw", e);
+ }
+ }
+ // Run the combiner
+ R result = combiner.apply(results);
+ return result;
+ });
+
}
/**
@@ -348,6 +453,159 @@ public class PlumbingStreams {
* @return result stream
*/
public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
- throw new IllegalStateException("NYI / TODO");
+ Objects.requireNonNull(stream, "stream");
+ Objects.requireNonNull(pipelines, "pipelines");
+ Objects.requireNonNull(combiner, "combiner");
+
+ // INITIAL IMPL TO GET STARTED - validate interface and test
+ // explore an impl with no new oplets
+ //
+ // A few impl options exist. Some with feedback loop to control stepping,
+ // some without. Feedback is OK for single JVM case, less so for
+ // multi-JVM/distributed case.
+ //
+ // Some impls use special oplets; others avoid them.
+ //
+ // Summary of what's below:
+ // feedback loop and no new oplets:
+ // |-> isolate -> p1 -> map.toPair |
+ // stream -> map.gate =>|-> isolate -> p2 -> map.toPair |-> union -> map.Collector -> combiner
+ // |-> isolate -> p3 -> map.toPair |
+ // . . .
+ //
+
+ // Add a gate. This keeps all pipelines working lock-step.
+ // It also provides the guarantee needed by gatedBarrier below.
+ Semaphore gateSemaphore = new Semaphore(1);
+ stream = gate(stream, gateSemaphore).tag("concurrent.gate");
+
+ // Add parallel fanout - with the gate the queue size really doesn't matter
+ List<TStream<T>> fanouts = parallelFanout(stream, pipelines.size(), 1);
+ for (int i = 0; i < fanouts.size(); i++)
+ fanouts.get(i).tag("concurrent.isolated-ch"+i);
+
+ // Add pipelines
+ List<TStream<U>> results = new ArrayList<>(pipelines.size());
+ int ch = 0;
+ for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
+ results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
+ ch++;
+ }
+
+ // Add the barrier
+ TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
+
+ // barrier = barrier.peek(tuple -> System.out.println("concurrent.barrier List<U> "+tuple));
+
+ // Add peek() to signal ok to begin next tuple
+ barrier = barrier.peek(tuple -> { gateSemaphore.release(); }).tag("concurrent.gateRelease");
+
+ // Add the combiner to the topology
+ return combiner.apply(barrier);
+ }
+
+ private static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
+ return stream.map(tuple -> {
+ try {
+ semaphore.acquire();
+ return tuple;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("interrupted", e);
+ }});
+ }
+
+ private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, int queueCapacity) {
+ // TODO want an ExecutorService to enable nThreads for mPipelines.
+ // i.e., not use "isolate", or need a way to create a collection of
+ // related "isolate" that use an executor
+ return parallelFanout(stream, numFanouts, inStream -> PlumbingStreams.isolate(inStream, queueCapacity));
+ }
+
+ private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, UnaryOperator<TStream<T>> isolator) {
+ List<TStream<T>> fanouts = new ArrayList<>(numFanouts);
+ for (int i = 0; i < numFanouts; i++) {
+ fanouts.add(isolator.apply(stream));
+ }
+ return fanouts;
+ }
+
+ /**
+ * Add a barrier that collects corresponding tuples from each input stream.
+ * <P>
+ * "GatedBarrier" is a special implementation that only works because
+ * its caller guarantees that one tuple will be received from each stream
+ * before a second tuple is received from any of the streams.
+ * </P><P>
+ * The result tuple is a list of input tuples, one from each
+ * input stream, at the same index as its input stream. i.e., result[0]
+ * is the tuple from streams[0], result[1] is the tuple from streams[1],
+ * and so on.
+ * </P><P>
+ * The operation waits indefinitely for a tuple on each input stream
+ * to be received.
+ * </P><P>
+ * TODO remove this when we have a barrier oplet.
+ * </P>
+ *
+ * @param <T> Tuple type
+ * @param streams streams to perform the barrier on
+ * @return stream whose tuples are each a list of tuples.
+ */
+ private static <T> TStream<List<T>> gatedBarrier(List<TStream<T>> streams) {
+ // TODO really want a multi-iport oplet for the fanin/barrier.
+ // Hack for now by using union but adding per-pipeline map()
+ // that creates a Pair containing the inputPortId and result,
+ // so a following map can collect them into List<U> tuple.
+ //
+ // streams[0] -> map.toPair |
+ // streams[1] -> map.toPair |-> union -> map.Collector
+ // streams[2] -> map.toPair |
+ // ...
+
+ // Add the barrier per-pipeline "to-pair" map()
+ List<TStream<Pair<Integer,T>>> chPairStreams = new ArrayList<>(streams.size());
+ int ch = 0;
+ for (TStream<T> stream : streams) {
+ final int finalCh = ch;
+ chPairStreams.add(stream.map(u -> new Pair<Integer,T>(finalCh, u)).tag("barrier.toPair-ch"+finalCh));
+ ch++;
+ }
+
+ // Add the barrier "fanin" union()
+ TStream<Pair<Integer,T>> union = chPairStreams.get(0).union(new HashSet<>(chPairStreams)).tag("barrier.union");
+
+ // union = union.peek(pair -> System.out.println("concurrent.barrier.union pair<ch,U> "+pair));
+
+ // Add the barrier collector map()
+ AtomicInteger barrierCnt = new AtomicInteger();
+ AtomicReference<List<T>> barrierChResults = new AtomicReference<>();
+
+ TStream<List<T>> barrier = union.map(pair -> {
+ List<T> chResults = barrierChResults.get();
+ if (chResults == null) {
+ chResults = new ArrayList<>(Collections.nCopies(streams.size(), null));
+ barrierChResults.set(chResults);
+ }
+ if (chResults.get(pair.k) != null)
+ throw new IllegalStateException("caller violation: barrier port "+pair.k+" already has a tuple");
+ chResults.set(pair.k, pair.v);
+
+ if (barrierCnt.incrementAndGet() < chResults.size())
+ return null;
+
+ barrierCnt.set(0);
+ barrierChResults.set(null);
+
+ return chResults;
+ });
+
+ // keep the threads associated with the supply streams isolated
+ // from any downstream processing. this is needed here because this impl
+ // doesn't have queuing/isolation on its feeding streams.
+ barrier = PlumbingStreams.isolate(barrier, 1);
+
+ return barrier;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ee28532a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 86c10de..f147685 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -32,6 +34,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Ignore;
import org.junit.Test;
+import com.google.gson.JsonObject;
+
+import quarks.function.Function;
import quarks.function.Functions;
import quarks.topology.TStream;
import quarks.topology.Topology;
@@ -289,5 +294,133 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
complete(top, count);
assertTrue(contents.getResult().toString(), contents.valid());
}
+
+ private Function<Integer,JsonObject> fakeAnalytic(int channel, long period, TimeUnit unit) {
+ return value -> {
+ try {
+ Thread.sleep(unit.toMillis(period));
+ JsonObject jo = new JsonObject();
+ jo.addProperty("channel", channel);
+ jo.addProperty("result", value);
+ return jo;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("channel="+channel+" interrupted", e);
+ }
+ };
+ }
+
+ private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
+ return stream -> stream.map(fakeAnalytic(channel, period, unit)).filter(t->true).tag("pipeline-ch"+channel);
+ }
+
+ @Test
+ public void testConcurrentMap() throws Exception {
+ Topology top = newTopology("testConcurrentMap");
+
+ Function<Integer,JsonObject> a1 = fakeAnalytic(0, 100, TimeUnit.MILLISECONDS);
+ Function<Integer,JsonObject> a2 = fakeAnalytic(1, 100, TimeUnit.MILLISECONDS);
+ Function<Integer,JsonObject> a3 = fakeAnalytic(2, 100, TimeUnit.MILLISECONDS);
+ Function<Integer,JsonObject> a4 = fakeAnalytic(3, 100, TimeUnit.MILLISECONDS);
+ Function<Integer,JsonObject> a5 = fakeAnalytic(4, 100, TimeUnit.MILLISECONDS);
+
+ List<Function<Integer,JsonObject>> mappers = new ArrayList<>(Arrays.asList(a1, a2, a3, a4, a5));
+ Function<List<JsonObject>,Integer> combiner = list -> {
+ int sum = 0;
+ int cnt = 0;
+ System.out.println("combiner: "+list);
+ for(JsonObject jo : list) {
+ assertEquals(cnt++, jo.getAsJsonPrimitive("channel").getAsInt());
+ sum += jo.getAsJsonPrimitive("result").getAsInt();
+ }
+ return sum;
+ };
+
+ TStream<Integer> values = top.of(1, 2, 3);
+ Integer[] resultTuples = new Integer[]{
+ 1*mappers.size(),
+ 2*mappers.size(),
+ 3*mappers.size(),
+ };
+
+ TStream<Integer> result = PlumbingStreams.concurrentMap(values, mappers, combiner);
+
+ Condition<Long> count = top.getTester().tupleCount(result, 3);
+ Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+
+ long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
+ long expMaxDuration = resultTuples.length * (100 + 75/*slop and more tuple overhead*/);
+
+ System.out.println("expMaxDuration="+expMaxDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
+
+ // a tighter performance check
+ assertTrue("expMaxDuration="+expMaxDuration+" actDuration="+actDuration,
+ actDuration <= expMaxDuration);
+ }
+
+ @Test
+ public void testConcurrent() throws Exception {
+ Topology top = newTopology("testConcurrent");
+
+ int ch = 0;
+ Function<TStream<Integer>,TStream<JsonObject>> p1 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+ Function<TStream<Integer>,TStream<JsonObject>> p2 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+ Function<TStream<Integer>,TStream<JsonObject>> p3 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+ Function<TStream<Integer>,TStream<JsonObject>> p4 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+ Function<TStream<Integer>,TStream<JsonObject>> p5 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+
+ List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>(Arrays.asList(p1, p2, p3, p4, p5));
+ Function<List<JsonObject>,Integer> tupleCombiner = list -> {
+ int sum = 0;
+ int cnt = 0;
+ System.out.println("combiner: "+list);
+ for(JsonObject jo : list) {
+ assertEquals(cnt++, jo.getAsJsonPrimitive("channel").getAsInt());
+ sum += jo.getAsJsonPrimitive("result").getAsInt();
+ }
+ return sum;
+ };
+ Function<TStream<List<JsonObject>>,TStream<Integer>> combiner = stream -> stream.map(tupleCombiner);
+
+ TStream<Integer> values = top.of(1, 2, 3);
+ Integer[] resultTuples = new Integer[]{
+ 1*pipelines.size(),
+ 2*pipelines.size(),
+ 3*pipelines.size(),
+ };
+
+ TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");
+
+ Condition<Long> count = top.getTester().tupleCount(result, 3);
+ Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+
+ long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
+ long expMaxDuration = resultTuples.length * (100 + 75/*slop and more tuple overhead*/);
+
+ System.out.println("expMaxDuration="+expMaxDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
+
+ // a tighter performance check
+ assertTrue("expMaxDuration="+expMaxDuration+" actDuration="+actDuration,
+ actDuration <= expMaxDuration);
+ }
}
[3/9] incubator-quarks git commit: proposed API for comment
Posted by dl...@apache.org.
proposed API for comment
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/95c72478
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/95c72478
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/95c72478
Branch: refs/heads/master
Commit: 95c724789e2a3b399dfce6a2c97b30cbee8a15fa
Parents: 410b68c
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue Apr 26 13:50:27 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 12:48:10 2016 -0400
----------------------------------------------------------------------
.../topology/plumbing/PlumbingStreams.java | 59 ++++++++++++++++++++
1 file changed, 59 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/95c72478/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index 02cba06..0e4e5e9 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -18,6 +18,7 @@ under the License.
*/
package quarks.topology.plumbing;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import quarks.function.Function;
@@ -25,6 +26,7 @@ import quarks.oplet.plumbing.Isolate;
import quarks.oplet.plumbing.PressureReliever;
import quarks.oplet.plumbing.UnorderedIsolate;
import quarks.topology.TStream;
+import quarks.topology.TopologyProvider;
/**
* Plumbing utilities for {@link TStream}.
@@ -215,5 +217,62 @@ public class PlumbingStreams {
return stream.pipe(
ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
}
+
+ /**
+ * Perform analytics concurrently.
+ * <P>
+ * Process input tuples one at a time, invoking the specified
+ * analytics ({@code mappers}) concurrently, combine the results,
+ * and then process the next input tuple in the same manner.
+ * </P><P>
+ * Logically, instead of doing this:
+ * <pre>{@code
+ * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+ * }</pre>
+ * create a graph that's logically like this:
+ * <pre>{@code
+ * -
+ * /-> A1 ->\
+ * sensorReadings<T> -> |-> A2 ->| -> result<R>
+ * \-> A3 ->/
+ * }</pre>
+ * </P><P>
+ * The typical use case for this is when an application has a collection
+ * of independent analytics to perform on each tuple and the analytics
+ * are sufficiently long running such that performing them concurrently
+ * is desired.
+ * </P><P>
+ * Note, this is in contrast to "parallel" stream processing,
+ * which in Java8 Streams and other contexts means processing multiple
+ * tuples in parallel, each on a replicated processing pipeline.
+ * </P><P>
+ * Threadsafety - one of the following must be true:
+ * <ul>
+ * <li>the tuples from {@code stream} are threadsafe</li>
+ * <li>the {@code mappers} do not modify the input tuples</li>
+ * <li>the {@code mappers} provide their own synchronization controls
+ * to protect concurrent modifications of the input tuples</li>
+ * </ul>
+ * </P><P>
+ * Logically, a thread is allocated for each of the {@code mappers}.
+ * The actual degree of concurrency may be {@link TopologyProvider} dependent.
+ * </P>
+ *
+ * @param <T> Tuple type on input stream.
+ * @param <U> Tuple type generated by mappers.
+ * @param <R> Tuple type of the result.
+ *
+ * @param stream input stream
+ * @param mappers functions to be run concurrently
+ * @param combiner function to create a result tuple from the list of
+ * results from {@code mappers}.
+ * The input list order is 1:1 with the {@code mappers} list.
+ * I.e., list entry [0] is the result from mappers[0],
+ * list entry [1] is the result from mappers[1], etc.
+ * @return result stream
+ */
+ public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
+ throw new IllegalStateException("NYI / TODO");
+ }
}
[8/9] incubator-quarks git commit: change "combiner" from a stream
function to a tuple function
Posted by dl...@apache.org.
change "combiner" from a stream function to a tuple function
I think the stream function was an unnecessary complication/burden
on the caller.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/d4d1a1d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/d4d1a1d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/d4d1a1d0
Branch: refs/heads/master
Commit: d4d1a1d0f696c96e2c8d3303f2a46c70874edef8
Parents: 8fab576
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon May 2 08:53:47 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon May 2 08:53:47 2016 -0400
----------------------------------------------------------------------
.../java/quarks/topology/plumbing/PlumbingStreams.java | 12 ++++++------
.../test/java/quarks/test/topology/PlumbingTest.java | 3 +--
2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/d4d1a1d0/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index f799dbe..c816eee 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -256,7 +256,7 @@ public class PlumbingStreams {
* List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
* for (Function<T,U> mapper : mappers)
* pipelines.add(s -> s.map(mapper));
- * concurrent(stream, pipelines, s -> s.map(combiner));
+ * concurrent(stream, pipelines, combiner);
* }</pre>
* </P>
*
@@ -286,7 +286,7 @@ public class PlumbingStreams {
pipelines.add(s -> s.map(mapper));
}
- return concurrent(stream, pipelines, s -> s.map(combiner));
+ return concurrent(stream, pipelines, combiner);
}
/**
@@ -349,15 +349,15 @@ public class PlumbingStreams {
* For each input tuple, a pipeline MUST create exactly one output tuple.
* Tuple flow into the pipelines will cease if that requirement
* is not met.
- * @param combiner a function that creates a result stream from a stream
- * whose tuples are the list of each pipeline's result.
+ * @param combiner function to create a result tuple from the list of
+ * results from {@code pipelines}.
* The input tuple list's order is 1:1 with the {@code pipelines} list.
* I.e., list entry [0] is the result from pipelines[0],
* list entry [1] is the result from pipelines[1], etc.
* @return result stream
* @see #barrier(List, int) barrier
*/
- public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
+ public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
Objects.requireNonNull(stream, "stream");
Objects.requireNonNull(pipelines, "pipelines");
Objects.requireNonNull(combiner, "combiner");
@@ -381,7 +381,7 @@ public class PlumbingStreams {
TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
// Add the combiner
- return combiner.apply(barrier);
+ return barrier.map(combiner);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/d4d1a1d0/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 677467a..856f7ea 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -383,7 +383,7 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
- Function<List<JsonObject>,Integer> tupleCombiner = list -> {
+ Function<List<JsonObject>,Integer> combiner = list -> {
int sum = 0;
int cnt = 0;
System.out.println("combiner: "+list);
@@ -393,7 +393,6 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
}
return sum;
};
- Function<TStream<List<JsonObject>>,TStream<Integer>> combiner = stream -> stream.map(tupleCombiner);
TStream<Integer> values = top.of(1, 2, 3);
Integer[] resultTuples = new Integer[]{
[7/9] incubator-quarks git commit: doc tweak
Posted by dl...@apache.org.
doc tweak
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/8fab5768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/8fab5768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/8fab5768
Branch: refs/heads/master
Commit: 8fab5768c01780599aad5208633de0e25893dbe4
Parents: 5a520e4
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Sun May 1 11:21:04 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Sun May 1 11:21:04 2016 -0400
----------------------------------------------------------------------
.../quarks/topology/plumbing/PlumbingStreams.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/8fab5768/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index a267d00..f799dbe 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -306,6 +306,15 @@ public class PlumbingStreams {
* |-> A1pipeline ->|
* sensorReadings<T> -> |-> A2pipeline ->| -> result<R>
* |-> A3pipeline ->|
+ *
+ * }</pre>
+ * more specifically a graph like this:
+ * <pre>{@code
+ * -
+ * |-> isolate(1) -> pipeline1 -> |
+ * stream -> |-> isolate(1) -> pipeline2 -> |-> barrier(10) -> combiner
+ * |-> isolate(1) -> pipeline3 -> |
+ * . . .
* }</pre>
* </P><P>
* The typical use case for this is when an application has a collection
@@ -353,12 +362,6 @@ public class PlumbingStreams {
Objects.requireNonNull(pipelines, "pipelines");
Objects.requireNonNull(combiner, "combiner");
- // Summary of what's below:
- // |-> isolate(1) -> p1 -> |
- // stream -> |-> isolate(1) -> p2 -> |-> barrier(10) -> combiner
- // |-> isolate(1) -> p3 -> |
- // . . .
-
int barrierQueueCapacity = 10; // don't preclude pipelines from getting ahead some.
// Add concurrent (isolated) fanouts
[9/9] incubator-quarks git commit: Merge pull request #99
Posted by dl...@apache.org.
Merge pull request #99
This closes #99
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/6f26b5b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/6f26b5b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/6f26b5b4
Branch: refs/heads/master
Commit: 6f26b5b4e715d794dba13c65b876eed49f34cc64
Parents: c0e59a1 d4d1a1d
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 3 11:27:33 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 3 11:27:33 2016 -0400
----------------------------------------------------------------------
.../src/main/java/quarks/oplet/core/FanIn.java | 114 ++++++++++
.../src/main/java/quarks/oplet/core/Union.java | 19 +-
.../java/quarks/oplet/plumbing/Barrier.java | 113 ++++++++++
.../java/quarks/oplet/plumbing/Isolate.java | 28 ++-
.../src/main/java/quarks/topology/TStream.java | 17 ++
.../topology/plumbing/PlumbingStreams.java | 213 ++++++++++++++++++-
.../java/quarks/test/topology/PlumbingTest.java | 136 ++++++++++++
.../topology/spi/graph/ConnectorStream.java | 30 ++-
8 files changed, 651 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
[2/9] incubator-quarks git commit: add stream-based concurrent(),
clarify non-null tuple result requirement
Posted by dl...@apache.org.
add stream-based concurrent(), clarify non-null tuple result requirement
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/a929f0d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/a929f0d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/a929f0d6
Branch: refs/heads/master
Commit: a929f0d6861fc0f8336616ed8217a174dc23c2b4
Parents: 95c7247
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed Apr 27 15:07:46 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 12:48:10 2016 -0400
----------------------------------------------------------------------
.../topology/plumbing/PlumbingStreams.java | 87 ++++++++++++++++++--
1 file changed, 81 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a929f0d6/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index 0e4e5e9..a18411c 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -34,7 +34,18 @@ import quarks.topology.TopologyProvider;
* but are not part of the logic of the application.
*/
public class PlumbingStreams {
-
+
+ // Use apache.math3.Pair ?
+ private static class Pair<K,V> {
+ K k;
+ V v;
+ Pair(K k, V v) {
+ this.k = k;
+ this.v = v;
+ }
+ public String toString() { return "k="+k+" v="+v; };
+ };
+
/**
* Insert a blocking delay between tuples.
* Returned stream is the input stream delayed by {@code delay}.
@@ -232,9 +243,9 @@ public class PlumbingStreams {
* create a graph that's logically like this:
* <pre>{@code
* -
- * /-> A1 ->\
- * sensorReadings<T> -> |-> A2 ->| -> result<R>
- * \-> A3 ->/
+ * |-> A1 ->|
+ * sensorReadings<T> -> |-> A2 ->| -> result<R>
+ * |-> A3 ->|
* }</pre>
* </P><P>
* The typical use case for this is when an application has a collection
@@ -263,9 +274,12 @@ public class PlumbingStreams {
* @param <R> Tuple type of the result.
*
* @param stream input stream
- * @param mappers functions to be run concurrently
+ * @param mappers functions to be run concurrently. Each mapper MUST
+ * return a non-null result.
+ * A runtime error will be generated if a null result
+ * is returned.
* @param combiner function to create a result tuple from the list of
- * results from {@code mappers}.
+ * results from {@code mappers}.
* The input list order is 1:1 with the {@code mappers} list.
* I.e., list entry [0] is the result from mappers[0],
* list entry [1] is the result from mappers[1], etc.
@@ -275,4 +289,65 @@ public class PlumbingStreams {
throw new IllegalStateException("NYI / TODO");
}
+ /**
+ * Perform analytics concurrently.
+ * <P>
+ * Process input tuples one at a time, invoking the specified
+ * analytics ({@code pipelines}) concurrently, combine the results,
+ * and then process the next input tuple in the same manner.
+ * </P><P>
+ * Logically, instead of doing this:
+ * <pre>{@code
+ * sensorReadings<T> -> A1pipeline -> A2pipeline -> A3pipeline -> results<R>
+ * }</pre>
+ * create a graph that's logically like this:
+ * <pre>{@code
+ * -
+ * |-> A1pipeline ->|
+ * sensorReadings<T> -> |-> A2pipeline ->| -> result<R>
+ * |-> A3pipeline ->|
+ * }</pre>
+ * </P><P>
+ * The typical use case for this is when an application has a collection
+ * of independent analytics to perform on each tuple and the analytics
+ * are sufficiently long running such that performing them concurrently
+ * is desired.
+ * </P><P>
+ * Note, this is in contrast to "parallel" stream processing,
+ * which in Java8 Streams and other contexts means processing multiple
+ * tuples in parallel, each on a replicated processing pipeline.
+ * </P><P>
+ * Threadsafety - one of the following must be true:
+ * <ul>
+ * <li>the tuples from {@code stream} are threadsafe</li>
+ * <li>the {@code pipelines} do not modify the input tuples</li>
+ * <li>the {@code pipelines} provide their own synchronization controls
+ * to protect concurrent modifications of the input tuples</li>
+ * </ul>
+ * </P><P>
+ * Logically, a thread is allocated for each of the {@code pipelines}.
+ * The actual degree of concurrency may be {@link TopologyProvider} dependent.
+ * </P>
+ *
+ * @param <T> Tuple type on input stream.
+ * @param <U> Tuple type generated by pipelines.
+ * @param <R> Tuple type of the result.
+ *
+ * @param stream input stream
+ * @param pipelines a list of functions to add a pipeline to the topology.
+ * Each {@code pipeline.apply()} is called with {@code stream}
+ * as the input, yielding the pipeline's result stream.
+ * For each input tuple, a pipeline MUST create exactly one output tuple.
+ * Tuple flow into the pipelines will cease if that requirement
+ * is not met.
+ * @param combiner a function that creates a result stream from a stream
+ * whose tuples are the list of each pipeline's result.
+ * The input tuple list's order is 1:1 with the {@code pipelines} list.
+ * I.e., list entry [0] is the result from pipelines[0],
+ * list entry [1] is the result from pipelines[1], etc.
+ * @return result stream
+ */
+ public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
+ throw new IllegalStateException("NYI / TODO");
+ }
}
[4/9] incubator-quarks git commit: Add TStream.fanin(),
FanIn and Barrier oplets
Posted by dl...@apache.org.
Add TStream.fanin(), FanIn and Barrier oplets
Expose Barrier via PlumbingStreams.barrier().
Update concurrent() to use barrier.
Remove Runnable from Isolate's interface (it's an implementation detail)
FanIn behavior is driven by a specified tuple receiver BiFunction.
Barrier is a type of FanIn.
Union is a type of FanIn with a trivial tuple receiver function.
Chose to add the more constrained TStream.fanin(FanIn,others) instead of
a ~wide-open op(Oplet, others). Those can be added later as needed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/734f1463
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/734f1463
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/734f1463
Branch: refs/heads/master
Commit: 734f146376120f0b4b30811ff66caf182c22493e
Parents: ee28532
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Apr 29 12:39:36 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 12:50:29 2016 -0400
----------------------------------------------------------------------
.../src/main/java/quarks/oplet/core/FanIn.java | 114 +++++++++++++++++
.../src/main/java/quarks/oplet/core/Union.java | 19 ++-
.../java/quarks/oplet/plumbing/Barrier.java | 113 +++++++++++++++++
.../java/quarks/oplet/plumbing/Isolate.java | 11 +-
.../src/main/java/quarks/topology/TStream.java | 17 +++
.../topology/plumbing/PlumbingStreams.java | 124 ++++++-------------
.../topology/spi/graph/ConnectorStream.java | 30 ++++-
7 files changed, 325 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/oplet/src/main/java/quarks/oplet/core/FanIn.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/core/FanIn.java b/api/oplet/src/main/java/quarks/oplet/core/FanIn.java
new file mode 100644
index 0000000..b1c5f48
--- /dev/null
+++ b/api/oplet/src/main/java/quarks/oplet/core/FanIn.java
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.oplet.core;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import quarks.function.BiFunction;
+import quarks.function.Consumer;
+import quarks.oplet.OpletContext;
+
+/**
+ * FanIn oplet, merges multiple input ports into a single output port.
+ * <P>
+ * For each tuple received, {@code receiver.apply(T tuple, Integer index)}
+ * is called. {@code index} is the tuple's input stream's index, where
+ * {@code this} is index 0 followed by {@code others} in their order.
+ * {@code receiver} either returns a tuple to emit on the output
+ * stream or null.
+ * </P>
+ */
+public class FanIn<T,U> extends AbstractOplet<T, U> {
+ private BiFunction<T, Integer, U> receiver;
+ private List<Consumer<T>> iportConsumers;
+ private Consumer<U> destination;
+
+ public FanIn() {
+ }
+
+ public FanIn(BiFunction<T, Integer, U> receiver) {
+ this.receiver = receiver;
+ }
+
+ @Override
+ public void initialize(OpletContext<T, U> context) {
+ super.initialize(context);
+ destination = context.getOutputs().get(0);
+
+ // Create a consumer for each input port.
+ int numIports = getOpletContext().getInputCount();
+ if (iportConsumers == null) {
+ // each iport invokes the receiver
+ iportConsumers = new ArrayList<>(numIports);
+ for (int i = 0; i < numIports; i++)
+ iportConsumers.add(consumer(i));
+ iportConsumers = Collections.unmodifiableList(iportConsumers);
+ }
+ }
+
+ /**
+ * Set the receiver function. Must be called no later than as part
+ * of {@link #initialize(OpletContext)}.
+ * @param receiver
+ */
+ protected void setReceiver(BiFunction<T, Integer, U> receiver) {
+ this.receiver = receiver;
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public List<? extends Consumer<T>> getInputs() {
+ return iportConsumers;
+ }
+
+ /**
+ * Create a Consumer for the input port that invokes the
+ * receiver and submits a generated tuple, if any, to the output.
+ * @param iportIndex
+ * @return the Consumer
+ */
+ protected Consumer<T> consumer(int iportIndex) {
+ return tuple -> {
+ U result = receiver.apply(tuple, iportIndex);
+ if (result != null)
+ submit(result);
+ };
+ }
+
+ protected Consumer<U> getDestination() {
+ return destination;
+ }
+
+ /**
+ * Submit a tuple to single output.
+ * @param tuple Tuple to be submitted.
+ */
+ protected void submit(U tuple) {
+ getDestination().accept(tuple);
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/oplet/src/main/java/quarks/oplet/core/Union.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/core/Union.java b/api/oplet/src/main/java/quarks/oplet/core/Union.java
index c4ba423..e33257a 100644
--- a/api/oplet/src/main/java/quarks/oplet/core/Union.java
+++ b/api/oplet/src/main/java/quarks/oplet/core/Union.java
@@ -18,10 +18,7 @@ under the License.
*/
package quarks.oplet.core;
-import java.util.Collections;
-import java.util.List;
-
-import quarks.function.Consumer;
+import quarks.oplet.OpletContext;
/**
* Union oplet, merges multiple input ports
@@ -30,19 +27,19 @@ import quarks.function.Consumer;
* Processing for each input is identical
* and just submits the tuple to the single output.
*/
-public final class Union<T> extends AbstractOplet<T, T> {
+public final class Union<T> extends FanIn<T, T> {
@Override
public void start() {
}
- /**
- * For each input set the output directly to the only output.
- */
+
@Override
- public List<? extends Consumer<T>> getInputs() {
- Consumer<T> output = getOpletContext().getOutputs().get(0);
- return Collections.nCopies(getOpletContext().getInputCount(), output);
+ public void initialize(OpletContext<T, T> context) {
+ super.initialize(context);
+
+ // forward every tuple to the output port
+ setReceiver((tuple, iportIndex) -> tuple);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/oplet/src/main/java/quarks/oplet/plumbing/Barrier.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/Barrier.java b/api/oplet/src/main/java/quarks/oplet/plumbing/Barrier.java
new file mode 100644
index 0000000..d6f67b9
--- /dev/null
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/Barrier.java
@@ -0,0 +1,113 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.oplet.plumbing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+import quarks.function.BiFunction;
+import quarks.oplet.OpletContext;
+import quarks.oplet.core.FanIn;
+
+/**
+ * A tuple synchronization barrier.
+ * <P>
+ * {@code Barrier} has n input ports with tuple type {@code T}
+ * and one output port with tuple type {@code List<T>}.
+ * Once the oplet receives one tuple on each of its input ports,
+ * it generates an output tuple containing one tuple from each input port.
+ * It then awaits receiving the next collection of tuples.
+ * Input port 0's tuple is in list[0], port 1's tuple in list[1], and so on.
+ * </P><P>
+ * Each input port has an associated queue of size {@code queueCapacity}.
+ * The input port's {@code Consumer<T>.accept()} will block if its queue is full.
+ * </P>
+ *
+ * @param <T> Type of the tuple.
+ */
+public class Barrier<T> extends FanIn<T, List<T>> {
+
+ private final int queueCapacity;
+ private Thread thread;
+ private List<LinkedBlockingQueue<T>> iportQueues;
+
+ /**
+ * Create a new instance.
+ * @param queueCapacity size of each input port's blocking queue
+ */
+ public Barrier(int queueCapacity) {
+ this.queueCapacity = queueCapacity;
+ }
+
+ @Override
+ public void initialize(OpletContext<T, List<T>> context) {
+ super.initialize(context);
+
+ thread = context.getService(ThreadFactory.class).newThread(() -> run());
+
+ int numIports = getOpletContext().getInputCount();
+ iportQueues = new ArrayList<>(numIports);
+ for (int i = 0; i < numIports; i++)
+ iportQueues.add(new LinkedBlockingQueue<>(queueCapacity));
+
+ setReceiver(receiver());
+ }
+
+ @Override
+ public void start() {
+ thread.start();
+ }
+
+ protected BiFunction<T,Integer,List<T>> receiver() {
+ return (tuple, iportIndex) -> {
+ accept(tuple, iportIndex);
+ return null;
+ };
+ }
+
+ protected void accept(T tuple, int iportIndex) {
+ try {
+ iportQueues.get(iportIndex).put(tuple);
+ } catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void run() {
+ while (!Thread.interrupted()) {
+ try {
+ List<T> list = new ArrayList<>(iportQueues.size());
+ for (LinkedBlockingQueue<T> iport : iportQueues) {
+ list.add(iport.take());
+ }
+ submit(list);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java b/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
index b62f7b4..b32ab61 100644
--- a/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/Isolate.java
@@ -33,7 +33,7 @@ import quarks.oplet.core.Pipe;
*
* @param <T> Type of the tuple.
*/
-public class Isolate<T> extends Pipe<T,T> implements Runnable {
+public class Isolate<T> extends Pipe<T,T> {
private static final long serialVersionUID = 1L;
private Thread thread;
@@ -50,7 +50,9 @@ public class Isolate<T> extends Pipe<T,T> implements Runnable {
/**
* Create a new Isolate oplet.
- * @param queueCapacity {@link #accept()} blocks when this capacity is reached
+ * @param queueCapacity size of the queue between the input stream
+ * and the output stream.
+ * {@link #accept(Object) accept} blocks when the queue is full.
*/
public Isolate(int queueCapacity) {
tuples = new LinkedBlockingQueue<>(queueCapacity);
@@ -59,7 +61,7 @@ public class Isolate<T> extends Pipe<T,T> implements Runnable {
@Override
public void initialize(OpletContext<T, T> context) {
super.initialize(context);
- thread = context.getService(ThreadFactory.class).newThread(this);
+ thread = context.getService(ThreadFactory.class).newThread(() -> run());
}
@Override
@@ -78,8 +80,7 @@ public class Isolate<T> extends Pipe<T,T> implements Runnable {
}
}
- @Override
- public void run() {
+ private void run() {
while (!Thread.interrupted()) {
try {
submit(tuples.take());
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/topology/src/main/java/quarks/topology/TStream.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/TStream.java b/api/topology/src/main/java/quarks/topology/TStream.java
index b230475..f4d1a44 100644
--- a/api/topology/src/main/java/quarks/topology/TStream.java
+++ b/api/topology/src/main/java/quarks/topology/TStream.java
@@ -30,6 +30,7 @@ import quarks.function.Function;
import quarks.function.Predicate;
import quarks.function.ToIntFunction;
import quarks.function.UnaryOperator;
+import quarks.oplet.core.FanIn;
import quarks.oplet.core.Pipe;
import quarks.oplet.core.Sink;
@@ -300,6 +301,22 @@ public interface TStream<T> extends TopologyElement {
<U> TStream<U> pipe(Pipe<T, U> pipe);
/**
+ * Declare a stream that contains the output of the specified
+ * {@link FanIn} oplet applied to this stream and {@code others}.
+ *
+ * @param <U> Tuple type of the returned streams.
+ * @param fanin The {@link FanIn} oplet.
+ * @param others The other input streams.
+ * Must not be empty or contain duplicates or {@code this}
+ *
+ * @return a stream that contains the tuples emitted by the oplet.
+ * @see #union(Set)
+ * @see #pipe(Pipe)
+ * @see #sink(Sink)
+ */
+ <U> TStream<U> fanin(FanIn<T,U> fanin, List<TStream<T>> others);
+
+ /**
* Declare a new stream that modifies each tuple from this stream into one
* (or zero) tuple of the same type {@code T}. For each tuple {@code t}
* on this stream, the returned stream will contain a tuple that is the
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index d03c0d4..554fd39 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -20,7 +20,6 @@ package quarks.topology.plumbing;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
@@ -28,11 +27,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import quarks.function.Function;
import quarks.function.UnaryOperator;
+import quarks.oplet.plumbing.Barrier;
import quarks.oplet.plumbing.Isolate;
import quarks.oplet.plumbing.PressureReliever;
import quarks.oplet.plumbing.UnorderedIsolate;
@@ -46,17 +45,6 @@ import quarks.topology.TopologyProvider;
*/
public class PlumbingStreams {
- // Use apache.math3.Pair ?
- private static class Pair<K,V> {
- K k;
- V v;
- Pair(K k, V v) {
- this.k = k;
- this.v = v;
- }
- public String toString() { return "k="+k+" v="+v; };
- };
-
/**
* Insert a blocking delay between tuples.
* Returned stream is the input stream delayed by {@code delay}.
@@ -493,7 +481,8 @@ public class PlumbingStreams {
}
// Add the barrier
- TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
+ // TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
+ TStream<List<U>> barrier = barrier(results).tag("concurrent.barrier");
// barrier = barrier.peek(tuple -> System.out.println("concurrent.barrier List<U> "+tuple));
@@ -529,83 +518,48 @@ public class PlumbingStreams {
}
return fanouts;
}
-
+
+ /**
+ * A tuple synchronization barrier.
+ * <P>
+ * Same as {@code barrier(streams, 1)}.
+ * </P>
+ * @see #barrier(List, int)
+ */
+ public static <T> TStream<List<T>> barrier(List<TStream<T>> streams) {
+ return barrier(streams, 1);
+ }
+
/**
- * Add a barrier that collects corresponding tuples from each input stream.
+ * A tuple synchronization barrier.
* <P>
- * "GatedBarrier" is a special implementation that only works because
- * its caller guarantees that one tuple will be received from each stream
- * before a second tuple is received from any of the streams.
+ * A barrier has n input streams with tuple type {@code T}
+ * and one output stream with tuple type {@code List<T>}.
+ * Once the barrier receives one tuple on each of its input streams,
+ * it generates an output tuple containing one tuple from each input stream.
+ * It then waits until it has received another tuple from each input stream.
* </P><P>
- * The result tuple is a list of input tuples, one from each
- * input stream, at the same index as it's input stream. i.e., result[0]
- * is the tuple from streams[0], result[1] is the tuple from streams[1],
- * and so on.
+ * Input stream 0's tuple is in the output tuple's list[0],
+ * stream 1's tuple in list[1], and so on.
* </P><P>
- * The operation waits indefinitely for a tuple on each input stream
- * to be received.
+ * The barrier's output stream is isolated from the input streams.
* </P><P>
- * TODO remove this when we have a barrier oplet.
- * </P>
+ * The barrier has a queue of size {@code queueCapacity} for each
+ * input stream. When a tuple for an input stream is received it is
+ * added to its queue. The stream will block if the queue is full.
+ * </P>
+ *
+ * @param <T> Type of the tuple.
*
- * @param <T> Tuple type
- * @param streams streams to perform the barrier on
- * @return stream whose tuples are each a list of tuples.
+ * @param streams the list of input streams
+ * @param queueCapacity the size of each input stream's queue
+ * @return the output stream
+ * @see Barrier
*/
- private static <T> TStream<List<T>> gatedBarrier(List<TStream<T>> streams) {
- // TODO really want a multi-iport oplet for the fanin/barrier.
- // Hack for now by using union but adding per-pipeline map()
- // that creates a Pair containing the inputPortId and result,
- // so a following map can collect them into List<U> tuple.
- //
- // streams[0] -> map.toPair |
- // streams[1] -> map.toPair |-> union -> map.Collector
- // streams[2] -> map.toPair |
- // ...
-
- // Add the barrier per-pipeline "to-pair" map()
- List<TStream<Pair<Integer,T>>> chPairStreams = new ArrayList<>(streams.size());
- int ch = 0;
- for (TStream<T> stream : streams) {
- final int finalCh = ch;
- chPairStreams.add(stream.map(u -> new Pair<Integer,T>(finalCh, u)).tag("barrier.toPair-ch"+finalCh));
- ch++;
- }
-
- // Add the barrier "fanin" union()
- TStream<Pair<Integer,T>> union = chPairStreams.get(0).union(new HashSet<>(chPairStreams)).tag("barrier.union");
-
- // union = union.peek(pair -> System.out.println("concurrent.barrier.union pair<ch,U> "+pair));
-
- // Add the barrier collector map()
- AtomicInteger barrierCnt = new AtomicInteger();
- AtomicReference<List<T>> barrierChResults = new AtomicReference<>();
-
- TStream<List<T>> barrier = union.map(pair -> {
- List<T> chResults = barrierChResults.get();
- if (chResults == null) {
- chResults = new ArrayList<>(Collections.nCopies(streams.size(), null));
- barrierChResults.set(chResults);
- }
- if (chResults.get(pair.k) != null)
- throw new IllegalStateException("caller violation: barrier port "+pair.k+" already has a tuple");
- chResults.set(pair.k, pair.v);
-
- if (barrierCnt.incrementAndGet() < chResults.size())
- return null;
-
- barrierCnt.set(0);
- barrierChResults.set(null);
-
- return chResults;
- });
-
- // keep the threads associated with the supply streams isolated
- // from any downstream processing. this is needed here because this impl
- // doesn't have queuing/isolation on its feeding streams.
- barrier = PlumbingStreams.isolate(barrier, 1);
-
- return barrier;
+ public static <T> TStream<List<T>> barrier(List<TStream<T>> streams, int queueCapacity) {
+ List<TStream<T>> others = new ArrayList<>(streams);
+ TStream<T> s1 = others.remove(0);
+ return s1.fanin(new Barrier<T>(queueCapacity), others);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/734f1463/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
----------------------------------------------------------------------
diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
index 7a71004..51068ff 100644
--- a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
+++ b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
@@ -41,6 +41,8 @@ import quarks.function.ToIntFunction;
import quarks.graph.Connector;
import quarks.graph.Graph;
import quarks.graph.Vertex;
+import quarks.oplet.Oplet;
+import quarks.oplet.core.FanIn;
import quarks.oplet.core.Pipe;
import quarks.oplet.core.Sink;
import quarks.oplet.core.Split;
@@ -63,8 +65,8 @@ import quarks.window.Windows;
/**
* A stream that directly adds oplets to the graph.
*
- * @param <G>
- * @param <T>
+ * @param <G> topology type
+ * @param <T> tuple type
*/
public class ConnectorStream<G extends Topology, T> extends AbstractTStream<G, T> {
@@ -160,6 +162,30 @@ public class ConnectorStream<G extends Topology, T> extends AbstractTStream<G, T
}
@Override
+ public <U> TStream<U> fanin(FanIn<T,U> fanin, List<TStream<T>> others) {
+ if (others.isEmpty() || others.size() == 1 && others.contains(this))
+ throw new IllegalArgumentException("others"); // use pipe()
+ if (new HashSet<>(others).size() != others.size())
+ throw new IllegalArgumentException("others has dups");
+
+ for (TStream<T> other : others)
+ verify(other);
+
+ others = new ArrayList<>(others);
+ others.add(0, this);
+
+ Vertex<Oplet<T,U>, T, U> fanInVertex = graph().insert(fanin, others.size(), 1);
+ int inputPort = 0;
+ for (TStream<T> other : others) {
+ @SuppressWarnings("unchecked")
+ ConnectorStream<G,T> cs = (ConnectorStream<G, T>) other;
+ cs.connector.connect(fanInVertex, inputPort++);
+ }
+
+ return derived(fanInVertex.getConnectors().get(0));
+ }
+
+ @Override
public <K> TWindow<T, K> last(int count, Function<T, K> keyFunction) {
TWindowImpl<T, K> window = new TWindowImpl<T, K>(count, this, keyFunction);
return window;
[6/9] incubator-quarks git commit: tidy up
Posted by dl...@apache.org.
tidy up
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/5a520e49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/5a520e49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/5a520e49
Branch: refs/heads/master
Commit: 5a520e49909f0371d53d2dec9ef3357a98b53911
Parents: cde2678
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Apr 29 17:06:30 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 17:06:30 2016 -0400
----------------------------------------------------------------------
.../topology/plumbing/PlumbingStreams.java | 116 +++----------------
1 file changed, 13 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/5a520e49/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index ac75103..a267d00 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -19,14 +19,9 @@ under the License.
package quarks.topology.plumbing;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import quarks.function.Function;
import quarks.oplet.plumbing.Barrier;
@@ -251,41 +246,18 @@ public class PlumbingStreams {
/**
* Perform analytics concurrently.
* <P>
- * Process input tuples one at at time, invoking the specified
- * analytics ({@code mappers}) concurrently, combine the results,
- * and then process the next input tuple in the same manner.
+ * This is a convenience function that calls
+ * {@link #concurrent(TStream, List, Function)} after
+ * creating {@code pipeline} and {@code combiner} functions
+ * from the supplied {@code mappers} and {@code combiner} arguments.
* </P><P>
- * Logically, instead of doing this:
+ * That is, it is logically, if not exactly, the same as:
* <pre>{@code
- * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+ * List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
+ * for (Function<T,U> mapper : mappers)
+ * pipelines.add(s -> s.map(mapper));
+ * concurrent(stream, pipelines, s -> s.map(combiner));
* }</pre>
- * create a graph that's logically like this:
- * <pre>{@code
- * -
- * |-> A1 ->|
- * sensorReadings<T> -> |-> A2 ->| -> result<R>
- * |-> A3 ->|
- * }</pre>
- * </P><P>
- * The typical use case for this is when an application has a collection
- * of independent analytics to perform on each tuple and the analytics
- * are sufficiently long running such that performing them concurrently
- * is desired.
- * </P><P>
- * Note, this is in contrast to "parallel" stream processing,
- * which in Java8 Streams and other contexts means processing multiple
- * tuples in parallel, each on a replicated processing pipeline.
- * </P><P>
- * Threadsafety - one of the following must be true:
- * <ul>
- * <li>the tuples from {@code stream} are threadsafe</li>
- * <li>the {@code mappers} do not modify the input tuples</li>
- * <li>the {@code mappers} provide their own synchronization controls
- * to protect concurrent modifications of the input tuples</li>
- * </ul>
- * </P><P>
- * Logically, a thread is allocated for each of the {@code mappers}.
- * The actual degree of concurrency may be {@link TopologyProvider} dependent.
* </P>
*
* @param <T> Tuple type on input stream.
@@ -316,69 +288,6 @@ public class PlumbingStreams {
return concurrent(stream, pipelines, s -> s.map(combiner));
}
-
- // Q: is there any value to this implementation approach? Or just dispose of it?
- @SuppressWarnings("unused")
- private static <T,U,R> TStream<R> concurrentMapSingleOp(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
- Objects.requireNonNull(stream, "stream");
- Objects.requireNonNull(mappers, "mappers");
- Objects.requireNonNull(combiner, "combiner");
-
- // INITIAL IMPL TO GET STARTED - validate interface and test
- // explore an impl with no new oplets
- //
- // This is the most lightweight impl possible wrt no intermediate streams
- // i.e., all of the processing is handled within a single injected map()
- //
- // TODO: want to create ExecutorService using provider's ThreadFactory service.
- // Can't get RuntimeServicesSupplier from a stream.
- //
- // Note, we impose this "non-null mapper result" requirement so as
- // to enable alternative implementations that might be burdened if
- // null results were allowed.
- // The implementation below could easily handle null results w/o
- // losing synchronization, with the combiner needing to deal with
- // a null result in the list it's given.
-
- AtomicReference<ExecutorService> executorRef = new AtomicReference<>();
-
- return stream.map(tuple -> {
- if (executorRef.get() == null) {
- executorRef.compareAndSet(null, Executors.newFixedThreadPool(Math.min(mappers.size(), 20)));
- }
- ExecutorService executor = executorRef.get();
- List<U> results = new ArrayList<>(Collections.nCopies(mappers.size(), null));
- List<Future<?>> futures = new ArrayList<>(mappers.size());
-
- // Submit a task for each mapper invocation
- int ch = 0;
- for (Function<T,U> mapper : mappers) {
- final int resultIndx = ch++;
- Future<?> future = executor.submit(() -> {
- U result = mapper.apply(tuple);
- if (result == null)
- throw new IllegalStateException("mapper index "+resultIndx+" returned null");
- results.set(resultIndx, result);
- });
- futures.add(future);
- }
- // Await completion of all
- for (Future<?> future : futures) {
- try {
- future.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("mapper interrupted", e);
- } catch (Exception e) {
- throw new RuntimeException("mapper threw", e);
- }
- }
- // Run the combiner
- R result = combiner.apply(results);
- return result;
- });
-
- }
/**
* Perform analytics concurrently.
@@ -394,9 +303,9 @@ public class PlumbingStreams {
* create a graph that's logically like this:
* <pre>{@code
* -
- * |-> A1pipeline ->|
- * sensorReadings<T> -> |-> A2pipeline ->| -> result<R>
- * |-> A3pipeline ->|
+ * |-> A1pipeline ->|
+ * sensorReadings<T> -> |-> A2pipeline ->| -> result<R>
+ * |-> A3pipeline ->|
* }</pre>
* </P><P>
* The typical use case for this is when an application has a collection
@@ -437,6 +346,7 @@ public class PlumbingStreams {
* I.e., list entry [0] is the result from pipelines[0],
* list entry [1] is the result from pipelines[1], etc.
* @return result stream
+ * @see #barrier(List, int) barrier
*/
public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<TStream<List<U>>,TStream<R>> combiner) {
Objects.requireNonNull(stream, "stream");
[5/9] incubator-quarks git commit: tidy up
Posted by dl...@apache.org.
tidy up
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/cde26788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/cde26788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/cde26788
Branch: refs/heads/master
Commit: cde26788a61a707bc9eea845cf909d75c5c9905f
Parents: 734f146
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Apr 29 15:47:35 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 15:47:35 2016 -0400
----------------------------------------------------------------------
.../topology/plumbing/PlumbingStreams.java | 72 ++++----------------
.../java/quarks/test/topology/PlumbingTest.java | 30 ++++----
2 files changed, 29 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/cde26788/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index 554fd39..ac75103 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -25,12 +25,10 @@ import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import quarks.function.Function;
-import quarks.function.UnaryOperator;
import quarks.oplet.plumbing.Barrier;
import quarks.oplet.plumbing.Isolate;
import quarks.oplet.plumbing.PressureReliever;
@@ -445,32 +443,18 @@ public class PlumbingStreams {
Objects.requireNonNull(pipelines, "pipelines");
Objects.requireNonNull(combiner, "combiner");
- // INITIAL IMPL TO GET STARTED - validate interface and test
- // explore an impl with no new oplets
- //
- // A few impl options exist. Some with feedback loop to control stepping,
- // some without. Feedback is OK for single JVM case, less so for
- // multi-JVM/distributed case.
- //
- // Some impls with special oplets some that avoid them.
- //
// Summary of what's below:
- // feedback loop and no new oplets:
- // |-> isolate -> p1 -> map.toPair |
- // stream -> map.gate =>|-> isolate -> p2 -> map.toPair |-> union -> map.Collector -> combiner
- // |-> isolate -> p3 -> map.toPair |
- // . . .
- //
-
- // Add a gate. This keeps all pipelines working lock-step.
- // It also provides the guarantee needed by gatedBarrier below.
- Semaphore gateSemaphore = new Semaphore(1);
- stream = gate(stream, gateSemaphore).tag("concurrent.gate");
+ // |-> isolate(1) -> p1 -> |
+ // stream -> |-> isolate(1) -> p2 -> |-> barrier(10) -> combiner
+ // |-> isolate(1) -> p3 -> |
+ // . . .
+
+ int barrierQueueCapacity = 10; // don't preclude pipelines from getting ahead some.
- // Add parallel fanout - with the gate the queue size really doesn't matter
- List<TStream<T>> fanouts = parallelFanout(stream, pipelines.size(), 1);
- for (int i = 0; i < fanouts.size(); i++)
- fanouts.get(i).tag("concurrent.isolated-ch"+i);
+ // Add concurrent (isolated) fanouts
+ List<TStream<T>> fanouts = new ArrayList<>(pipelines.size());
+ for (int i = 0; i < pipelines.size(); i++)
+ fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
// Add pipelines
List<TStream<U>> results = new ArrayList<>(pipelines.size());
@@ -481,43 +465,11 @@ public class PlumbingStreams {
}
// Add the barrier
- // TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
- TStream<List<U>> barrier = barrier(results).tag("concurrent.barrier");
-
- // barrier = barrier.peek(tuple -> System.out.println("concurrent.barrier List<U> "+tuple));
-
- // Add peek() to signal ok to begin next tuple
- barrier = barrier.peek(tuple -> { gateSemaphore.release(); }).tag("concurrent.gateRelease");
+ TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
- // Add the combiner to the topology
+ // Add the combiner
return combiner.apply(barrier);
}
-
- private static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
- return stream.map(tuple -> {
- try {
- semaphore.acquire();
- return tuple;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("interrupted", e);
- }});
- }
-
- private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, int queueCapacity) {
- // TODO want an ExecutorService to enable nThreads for mPipelines.
- // i.e., not use "isolate", or need a way to create a collection of
- // related "isolate" that use an executor
- return parallelFanout(stream, numFanouts, inStream -> PlumbingStreams.isolate(inStream, queueCapacity));
- }
-
- private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, UnaryOperator<TStream<T>> isolator) {
- List<TStream<T>> fanouts = new ArrayList<>(numFanouts);
- for (int i = 0; i < numFanouts; i++) {
- fanouts.add(isolator.apply(stream));
- }
- return fanouts;
- }
/**
* A tuple synchronization barrier.
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/cde26788/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index f147685..677467a 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -317,13 +316,18 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
public void testConcurrentMap() throws Exception {
Topology top = newTopology("testConcurrentMap");
- Function<Integer,JsonObject> a1 = fakeAnalytic(0, 100, TimeUnit.MILLISECONDS);
- Function<Integer,JsonObject> a2 = fakeAnalytic(1, 100, TimeUnit.MILLISECONDS);
- Function<Integer,JsonObject> a3 = fakeAnalytic(2, 100, TimeUnit.MILLISECONDS);
- Function<Integer,JsonObject> a4 = fakeAnalytic(3, 100, TimeUnit.MILLISECONDS);
- Function<Integer,JsonObject> a5 = fakeAnalytic(4, 100, TimeUnit.MILLISECONDS);
+ int ch = 0;
+ List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ // a couple much faster just in case something's amiss with queues
+ mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
- List<Function<Integer,JsonObject>> mappers = new ArrayList<>(Arrays.asList(a1, a2, a3, a4, a5));
Function<List<JsonObject>,Integer> combiner = list -> {
int sum = 0;
int cnt = 0;
@@ -372,13 +376,13 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
Topology top = newTopology("testConcurrent");
int ch = 0;
- Function<TStream<Integer>,TStream<JsonObject>> p1 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
- Function<TStream<Integer>,TStream<JsonObject>> p2 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
- Function<TStream<Integer>,TStream<JsonObject>> p3 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
- Function<TStream<Integer>,TStream<JsonObject>> p4 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
- Function<TStream<Integer>,TStream<JsonObject>> p5 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+ List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
- List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>(Arrays.asList(p1, p2, p3, p4, p5));
Function<List<JsonObject>,Integer> tupleCombiner = list -> {
int sum = 0;
int cnt = 0;