You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:40 UTC
[39/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/PlumbingStreams.java
new file mode 100644
index 0000000..1c7d0a7
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/PlumbingStreams.java
@@ -0,0 +1,682 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology.plumbing;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.ToIntFunction;
+import org.apache.edgent.oplet.plumbing.Barrier;
+import org.apache.edgent.oplet.plumbing.Isolate;
+import org.apache.edgent.oplet.plumbing.PressureReliever;
+import org.apache.edgent.oplet.plumbing.UnorderedIsolate;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TopologyProvider;
+
+/**
+ * Plumbing utilities for {@link TStream}.
+ * Methods that manipulate the flow of tuples in a streaming topology,
+ * but are not part of the logic of the application.
+ */
+public class PlumbingStreams {
+
+ /**
+ * Insert a blocking delay between tuples.
+ * Returned stream is the input stream delayed by {@code delay}.
+ * <p>
+ * Delays less than 1msec are translated to a 0 delay.
+ * <p>
+ * This function always adds the {@code delay} amount after receiving
+ * a tuple before forwarding it.
+ * <p>
+ * Downstream tuple processing delays will affect
+ * the overall delay of a subsequent tuple.
+ * <p>
+ * e.g., the input stream contains two tuples t1 and t2 and
+ * the delay is 100ms. The forwarding of t1 is delayed by 100ms.
+ * Then if a downstream processing delay of 80ms occurs, this function
+ * receives t2 80ms after it forwarded t1 and it will delay another
+ * 100ms before forwarding t2. Hence the overall delay between forwarding
+ * t1 and t2 is 180ms.
+ * See {@link #blockingThrottle(long, TimeUnit) blockingThrottle}.
+ *
+ * @param <T> Tuple type
+ * @param stream Stream t
+ * @param delay Amount of time to delay a tuple.
+ * @param unit Time unit for {@code delay}.
+ *
+ * @return Stream that will be delayed.
+ */
+ public static <T> TStream<T> blockingDelay(TStream<T> stream, long delay, TimeUnit unit) {
+ return stream.map(t -> {try {
+ Thread.sleep(unit.toMillis(delay));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } return t;}) ;
+ }
+
+ /**
+ * Maintain a constant blocking delay between tuples.
+ * The returned stream is the input stream throttled by {@code delay}.
+ * <p>
+ * Delays less than 1msec are translated to a 0 delay.
+ * <p>
+ * Sample use:
+ * <pre>{@code
+ * TStream<String> stream = topology.strings("a", "b, "c");
+ * // Create a stream with tuples throttled to 1 second intervals.
+ * TStream<String> throttledStream = blockingThrottle(stream, 1, TimeUnit.SECOND);
+ * // print out the throttled tuples as they arrive
+ * throttledStream.peek(t -> System.out.println(new Date() + " - " + t));
+ * }</pre>
+ * <p>
+ * The function adjusts for downstream processing delays.
+ * The first tuple is not delayed. If {@code delay} has already
+ * elapsed since the prior tuple was forwarded, the tuple
+ * is forwarded immediately.
+ * Otherwise, forwarding the tuple is delayed to achieve
+ * a {@code delay} amount since forwarding the prior tuple.
+ * <p>
+ * e.g., the input stream contains two tuples t1 and t2 and
+ * the delay is 100ms. The forwarding of t1 is delayed by 100ms.
+ * Then if a downstream processing delay of 80ms occurs, this function
+ * receives t2 80ms after it forwarded t1 and it will only delay another
+ * 20ms (100ms - 80ms) before forwarding t2.
+ * Hence the overall delay between forwarding t1 and t2 remains 100ms.
+ *
+ * @param <T> tuple type
+ * @param stream the stream to throttle
+ * @param delay Amount of time to delay a tuple.
+ * @param unit Time unit for {@code delay}.
+ * @return the throttled stream
+ */
+ public static <T> TStream<T> blockingThrottle(TStream<T> stream, long delay, TimeUnit unit) {
+ return stream.map( blockingThrottle(delay, unit) );
+ }
+
+ private static <T> Function<T,T> blockingThrottle(long delay, TimeUnit unit) {
+ long[] nextTupleTime = { 0 };
+ return t -> {
+ long now = System.currentTimeMillis();
+ if (nextTupleTime[0] != 0) {
+ if (now < nextTupleTime[0]) {
+ try {
+ Thread.sleep(nextTupleTime[0] - now);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ now = System.currentTimeMillis();
+ }
+ }
+ nextTupleTime[0] = now + unit.toMillis(delay);
+ return t;
+ };
+ }
+
+ /**
+ * Insert a blocking delay before forwarding the first tuple and
+ * no delay for subsequent tuples.
+ * <p>
+ * Delays less than 1msec are translated to a 0 delay.
+ * <p>
+ * Sample use:
+ * <pre>{@code
+ * TStream<String> stream = topology.strings("a", "b, "c");
+ * // create a stream where the first tuple is delayed by 5 seconds.
+ * TStream<String> oneShotDelayedStream =
+ * stream.map( blockingOneShotDelay(5, TimeUnit.SECONDS) );
+ * }</pre>
+ *
+ * @param <T> tuple type
+ * @param stream input stream
+ * @param delay Amount of time to delay a tuple.
+ * @param unit Time unit for {@code delay}.
+ * @return the delayed stream
+ */
+ public static <T> TStream<T> blockingOneShotDelay(TStream<T> stream, long delay, TimeUnit unit) {
+ return stream.map( blockingOneShotDelay(delay, unit) );
+ }
+
+ private static <T> Function<T,T> blockingOneShotDelay(long delay, TimeUnit unit) {
+ long[] initialDelay = { unit.toMillis(delay) };
+ return t -> {
+ if (initialDelay[0] != -1) {
+ try {
+ Thread.sleep(initialDelay[0]);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ initialDelay[0] = -1;
+ }
+ return t;
+ };
+ }
+
+ /**
+ * Relieve pressure on upstream processing by discarding tuples.
+ * This method ensures that upstream processing is not
+ * constrained by any delay in downstream processing,
+ * for example by a connector not being able to connect
+ * to its external system.
+ * <P>
+ * Any downstream processing of the returned stream is isolated
+ * from {@code stream} so that any slow down does not affect {@code stream}.
+ * When the downstream processing cannot keep up with rate of
+ * {@code stream} tuples will be dropped from returned stream.
+ * <BR>
+ * Up to {@code count} of the most recent tuples per key from {@code stream}
+ * are maintained when downstream processing is slow, any older tuples
+ * that have not been submitted to the returned stream will be discarded.
+ * <BR>
+ * Tuple order is maintained within a partition but is not guaranteed to
+ * be maintained across partitions.
+ * </P>
+ *
+ * @param stream Stream to be isolated from downstream processing.
+ * @param keyFunction Function defining the key of each tuple.
+ * @param count Maximum number of tuples to maintain when downstream processing is backing up.
+ * @return Stream that is isolated from and thus relieves pressure on {@code stream}.
+ *
+ * @param <T> Tuple type.
+ * @param <K> Key type.
+ * @see #isolate(TStream, int) isolate
+ */
+ public static <T,K> TStream<T> pressureReliever(TStream<T> stream, Function<T,K> keyFunction, int count) {
+ return stream.pipe(new PressureReliever<>(count, keyFunction));
+ }
+
+ /**
+ * Isolate upstream processing from downstream processing.
+ * <BR>
+ * Implementations may throw {@code OutOfMemoryExceptions}
+ * if the processing against returned stream cannot keep up
+ * with the arrival rate of tuples on {@code stream}.
+ *
+ * @param <T> Tuple type
+ * @param stream Stream to be isolated from downstream processing.
+ * @param ordered {@code true} to maintain arrival order on the returned stream,
+ * {@code false} to not guaranteed arrival order.
+ * @return Stream that is isolated from {@code stream}.
+ */
+ public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
+ return stream.pipe(
+ ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
+ }
+
+ /**
+ * Isolate upstream processing from downstream processing.
+ * <P>
+ * If the processing against the returned stream cannot keep up
+ * with the arrival rate of tuples on {@code stream}, upstream
+ * processing will block until there is space in the queue between
+ * the streams.
+ * </P><P>
+ * Processing of tuples occurs in the order they were received.
+ * </P>
+ *
+ * @param <T> Tuple type
+ * @param stream Stream to be isolated from downstream processing.
+ * @param queueCapacity size of the queue between {@code stream} and
+ * the returned stream.
+ * @return Stream that is isolated from {@code stream}.
+ * @see #pressureReliever(TStream, Function, int) pressureReliever
+ */
+ public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
+ return stream.pipe(new Isolate<T>(queueCapacity));
+ }
+
+ /**
+ * Perform analytics concurrently.
+ * <P>
+ * This is a convenience function that calls
+ * {@link #concurrent(TStream, List, Function)} after
+ * creating {@code pipeline} and {@code combiner} functions
+ * from the supplied {@code mappers} and {@code combiner} arguments.
+ * </P><P>
+ * That is, it is logically, if not exactly, the same as:
+ * </P>
+ * <pre>{@code
+ * List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
+ * for (Function<T,U> mapper : mappers)
+ * pipelines.add(s -> s.map(mapper));
+ * concurrent(stream, pipelines, combiner);
+ * }</pre>
+ *
+ * @param <T> Tuple type on input stream.
+ * @param <U> Tuple type generated by mappers.
+ * @param <R> Tuple type of the result.
+ *
+ * @param stream input stream
+ * @param mappers functions to be run concurrently. Each mapper MUST
+ * return a non-null result.
+ * A runtime error will be generated if a null result
+ * is returned.
+ * @param combiner function to create a result tuple from the list of
+ * results from {@code mappers}.
+ * The input list order is 1:1 with the {@code mappers} list.
+ * I.e., list entry [0] is the result from mappers[0],
+ * list entry [1] is the result from mappers[1], etc.
+ * @return result stream
+ */
+ public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
+ Objects.requireNonNull(stream, "stream");
+ Objects.requireNonNull(mappers, "mappers");
+ Objects.requireNonNull(combiner, "combiner");
+
+ List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
+ for (Function<T,U> mapper : mappers) {
+ pipelines.add(s -> s.map(mapper));
+ }
+
+ return concurrent(stream, pipelines, combiner);
+ }
+
+ /**
+ * Perform analytics concurrently.
+ * <P>
+ * Process input tuples one at at time, invoking the specified
+ * analytics ({@code pipelines}) concurrently, combine the results,
+ * and then process the next input tuple in the same manner.
+ * </P><P>
+ * Logically, instead of doing this:
+ * </P>
+ * <pre>{@code
+ * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+ * }</pre>
+ * create a graph that's logically like this:
+ * <pre>{@code
+ * -
+ * |-> A1 ->|
+ * sensorReadings<T> -> |-> A2 ->| -> results<R>
+ * |-> A3 ->|
+ *
+ * }</pre>
+ * more specifically a graph like this:
+ * <pre>{@code
+ * -
+ * |-> isolate(1) -> pipeline1 -> |
+ * stream -> |-> isolate(1) -> pipeline2 -> |-> barrier(10) -> combiner
+ * |-> isolate(1) -> pipeline3 -> |
+ * . . .
+ * }</pre>
+ * <P>
+ * The typical use case for this is when an application has a collection
+ * of independent analytics to perform on each tuple and the analytics
+ * are sufficiently long running such that performing them concurrently
+ * is desired.
+ * </P><P>
+ * Note, this is in contrast to "parallel" stream processing,
+ * which in Java8 Streams and other contexts means processing multiple
+ * tuples in parallel, each on a replicated processing pipeline.
+ * </P><P>
+ * Threadsafety - one of the following must be true:
+ * </P>
+ * <ul>
+ * <li>the tuples from {@code stream} are threadsafe</li>
+ * <li>the {@code pipelines} do not modify the input tuples</li>
+ * <li>the {@code pipelines} provide their own synchronization controls
+ * to protect concurrent modifications of the input tuples</li>
+ * </ul>
+ * <P>
+ * Logically, a thread is allocated for each of the {@code pipelines}.
+ * The actual degree of concurrency may be {@link TopologyProvider} dependent.
+ * </P>
+ *
+ * @param <T> Tuple type on input stream.
+ * @param <U> Tuple type generated by pipelines.
+ * @param <R> Tuple type of the result.
+ *
+ * @param stream input stream
+ * @param pipelines a list of functions to add a pipeline to the topology.
+ * Each {@code pipeline.apply()} is called with {@code stream}
+ * as the input, yielding the pipeline's result stream.
+ * For each input tuple, a pipeline MUST create exactly one output tuple.
+ * Tuple flow into the pipelines will cease if that requirement
+ * is not met.
+ * @param combiner function to create a result tuple from the list of
+ * results from {@code pipelines}.
+ * The input tuple list's order is 1:1 with the {@code pipelines} list.
+ * I.e., list entry [0] is the result from pipelines[0],
+ * list entry [1] is the result from pipelines[1], etc.
+ * @return result stream
+ * @see #barrier(List, int) barrier
+ */
+ public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
+ Objects.requireNonNull(stream, "stream");
+ Objects.requireNonNull(pipelines, "pipelines");
+ Objects.requireNonNull(combiner, "combiner");
+
+ int barrierQueueCapacity = 10; // don't preclude pipelines from getting ahead some.
+
+ // Add concurrent (isolated) fanouts
+ List<TStream<T>> fanouts = new ArrayList<>(pipelines.size());
+ for (int i = 0; i < pipelines.size(); i++)
+ fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
+
+ // Add pipelines
+ List<TStream<U>> results = new ArrayList<>(pipelines.size());
+ int ch = 0;
+ for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
+ results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
+ ch++;
+ }
+
+ // Add the barrier
+ TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
+
+ // Add the combiner
+ return barrier.map(combiner);
+ }
+
+ /**
+ * A tuple synchronization barrier.
+ * <P>
+ * Same as {@code barrier(others, 1)}
+ * </P>
+ * @param <T> Tuple type
+ * @param streams input streams
+ * @return the output stream
+ * @see #barrier(List, int)
+ */
+ public static <T> TStream<List<T>> barrier(List<TStream<T>> streams) {
+ return barrier(streams, 1);
+ }
+
+ /**
+ * A tuple synchronization barrier.
+ * <P>
+ * A barrier has n input streams with tuple type {@code T}
+ * and one output stream with tuple type {@code List<T>}.
+ * Once the barrier receives one tuple on each of its input streams,
+ * it generates an output tuple containing one tuple from each input stream.
+ * It then waits until it has received another tuple from each input stream.
+ * </P><P>
+ * Input stream 0's tuple is in the output tuple's list[0],
+ * stream 1's tuple in list[1], and so on.
+ * </P><P>
+ * The barrier's output stream is isolated from the input streams.
+ * </P><P>
+ * The barrier has a queue of size {@code queueCapacity} for each
+ * input stream. When a tuple for an input stream is received it is
+ * added to its queue. The stream will block if the queue is full.
+ * </P>
+ *
+ * @param <T> Type of the tuple.
+ *
+ * @param streams the list of input streams
+ * @param queueCapacity the size of each input stream's queue
+ * @return the output stream
+ * @see Barrier
+ */
+ public static <T> TStream<List<T>> barrier(List<TStream<T>> streams, int queueCapacity) {
+ List<TStream<T>> others = new ArrayList<>(streams);
+ TStream<T> s1 = others.remove(0);
+ return s1.fanin(new Barrier<T>(queueCapacity), others);
+ }
+
+ /**
+ * Perform an analytic function on tuples in parallel.
+ * <P>
+ * Same as {@code parallel(stream, width, splitter, (s,ch) -> s.map(t -> mapper.apply(t, ch))}
+ * </P>
+ * @param <T> Input stream tuple type
+ * @param <U> Result stream tuple type
+ * @param stream input stream
+ * @param splitter the tuple channel allocation function
+ * @param mapper analytic function
+ * @param width number of channels
+ * @return the unordered result stream
+ * @see #roundRobinSplitter(int) roundRobinSplitter
+ * @see #concurrentMap(TStream, List, Function) concurrentMap
+ */
+ public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
+ BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
+ return parallel(stream, width, splitter, pipeline);
+ }
+
+ /**
+ * Perform an analytic pipeline on tuples in parallel.
+ * <P>
+ * Splits {@code stream} into {@code width} parallel processing channels,
+ * partitioning tuples among the channels using {@code splitter}.
+ * Each channel runs a copy of {@code pipeline}.
+ * The resulting stream is isolated from the upstream parallel channels.
+ * </P><P>
+ * The ordering of tuples in {@code stream} is not maintained in the
+ * results from {@code parallel}.
+ * </P><P>
+ * {@code pipeline} is not required to yield a result for each input
+ * tuple.
+ * </P><P>
+ * A common splitter function is a {@link #roundRobinSplitter(int) roundRobinSplitter}.
+ * </P><P>
+ * The generated graph looks like this:
+ * </P>
+ * <pre>{@code
+ * -
+ * |-> isolate(10) -> pipeline-ch1 -> |
+ * stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch2 -> |-> union -> isolate(width)
+ * |-> isolate(10) -> pipeline-ch3 -> |
+ * . . .
+ * }</pre>
+ *
+ * @param <T> Input stream tuple type
+ * @param <R> Result stream tuple type
+ *
+ * @param stream the input stream
+ * @param width number of parallel processing channels
+ * @param splitter the tuple channel allocation function
+ * @param pipeline the pipeline for each channel.
+ * {@code pipeline.apply(inputStream,channel)}
+ * is called to generate the pipeline for each channel.
+ * @return the isolated unordered result from each parallel channel
+ * @see #roundRobinSplitter(int) roundRobinSplitter
+ * @see #concurrent(TStream, List, Function) concurrent
+ */
+ public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
+ Objects.requireNonNull(stream, "stream");
+ if (width < 1)
+ throw new IllegalArgumentException("width");
+ Objects.requireNonNull(splitter, "splitter");
+ Objects.requireNonNull(pipeline, "pipeline");
+
+ // Add the splitter
+ List<TStream<T>> channels = stream.split(width, splitter);
+ for (int ch = 0; ch < width; ch++)
+ channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
+
+ // Add concurrency (isolation) to the channels
+ int chBufferSize = 10; // don't immediately block stream if channel is busy
+ for (int ch = 0; ch < width; ch++)
+ channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
+
+ // Add pipelines
+ List<TStream<R>> results = new ArrayList<>(width);
+ for (int ch = 0; ch < width; ch++) {
+ results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
+ }
+
+ // Add the Union
+ TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
+
+ // Add the isolate - keep channel threads to just their pipeline processing
+ return isolate(result, width);
+ }
+
+ /**
+ * Perform an analytic pipeline on tuples in parallel.
+ * <P>
+ * Splits {@code stream} into {@code width} parallel processing channels,
+ * partitioning tuples among the channels in a load balanced fashion.
+ * Each channel runs a copy of {@code pipeline}.
+ * The resulting stream is isolated from the upstream parallel channels.
+ * </P><P>
+ * The ordering of tuples in {@code stream} is not maintained in the
+ * results from {@code parallel}.
+ * </P><P>
+ * A {@code pipeline} <b>MUST</b> yield a result for each input
+ * tuple. Failure to do so will result in the channel remaining
+ * in a busy state and no longer available to process additional tuples.
+ * </P><P>
+ * A {@link LoadBalancedSplitter} is used to distribute tuples.
+ * </P><P>
+ * The generated graph looks like this:
+ * </P>
+ * <pre>{@code
+ * -
+ * |-> isolate(1) -> pipeline-ch1 -> peek(splitter.channelDone()) -> |
+ * stream -> split(width,splitter) -> |-> isolate(1) -> pipeline-ch2 -> peek(splitter.channelDone()) -> |-> union -> isolate(width)
+ * |-> isolate(1) -> pipeline-ch3 -> peek(splitter.channelDone()) -> |
+ * . . .
+ * }</pre>
+ * <P>
+ * Note, this implementation requires that the splitter is used from
+ * only a single JVM. The {@link org.apache.edgent.providers.direct.DirectProvider DirectProvider}
+ * provider meets this requirement.
+ * </P>
+ *
+ * @param <T> Input stream tuple type
+ * @param <R> Result stream tuple type
+ *
+ * @param stream the input stream
+ * @param width number of parallel processing channels
+ * @param pipeline the pipeline for each channel.
+ * {@code pipeline.apply(inputStream,channel)}
+ * is called to generate the pipeline for each channel.
+ * @return the isolated unordered result from each parallel channel
+ * @see #parallel(TStream, int, ToIntFunction, BiFunction)
+ * @see LoadBalancedSplitter
+ */
+ public static <T,R> TStream<R> parallelBalanced(TStream<T> stream, int width, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
+ Objects.requireNonNull(stream, "stream");
+ if (width < 1)
+ throw new IllegalArgumentException("width");
+ Objects.requireNonNull(pipeline, "pipeline");
+
+ LoadBalancedSplitter<T> splitter = new LoadBalancedSplitter<>(width);
+
+ // Add the splitter
+ List<TStream<T>> channels = stream.split(width, splitter);
+ for (int ch = 0; ch < width; ch++)
+ channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
+
+ // Add concurrency (isolation) to the channels
+ int chBufferSize = 1; // 1 is enough with load balanced impl
+ for (int ch = 0; ch < width; ch++)
+ channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
+
+ // Add pipelines
+ List<TStream<R>> results = new ArrayList<>(width);
+ for (int ch = 0; ch < width; ch++) {
+ final int finalCh = ch;
+ results.add(pipeline.apply(channels.get(ch), ch)
+ .tag("parallel-ch"+ch)
+ .peek(tuple -> splitter.channelDone(finalCh)));
+ }
+
+ // Add the Union
+ TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
+
+ // Add the isolate - keep channel threads to just their pipeline processing
+ return isolate(result, width);
+ }
+
+ /**
+ * A round-robin splitter ToIntFunction
+ * <P>
+ * The splitter function cycles among the {@code width} channels
+ * on successive calls to {@code roundRobinSplitter.applyAsInt()},
+ * returning {@code 0, 1, ..., width-1, 0, 1, ..., width-1}.
+ * </P>
+ * @param <T> Tuple type
+ * @param width number of splitter channels
+ * @return the splitter
+ * @see TStream#split(int, ToIntFunction) TStream.split
+ * @see PlumbingStreams#parallel(TStream, int, ToIntFunction, BiFunction) parallel
+ */
+ public static <T> ToIntFunction<T> roundRobinSplitter(int width) {
+ AtomicInteger cnt = new AtomicInteger();
+ return tuple -> cnt.getAndIncrement() % width;
+ }
+ /**
+ * Control the flow of tuples to an output stream.
+ * <P>
+ * A {@link Semaphore}
+ * is used to control the flow of tuples
+ * through the {@code gate}
+ * . The gate acquires a permit from the
+ * semaphore to pass the tuple through, blocking until a permit is
+ * acquired (and applying backpressure upstream while blocked).
+ * Elsewhere, some code calls {@link Semaphore#release(int)}
+ * to make permits available.
+ * </P><P>
+ * If a TopologyProvider is used that can distribute a topology's
+ * streams to different JVM's the gate and the code releasing the
+ * permits must be in the same JVM.
+ * </P><P>
+ * Sample use:
+ * <BR>
+ * Suppose you wanted to control processing such that concurrent
+ * pipelines processed each tuple in lock-step.
+ * I.e., You want all of the pipelines to start processing a tuple
+ * at the same time and not start a new tuple until the current
+ * tuple had been fully processed by each of them:
+ * </P>
+ * <pre>{@code
+ * TStream<Integer> readings = ...;
+ * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
+ * TStream<Integer> gated = gate(readings, gateControl);
+ *
+ * // Create the concurrent pipeline combiner and have it
+ * // signal that concurrent processing of the tuple has completed.
+ * // In this sample the combiner just returns the received list of
+ * // each pipeline result.
+ *
+ * Function<TStream<List<Integer>>,TStream<List<Integer>>> combiner =
+ * stream -> stream.map(list -> { * gateControl.release(); * return list; * });
+ *
+ * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
+ * }</pre>
+ * @param <T> Tuple type
+ * @param stream the input stream
+ * @param semaphore gate control
+ * @return gated stream
+ */
+ public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
+ return stream.map(tuple -> {
+ try {
+ semaphore.acquire();
+ return tuple;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("interrupted", e);
+ }
+ });
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/plumbing/Valve.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/plumbing/Valve.java b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/Valve.java
new file mode 100644
index 0000000..a7120cb
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/Valve.java
@@ -0,0 +1,105 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology.plumbing;
+
+import org.apache.edgent.function.Predicate;
+
+/**
+ * A generic "valve" {@link Predicate}.
+ * <p>
+ * A valve is either open or closed.
+ * When used as a Predicate to {@code TStream.filter()},
+ * filter passes tuples only when the valve is open.
+ * </p><p>
+ * A valve is typically used to dynamically control whether or not
+ * some downstream tuple processing is enabled. A decision to change the
+ * state of the valve may be a result of local analytics or an external
+ * command.
+ * <br>
+ * E.g., in a simple case, a Valve might be used to control
+ * whether or not logging or publishing of tuples is enabled.
+ * </p>
+ * <pre>{@code
+ * TStream<JsonObject> stream = ...;
+ *
+ * Valve<JsonObject> valve = new Valve<>(false);
+ * stream.filter(valve).sink(someTupleLoggingConsumer);
+ *
+ * // from some analytic or device command handler...
+ * valve.setOpen(true);
+ * }</pre>
+ *
+ * @param <T> tuple type
+ */
+public class Valve<T> implements Predicate<T> {
+ private static final long serialVersionUID = 1L;
+ private transient boolean isOpen;
+
+ /**
+ * Create a new Valve Predicate
+ * <p>
+ * Same as {@code Valve(true)}
+ */
+ public Valve() {
+ this(true);
+ }
+
+ /**
+ * Create a new Valve Predicate
+ * <p>
+ * @param isOpen the initial state
+ */
+ public Valve(boolean isOpen) {
+ setOpen(isOpen);
+ }
+
+ /**
+ * Set the valve state
+ * @param isOpen true to open the valve
+ */
+ public void setOpen(boolean isOpen) {
+ this.isOpen = isOpen;
+ }
+
+ /**
+ * Get the valve state
+ * @return the state, true if the valve is open, false otherwise
+ */
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ /**
+ * Test the state of the valve, {@code value} is ignored.
+ * @return true when the valve is open, false otherwise
+ */
+ @Override
+ public boolean test(T value) {
+ return isOpen;
+ }
+
+ /**
+ * Returns a String for development/debug support. Content subject to change.
+ */
+ @Override
+ public String toString() {
+ return "isOpen="+isOpen;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/plumbing/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/plumbing/package-info.java b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/package-info.java
new file mode 100644
index 0000000..29952e5
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/package-info.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Plumbing for a streaming topology.
+ * Methods that manipulate the flow of tuples in a streaming topology,
+ * but are not part of the logic of the application.
+ */
+package org.apache.edgent.topology.plumbing;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/services/ApplicationService.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/services/ApplicationService.java b/api/topology/src/main/java/org/apache/edgent/topology/services/ApplicationService.java
new file mode 100644
index 0000000..6a710e5
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/services/ApplicationService.java
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology.services;
+
+import java.util.Set;
+
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.mbeans.ApplicationServiceMXBean;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Application registration service.
+ * A service that allows registration of applications and
+ * the ability to submit them through a control MBean.
+ *
+ * @see ApplicationServiceMXBean
+ */
+public interface ApplicationService {
+
+ /**
+ * Default alias a service registers its control MBean as.
+ * Value is {@value}.
+ */
+ String ALIAS = "edgent";
+
+ /**
+ * Prefix ({@value}) reserved for system application names.
+ */
+ String SYSTEM_APP_PREFIX = "edgent";
+
+ /**
+ * Add a topology that can be started though a control mbean.
+ * Any registration replaces any existing application with the same name.
+ * <BR>
+ * When a {@link ApplicationServiceMXBean#submit(String, String) submit}
+ * is invoked {@code builder.accept(topology, config)} is called passing:
+ * <UL>
+ * <LI>
+ * {@code topology} - An empty topology with the name {@code applicationName}.
+ * </LI>
+ * <LI>
+ * {@code config} - JSON submission configuration from
+ * {@link ApplicationServiceMXBean#submit(String, String) submit}.
+ * </LI>
+ * </UL>
+ * Once {@code builder.accept(topology, config)} returns it is submitted
+ * to the {@link Submitter} associated with the implementation of this service.
+ * <P>
+ * Application names starting with {@link #SYSTEM_APP_PREFIX edgent} are reserved
+ * for system applications.
+ * </P>
+ *
+ * @param applicationName Application name to register.
+ * @param builder How to build the topology for this application.
+ *
+ * @see ApplicationServiceMXBean
+ */
+ void registerTopology(String applicationName, BiConsumer<Topology, JsonObject> builder);
+
+ /**
+ * Register a jar file containing new applications.
+ * Any service provider within the jar of type {@link TopologyBuilder}
+ * will be {@link #registerTopology(String, BiConsumer) registered} as
+ * a topology.
+ *
+ * The jar cannot have any new dependencies, its classpath will
+ * be the classpath of this service.
+ *
+ * @param jarURL URL of Jar containing new applications.
+ * @param jsonConfig Configuration information, currently unused.
+ * @throws Exception if failure
+ */
+ void registerJar(String jarURL, String jsonConfig) throws Exception;
+
+ /**
+ * Returns the names of applications registered with this service.
+ *
+ * @return the names of applications registered with this service.
+ */
+ Set<String> getApplicationNames();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/services/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/services/TopologyBuilder.java b/api/topology/src/main/java/org/apache/edgent/topology/services/TopologyBuilder.java
new file mode 100644
index 0000000..be32b1c
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/services/TopologyBuilder.java
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology.services;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.topology.Topology;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Represents an topology that can be built.
+ *
+ * A class implementing {@code TopologyBuilder} can
+ * be registered as a service provider in a jar file for
+ * automatic application registration using
+ * {@link ApplicationService#registerJar(String, String)}.
+ *
+ */
+public interface TopologyBuilder {
+ /**
+ * Name the application will be known as.
+ * @return Name the application will be known as.
+ */
+ String getName();
+
+ /**
+ * How the application is built.
+ * @return Function that builds the application.
+ */
+ BiConsumer<Topology, JsonObject> getBuilder();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/services/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/services/package-info.java b/api/topology/src/main/java/org/apache/edgent/topology/services/package-info.java
new file mode 100644
index 0000000..7f45168
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/services/package-info.java
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Services for topologies.
+ *
+ */
+package org.apache.edgent.topology.services;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/tester/Condition.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/tester/Condition.java b/api/topology/src/main/java/org/apache/edgent/topology/tester/Condition.java
new file mode 100644
index 0000000..2a0c21b
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/tester/Condition.java
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+ */
+package org.apache.edgent.topology.tester;
+
+/**
+ * Function representing if a condition is valid or not.
+ *
+ * @param <T> Condition's result type
+ */
+public interface Condition<T> {
+
+ boolean valid();
+
+ T getResult();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/tester/Tester.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/tester/Tester.java b/api/topology/src/main/java/org/apache/edgent/topology/tester/Tester.java
new file mode 100644
index 0000000..22fc52c
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/tester/Tester.java
@@ -0,0 +1,180 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+ */
+package org.apache.edgent.topology.tester;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyElement;
+
+import com.google.gson.JsonObject;
+
+/**
+ * A {@code Tester} adds the ability to test a topology in a test framework such
+ * as JUnit.
+ *
+ * The main feature is the ability to capture tuples from a {@link TStream} in
+ * order to perform some form of verification on them. There are two mechanisms
+ * to perform verifications:
+ * <UL>
+ * <LI>did the stream produce the correct number of tuples.</LI>
+ * <LI>did the stream produce the correct tuples.</LI>
+ * </UL>
+ * Currently, only streams that are instances of
+ * {@code TStream<String>} can have conditions or handlers attached.
+ * <P>
+ * A {@code Tester} modifies its {@link Topology} to achieve the above purpose.
+ * </P>
+ */
+public interface Tester extends TopologyElement {
+
+ /**
+ * Return a condition that evaluates if {@code stream} has submitted exactly
+ * {@code expectedCount} number of tuples. The function may be evaluated
+ * after the {@link Submitter#submit(Object, JsonObject) submit}
+ * call has returned. <BR>
+ * The {@link Condition#getResult() result} of the returned
+ * {@code Condition} is the number of tuples seen on {@code stream} so far.
+ * <BR>
+ * If the topology is still executing then the returned values from
+ * {@link Condition#valid()} and {@link Condition#getResult()} may change as
+ * more tuples are seen on {@code stream}. <BR>
+ *
+ * @param stream
+ * Stream to be tested.
+ * @param expectedCount
+ * Number of tuples expected on {@code stream}.
+ * @return True if the stream has submitted exactly {@code expectedCount}
+ * number of tuples, false otherwise.
+ */
+ Condition<Long> tupleCount(TStream<?> stream, long expectedCount);
+
+ /**
+ * Return a condition that evaluates if {@code stream} has submitted at
+ * least {@code expectedCount} number of tuples. The function may be
+ * evaluated after the
+ * {@link Submitter#submit(Object, JsonObject) submit} call has returned. <BR>
+ * The {@link Condition#getResult() result} of the returned
+ * {@code Condition} is the number of tuples seen on {@code stream} so far.
+ * <BR>
+ * If the topology is still executing then the returned values from
+ * {@link Condition#valid()} and {@link Condition#getResult()} may change as
+ * more tuples are seen on {@code stream}. <BR>
+ *
+ * @param stream
+ * Stream to be tested.
+ * @param expectedCount
+ * Number of tuples expected on {@code stream}.
+ * @return Condition that will return true the stream has submitted at least
+ * {@code expectedCount} number of tuples, false otherwise.
+ */
+ Condition<Long> atLeastTupleCount(TStream<?> stream, long expectedCount);
+
+ /**
+ * Return a condition that evaluates if {@code stream} has submitted
+ * tuples matching {@code values} in the same order. <BR>
+ * The {@link Condition#getResult() result} of the returned
+ * {@code Condition} is the tuples seen on {@code stream} so far. <BR>
+ * If the topology is still executing then the returned values from
+ * {@link Condition#valid()} and {@link Condition#getResult()} may change as
+ * more tuples are seen on {@code stream}. <BR>
+ *
+ * @param <T> Tuple type
+ * @param stream
+ * Stream to be tested.
+ * @param values
+ * Expected tuples on {@code stream}.
+ * @return Condition that will return true if the stream has submitted at
+ * least tuples matching {@code values} in the same order, false
+ * otherwise.
+ */
+ <T> Condition<List<T>> streamContents(TStream<T> stream, @SuppressWarnings("unchecked") T... values);
+
+ /**
+ * Return a condition that evaluates if {@code stream} has submitted
+ * tuples matching {@code values} in any order. <BR>
+ * The {@link Condition#getResult() result} of the returned
+ * {@code Condition} is the tuples seen on {@code stream} so far. <BR>
+ * If the topology is still executing then the returned values from
+ * {@link Condition#valid()} and {@link Condition#getResult()} may change as
+ * more tuples are seen on {@code stream}. <BR>
+ *
+ * @param <T> Tuple type
+ * @param stream
+ * Stream to be tested.
+ * @param values
+ * Expected tuples on {@code stream}.
+ * @return Condition that will return true if the stream has submitted at
+ * least tuples matching {@code values} in the any order, false
+ * otherwise.
+ */
+ <T> Condition<List<T>> contentsUnordered(TStream<T> stream, @SuppressWarnings("unchecked") T... values);
+
+ /**
+ * Return a condition that is valid only if all of {@code conditions} are valid.
+ * The result of the condition is {@link Condition#valid()}
+ * @param conditions Conditions to AND together.
+ * @return condition that is valid only if all of {@code conditions} are valid.
+ */
+ Condition<Boolean> and(final Condition<?>... conditions);
+
+ /**
+ * Submit the topology for this tester and wait for it to complete, or reach
+ * an end condition. If the topology does not complete or reach its end
+ * condition before {@code timeout} then it is terminated.
+ * <P>
+ * End condition is usually a {@link Condition} returned from
+ * {@link #atLeastTupleCount(TStream, long)} or
+ * {@link #tupleCount(TStream, long)} so that this method returns once the
+ * stream has submitted a sufficient number of tuples. <BR>
+ * Note that the condition will be only checked periodically up to
+ * {@code timeout}, so that if the condition is only valid for a brief
+ * period of time, then its valid state may not be seen, and thus this
+ * method will wait for the timeout period.
+ * </P>
+ *
+ * @param submitter the {@link Submitter}
+ * @param config
+ * submission configuration.
+ * @param endCondition
+ * Condition that will cause this method to return if it is true.
+ * @param timeout
+ * Maximum time to wait for the topology to complete or reach its
+ * end condition.
+ * @param unit
+ * Unit for {@code timeout}.
+ * @return The value of {@code endCondition.valid()}.
+ *
+ * @throws Exception
+ * Failure submitting or executing the topology.
+ */
+ boolean complete(Submitter<Topology, ? extends Job> submitter, JsonObject config, Condition<?> endCondition,
+ long timeout, TimeUnit unit) throws Exception;
+
+ /**
+ * Get the {@code Job} reference for the topology submitted by {@code complete()}.
+ * @return {@code Job} reference for the topology submitted by {@code complete()}.
+ * Null if the {@code complete()} has not been called or the {@code Job} instance is not yet available.
+ */
+ Job getJob();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/tester/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/tester/package-info.java b/api/topology/src/main/java/org/apache/edgent/topology/tester/package-info.java
new file mode 100644
index 0000000..d160725
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/tester/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Testing for a streaming topology.
+ */
+package org.apache.edgent.topology.tester;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/JsonFunctionsTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/JsonFunctionsTest.java b/api/topology/src/test/java/edgent/test/topology/JsonFunctionsTest.java
deleted file mode 100644
index a76b5b6..0000000
--- a/api/topology/src/test/java/edgent/test/topology/JsonFunctionsTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
-import edgent.function.Function;
-import edgent.topology.json.JsonFunctions;
-
-public class JsonFunctionsTest {
-
- private JsonObject newTestObject() {
- // Just a mix of things so we have reasonable confidence
- // the JsonFunctions are working.
- JsonObject jo = new JsonObject();
- jo.addProperty("boolean", true);
- jo.addProperty("character", 'c');
- jo.addProperty("short", (short)7);
- jo.addProperty("int", 23);
- jo.addProperty("long", 99L);
- jo.addProperty("float", 3.0f);
- jo.addProperty("double", 7.128d);
- jo.addProperty("string", "a string value");
- JsonArray ja = new JsonArray();
- ja.add(new JsonPrimitive(123));
- ja.add(new JsonPrimitive(456));
- jo.add("array", ja);
- JsonObject jo2 = new JsonObject();
- jo2.addProperty("int", 789);
- jo.add("object", jo2);
- return jo;
- }
-
- @Test
- public void testStrings() {
- JsonObject jo1 = newTestObject();
- Function<JsonObject,String> asString = JsonFunctions.asString();
- Function<String,JsonObject> fromString = JsonFunctions.fromString();
-
- String s1 = asString.apply(jo1);
- JsonObject jo2 = fromString.apply(s1);
-
- assertEquals(jo2, jo1);
- }
-
- @Test
- public void testBytes() {
- JsonObject jo1 = newTestObject();
- Function<JsonObject,byte[]> asBytes = JsonFunctions.asBytes();
- Function<byte[],JsonObject> fromBytes = JsonFunctions.fromBytes();
-
- byte[] b1 = asBytes.apply(jo1);
- JsonObject jo2 = fromBytes.apply(b1);
-
- assertEquals(jo2, jo1);
- }
-}