You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:40 UTC

[39/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/PlumbingStreams.java
new file mode 100644
index 0000000..1c7d0a7
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/PlumbingStreams.java
@@ -0,0 +1,682 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology.plumbing;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.ToIntFunction;
+import org.apache.edgent.oplet.plumbing.Barrier;
+import org.apache.edgent.oplet.plumbing.Isolate;
+import org.apache.edgent.oplet.plumbing.PressureReliever;
+import org.apache.edgent.oplet.plumbing.UnorderedIsolate;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TopologyProvider;
+
+/**
+ * Plumbing utilities for {@link TStream}.
+ * Methods that manipulate the flow of tuples in a streaming topology,
+ * but are not part of the logic of the application.
+ */
+public class PlumbingStreams {
+  
+    /**
+     * Insert a blocking delay between tuples.
+     * Returned stream is the input stream delayed by {@code delay}.
+     * <p>
+     * Delays less than 1msec are translated to a 0 delay.
+     * <p>
+     * This function always adds the {@code delay} amount after receiving
+     * a tuple before forwarding it.  
+     * <p>
+     * Downstream tuple processing delays will affect
+     * the overall delay of a subsequent tuple.
+     * <p>
+     * e.g., the input stream contains two tuples t1 and t2 and
+     * the delay is 100ms.  The forwarding of t1 is delayed by 100ms.
+     * Then if a downstream processing delay of 80ms occurs, this function
+     * receives t2 80ms after it forwarded t1 and it will delay another
+     * 100ms before forwarding t2.  Hence the overall delay between forwarding
+     * t1 and t2 is 180ms.
+     * See {@link #blockingThrottle(long, TimeUnit) blockingThrottle}.
+     *
+     * @param <T> Tuple type
+     * @param stream Stream t
+     * @param delay Amount of time to delay a tuple.
+     * @param unit Time unit for {@code delay}.
+     * 
+     * @return Stream that will be delayed.
+     */
+    public static <T> TStream<T> blockingDelay(TStream<T> stream, long delay, TimeUnit unit) {
+        // The sleep interval is fixed for the stream's lifetime.
+        final long delayMillis = unit.toMillis(delay);
+        return stream.map(tuple -> {
+            try {
+                Thread.sleep(delayMillis);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag and abort tuple processing.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            return tuple;
+        });
+    }
+    
+    /**
+     * Maintain a constant blocking delay between tuples.
+     * The returned stream is the input stream throttled by {@code delay}.
+     * <p>
+     * Delays less than 1msec are translated to a 0 delay.
+     * <p>
+     * Sample use:
+     * <pre>{@code
+     * TStream<String> stream = topology.strings("a", "b", "c");
+     * // Create a stream with tuples throttled to 1 second intervals.
+     * TStream<String> throttledStream = blockingThrottle(stream, 1, TimeUnit.SECONDS);
+     * // print out the throttled tuples as they arrive
+     * throttledStream.peek(t -> System.out.println(new Date() + " - " + t));
+     * }</pre>
+     * <p>
+     * The function adjusts for downstream processing delays.
+     * The first tuple is not delayed.  If {@code delay} has already
+     * elapsed since the prior tuple was forwarded, the tuple 
+     * is forwarded immediately.
+     * Otherwise, forwarding the tuple is delayed to achieve
+     * a {@code delay} amount since forwarding the prior tuple.
+     * <p>
+     * e.g., the input stream contains two tuples t1 and t2 and
+     * the delay is 100ms.  The forwarding of t1 is delayed by 100ms.
+     * Then if a downstream processing delay of 80ms occurs, this function
+     * receives t2 80ms after it forwarded t1 and it will only delay another
+     * 20ms (100ms - 80ms) before forwarding t2.  
+     * Hence the overall delay between forwarding t1 and t2 remains 100ms.
+     * 
+     * @param <T> tuple type
+     * @param stream the stream to throttle
+     * @param delay Amount of time to delay a tuple.
+     * @param unit Time unit for {@code delay}.
+     * @return the throttled stream
+     */
+    public static <T> TStream<T> blockingThrottle(TStream<T> stream, long delay, TimeUnit unit) {
+        return stream.map( blockingThrottle(delay, unit) );
+    }
+
+    private static <T> Function<T,T> blockingThrottle(long delay, TimeUnit unit) {
+        long[] nextTupleTime = { 0 };
+        return t -> {
+            long now = System.currentTimeMillis();
+            if (nextTupleTime[0] != 0) {
+                if (now < nextTupleTime[0]) {
+                    try {
+                        Thread.sleep(nextTupleTime[0] - now);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                    now = System.currentTimeMillis();
+                }
+            }
+            nextTupleTime[0] = now + unit.toMillis(delay);
+            return t;
+        };
+    }
+    
+    /**
+     * Insert a blocking delay before forwarding the first tuple and
+     * no delay for subsequent tuples.
+     * <p>
+     * Delays less than 1msec are translated to a 0 delay.
+     * <p>
+     * Sample use:
+     * <pre>{@code
+     * TStream<String> stream = topology.strings("a", "b", "c");
+     * // create a stream where the first tuple is delayed by 5 seconds. 
+     * TStream<String> oneShotDelayedStream =
+     *      stream.map( blockingOneShotDelay(5, TimeUnit.SECONDS) );
+     * }</pre>
+     * 
+     * @param <T> tuple type
+     * @param stream input stream
+     * @param delay Amount of time to delay a tuple.
+     * @param unit Time unit for {@code delay}.
+     * @return the delayed stream
+     */
+    public static <T> TStream<T> blockingOneShotDelay(TStream<T> stream, long delay, TimeUnit unit) {
+        return stream.map( blockingOneShotDelay(delay, unit) );
+    }
+
+    private static <T> Function<T,T> blockingOneShotDelay(long delay, TimeUnit unit) {
+        // Holder array gives the lambda mutable state; -1 marks
+        // "initial delay already consumed".
+        long[] pendingDelay = { unit.toMillis(delay) };
+        return tuple -> {
+            if (pendingDelay[0] != -1) {
+                try {
+                    Thread.sleep(pendingDelay[0]);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+                // Cleared only after an uninterrupted sleep, matching the
+                // original behavior on the interrupt path.
+                pendingDelay[0] = -1;
+            }
+            return tuple;
+        };
+    }
+    
+    /**
+     * Relieve pressure on upstream processing by discarding tuples.
+     * This method ensures that upstream processing is not
+     * constrained by any delay in downstream processing,
+     * for example by a connector not being able to connect
+     * to its external system.
+     * <P>
+     * Any downstream processing of the returned stream is isolated
+     * from {@code stream} so that any slow down does not affect {@code stream}.
+     * When the downstream processing cannot keep up with rate of
+     * {@code stream} tuples will be dropped from returned stream.
+     * <BR>
+     * Up to {@code count} of the most recent tuples per key from {@code stream}
+     * are maintained when downstream processing is slow, any older tuples
+     * that have not been submitted to the returned stream will be discarded.
+     * <BR>
+     * Tuple order is maintained within a partition but is not guaranteed to
+     * be maintained across partitions.
+     * </P>
+     * 
+     * @param stream Stream to be isolated from downstream processing.
+     * @param keyFunction Function defining the key of each tuple.
+     * @param count Maximum number of tuples to maintain when downstream processing is backing up.
+     * @return Stream that is isolated from and thus relieves pressure on {@code stream}.
+     * 
+     * @param <T> Tuple type.
+     * @param <K> Key type.
+     * @see #isolate(TStream, int) isolate
+     */
+    public static <T,K> TStream<T> pressureReliever(TStream<T> stream, Function<T,K> keyFunction, int count) {
+        // Validate eagerly, consistent with concurrent()/parallel(),
+        // so a null argument fails at topology-build time rather than
+        // with a less helpful NPE inside the oplet.
+        Objects.requireNonNull(stream, "stream");
+        Objects.requireNonNull(keyFunction, "keyFunction");
+        return stream.pipe(new PressureReliever<>(count, keyFunction));
+    }
+    
+    /**
+     * Isolate upstream processing from downstream processing.
+     * <BR>
+     * Implementations may throw {@code OutOfMemoryExceptions} 
+     * if the processing against returned stream cannot keep up
+     * with the arrival rate of tuples on {@code stream}.
+     *
+     * @param <T> Tuple type
+     * @param stream Stream to be isolated from downstream processing.
+     * @param ordered {@code true} to maintain arrival order on the returned stream,
+     * {@code false} to not guarantee arrival order.
+     * @return Stream that is isolated from {@code stream}.
+     */
+    public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
+        // Validate eagerly, consistent with concurrent()/parallel().
+        Objects.requireNonNull(stream, "stream");
+        // Ordered isolation preserves arrival order; unordered gives the
+        // runtime more scheduling freedom.
+        return stream.pipe(
+                ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
+    }
+    
+    /**
+     * Isolate upstream processing from downstream processing.
+     * <P>
+     * If the processing against the returned stream cannot keep up
+     * with the arrival rate of tuples on {@code stream}, upstream
+     * processing will block until there is space in the queue between
+     * the streams.
+     * </P><P>
+     * Processing of tuples occurs in the order they were received.
+     * </P>
+     * 
+     * @param <T> Tuple type
+     * @param stream Stream to be isolated from downstream processing.
+     * @param queueCapacity size of the queue between {@code stream} and
+     *        the returned stream.
+     * @return Stream that is isolated from {@code stream}.
+     * @see #pressureReliever(TStream, Function, int) pressureReliever
+     */
+    public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
+      // Validate eagerly, consistent with concurrent()/parallel().
+      Objects.requireNonNull(stream, "stream");
+      return stream.pipe(new Isolate<T>(queueCapacity));
+    }
+    
+    /**
+     * Perform analytics concurrently.
+     * <P>
+     * This is a convenience function that calls
+     * {@link #concurrent(TStream, List, Function)} after
+     * creating {@code pipeline} and {@code combiner} functions
+     * from the supplied {@code mappers} and {@code combiner} arguments.
+     * </P><P>
+     * That is, it is logically, if not exactly, the same as:
+     * </P>
+     * <pre>{@code
+     * List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
+     * for (Function<T,U> mapper : mappers)
+     *   pipelines.add(s -> s.map(mapper));
+     * concurrent(stream, pipelines, combiner);
+     * }</pre>
+     * 
+     * @param <T> Tuple type on input stream.
+     * @param <U> Tuple type generated by mappers.
+     * @param <R> Tuple type of the result.
+     * 
+     * @param stream input stream
+     * @param mappers functions to be run concurrently.  Each mapper MUST
+     *                 return a non-null result.
+     *                 A runtime error will be generated if a null result
+     *                 is returned.
+     * @param combiner function to create a result tuple from the list of
+     *                 results from {@code mappers}.
+     *                 The input list order is 1:1 with the {@code mappers} list.
+     *                 I.e., list entry [0] is the result from mappers[0],
+     *                 list entry [1] is the result from mappers[1], etc.
+     * @return result stream
+     */
+    public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
+        Objects.requireNonNull(stream, "stream");
+        Objects.requireNonNull(mappers, "mappers");
+        Objects.requireNonNull(combiner, "combiner");
+
+        // Wrap each mapper as a single-stage pipeline: s -> s.map(mapper)
+        List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>(mappers.size());
+        for (Function<T,U> mapper : mappers) {
+            pipelines.add(s -> s.map(mapper));
+        }
+
+        return concurrent(stream, pipelines, combiner);
+    }
+
+    /**
+     * Perform analytics concurrently.
+     * <P>
+     * Process input tuples one at a time, invoking the specified
+     * analytics ({@code pipelines}) concurrently, combine the results,
+     * and then process the next input tuple in the same manner.
+     * </P><P>
+     * Logically, instead of doing this:
+     * </P>
+     * <pre>{@code
+     * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+     * }</pre>
+     * create a graph that's logically like this:
+     * <pre>{@code
+     * - 
+     *                      |-> A1 ->|
+     * sensorReadings<T> -> |-> A2 ->| -> results<R>
+     *                      |-> A3 ->|
+     * 
+     * }</pre>
+     * more specifically a graph like this:
+     * <pre>{@code
+     * -
+     *           |-> isolate(1) -> pipeline1 -> |
+     * stream -> |-> isolate(1) -> pipeline2 -> |-> barrier(10) -> combiner 
+     *           |-> isolate(1) -> pipeline3 -> |
+     *                . . .
+     * }</pre>
+     * <P>
+     * The typical use case for this is when an application has a collection
+     * of independent analytics to perform on each tuple and the analytics
+     * are sufficiently long running such that performing them concurrently
+     * is desired.
+     * </P><P>
+     * Note, this is in contrast to "parallel" stream processing,
+     * which in Java8 Streams and other contexts means processing multiple
+     * tuples in parallel, each on a replicated processing pipeline.
+     * </P><P>
+     * Threadsafety - one of the following must be true:
+     * </P>
+     * <ul>
+     * <li>the tuples from {@code stream} are threadsafe</li>
+     * <li>the {@code pipelines} do not modify the input tuples</li>
+     * <li>the {@code pipelines} provide their own synchronization controls
+     *     to protect concurrent modifications of the input tuples</li>
+     * </ul>
+     * <P>
+     * Logically, a thread is allocated for each of the {@code pipelines}.
+     * The actual degree of concurrency may be {@link TopologyProvider} dependent.
+     * </P>
+     * 
+     * @param <T> Tuple type on input stream.
+     * @param <U> Tuple type generated by pipelines.
+     * @param <R> Tuple type of the result.
+     * 
+     * @param stream input stream
+     * @param pipelines a list of functions to add a pipeline to the topology.
+     *                 Each {@code pipeline.apply()} is called with {@code stream}
+     *                 as the input, yielding the pipeline's result stream.
+     *                 For each input tuple, a pipeline MUST create exactly one output tuple.
+     *                 Tuple flow into the pipelines will cease if that requirement
+     *                 is not met.
+     * @param combiner function to create a result tuple from the list of
+     *                 results from {@code pipelines}.
+     *                 The input tuple list's order is 1:1 with the {@code pipelines} list.
+     *                 I.e., list entry [0] is the result from pipelines[0],
+     *                 list entry [1] is the result from pipelines[1], etc.
+     * @return result stream
+     * @see #barrier(List, int) barrier
+     */
+    public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
+        Objects.requireNonNull(stream, "stream");
+        Objects.requireNonNull(pipelines, "pipelines");
+        Objects.requireNonNull(combiner, "combiner");
+
+        // Don't preclude pipelines from getting a little ahead of the barrier.
+        final int barrierQueueCapacity = 10;
+        final int nChannels = pipelines.size();
+
+        // Isolate each channel so a slow pipeline can't block its siblings.
+        List<TStream<T>> fanouts = new ArrayList<>(nChannels);
+        for (int ch = 0; ch < nChannels; ch++) {
+            fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch" + ch));
+        }
+
+        // Attach each pipeline to its isolated channel.
+        List<TStream<U>> results = new ArrayList<>(nChannels);
+        for (int ch = 0; ch < nChannels; ch++) {
+            results.add(pipelines.get(ch).apply(fanouts.get(ch)).tag("concurrent-ch" + ch));
+        }
+
+        // Synchronize the channels' results, then combine each result
+        // list into a single output tuple.
+        return barrier(results, barrierQueueCapacity)
+                .tag("concurrent.barrier")
+                .map(combiner);
+    }
+
+    /**
+     * A tuple synchronization barrier.
+     * <P>
+     * Same as {@code barrier(streams, 1)}
+     * </P>
+     * @param <T> Tuple type
+     * @param streams input streams
+     * @return the output stream
+     * @see #barrier(List, int)
+     */
+    public static <T> TStream<List<T>> barrier(List<TStream<T>> streams) {
+        // Default per-input queue capacity of a single tuple.
+        final int defaultQueueCapacity = 1;
+        return barrier(streams, defaultQueueCapacity);
+    }
+
+    /**
+     * A tuple synchronization barrier.
+     * <P>
+     * A barrier has n input streams with tuple type {@code T}
+     * and one output stream with tuple type {@code List<T>}.
+     * Once the barrier receives one tuple on each of its input streams,
+     * it generates an output tuple containing one tuple from each input stream.
+     * It then waits until it has received another tuple from each input stream.
+     * </P><P>
+     * Input stream 0's tuple is in the output tuple's list[0],
+     * stream 1's tuple in list[1], and so on.
+     * </P><P>
+     * The barrier's output stream is isolated from the input streams.
+     * </P><P>
+     * The barrier has a queue of size {@code queueCapacity} for each
+     * input stream.  When a tuple for an input stream is received it is
+     * added to its queue.  The stream will block if the queue is full.
+     * </P>
+     *
+     * @param <T> Type of the tuple.
+     * 
+     * @param streams the list of input streams
+     * @param queueCapacity the size of each input stream's queue
+     * @return the output stream
+     * @see Barrier
+     */
+    public static <T> TStream<List<T>> barrier(List<TStream<T>> streams, int queueCapacity) {
+      Objects.requireNonNull(streams, "streams");
+      // Fail with a meaningful message rather than the raw
+      // IndexOutOfBoundsException remove(0) would throw on an empty list.
+      if (streams.isEmpty())
+        throw new IllegalArgumentException("streams must not be empty");
+      // Copy so the caller's list is never modified.
+      List<TStream<T>> others = new ArrayList<>(streams);
+      TStream<T> s1 = others.remove(0);
+      return s1.fanin(new Barrier<T>(queueCapacity), others);
+    }
+
+    /**
+     * Perform an analytic function on tuples in parallel.
+     * <P>
+     * Same as {@code parallel(stream, width, splitter, (s,ch) -> s.map(t -> mapper.apply(t, ch)))}
+     * </P>
+     * @param <T> Input stream tuple type
+     * @param <U> Result stream tuple type
+     * @param stream input stream
+     * @param splitter the tuple channel allocation function
+     * @param mapper analytic function
+     * @param width number of channels
+     * @return the unordered result stream
+     * @see #roundRobinSplitter(int) roundRobinSplitter
+     * @see #concurrentMap(TStream, List, Function) concurrentMap
+     */
+    public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
+        // Wrap the per-tuple mapper as a single-stage channel pipeline.
+        return parallel(stream, width, splitter,
+                (s, ch) -> s.map(t -> mapper.apply(t, ch)));
+    }
+    
+    /**
+     * Perform an analytic pipeline on tuples in parallel.
+     * <P>
+     * Splits {@code stream} into {@code width} parallel processing channels,
+     * partitioning tuples among the channels using {@code splitter}.
+     * Each channel runs a copy of {@code pipeline}.
+     * The resulting stream is isolated from the upstream parallel channels.
+     * </P><P>
+     * The ordering of tuples in {@code stream} is not maintained in the
+     * results from {@code parallel}.
+     * </P><P>
+     * {@code pipeline} is not required to yield a result for each input
+     * tuple.
+     * </P><P>
+     * A common splitter function is a {@link #roundRobinSplitter(int) roundRobinSplitter}.
+     * </P><P>
+     * The generated graph looks like this:
+     * </P>
+     * <pre>{@code
+     * -
+     *                                    |-> isolate(10) -> pipeline-ch1 -> |
+     * stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch2 -> |-> union -> isolate(width)
+     *                                    |-> isolate(10) -> pipeline-ch3 -> |
+     *                                                . . .
+     * }</pre>
+     * 
+     * @param <T> Input stream tuple type
+     * @param <R> Result stream tuple type
+     * 
+     * @param stream the input stream
+     * @param width number of parallel processing channels
+     * @param splitter the tuple channel allocation function
+     * @param pipeline the pipeline for each channel.  
+     *        {@code pipeline.apply(inputStream,channel)}
+     *        is called to generate the pipeline for each channel.
+     * @return the isolated unordered result from each parallel channel
+     * @see #roundRobinSplitter(int) roundRobinSplitter
+     * @see #concurrent(TStream, List, Function) concurrent
+     */
+    public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
+        Objects.requireNonNull(stream, "stream");
+        if (width < 1)
+            throw new IllegalArgumentException("width");
+        Objects.requireNonNull(splitter, "splitter");
+        Objects.requireNonNull(pipeline, "pipeline");
+
+        // Partition the input into one stream per channel.
+        List<TStream<T>> channels = stream.split(width, splitter);
+        for (int ch = 0; ch < width; ch++) {
+            channels.set(ch, channels.get(ch).tag("parallel.split-ch" + ch));
+        }
+
+        // Isolate each channel; a small buffer keeps a briefly-busy
+        // channel from immediately blocking the splitter.
+        final int chBufferSize = 10;
+        for (int ch = 0; ch < width; ch++) {
+            channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch" + ch));
+        }
+
+        // Run a copy of the pipeline on each channel.
+        List<TStream<R>> results = new ArrayList<>(width);
+        for (int ch = 0; ch < width; ch++) {
+            results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch" + ch));
+        }
+
+        // Merge the channel results, then isolate the merged stream so
+        // channel threads do only their own pipeline's processing.
+        TStream<R> merged = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
+        return isolate(merged, width);
+    }
+
+    /**
+     * Perform an analytic pipeline on tuples in parallel.
+     * <P>
+     * Splits {@code stream} into {@code width} parallel processing channels,
+     * partitioning tuples among the channels in a load balanced fashion.
+     * Each channel runs a copy of {@code pipeline}.
+     * The resulting stream is isolated from the upstream parallel channels.
+     * </P><P>
+     * The ordering of tuples in {@code stream} is not maintained in the
+     * results from {@code parallel}.
+     * </P><P>
+     * A {@code pipeline} <b>MUST</b> yield a result for each input
+     * tuple.  Failure to do so will result in the channel remaining
+     * in a busy state and no longer available to process additional tuples.
+     * </P><P>
+     * A {@link LoadBalancedSplitter} is used to distribute tuples.
+     * </P><P>
+     * The generated graph looks like this:
+     * </P>
+     * <pre>{@code
+     * -
+     *                                    |-> isolate(1) -> pipeline-ch1 -> peek(splitter.channelDone()) -> |
+     * stream -> split(width,splitter) -> |-> isolate(1) -> pipeline-ch2 -> peek(splitter.channelDone()) -> |-> union -> isolate(width)
+     *                                    |-> isolate(1) -> pipeline-ch3 -> peek(splitter.channelDone()) -> |
+     *                                                . . .
+     * }</pre>
+     * <P>
+     * Note, this implementation requires that the splitter is used from
+     * only a single JVM.  The {@link org.apache.edgent.providers.direct.DirectProvider DirectProvider}
+     * provider meets this requirement.
+     * </P>
+     * 
+     * @param <T> Input stream tuple type
+     * @param <R> Result stream tuple type
+     * 
+     * @param stream the input stream
+     * @param width number of parallel processing channels
+     * @param pipeline the pipeline for each channel.  
+     *        {@code pipeline.apply(inputStream,channel)}
+     *        is called to generate the pipeline for each channel.
+     * @return the isolated unordered result from each parallel channel
+     * @see #parallel(TStream, int, ToIntFunction, BiFunction)
+     * @see LoadBalancedSplitter
+     */
+    public static <T,R> TStream<R> parallelBalanced(TStream<T> stream, int width, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
+        Objects.requireNonNull(stream, "stream");
+        if (width < 1)
+            throw new IllegalArgumentException("width");
+        Objects.requireNonNull(pipeline, "pipeline");
+
+        // The splitter routes each tuple to an idle channel.
+        LoadBalancedSplitter<T> splitter = new LoadBalancedSplitter<>(width);
+
+        // Partition the input into one stream per channel.
+        List<TStream<T>> channels = stream.split(width, splitter);
+        for (int ch = 0; ch < width; ch++) {
+            channels.set(ch, channels.get(ch).tag("parallel.split-ch" + ch));
+        }
+
+        // Isolate each channel; a buffer of 1 suffices because the
+        // load-balanced splitter only feeds idle channels.
+        final int chBufferSize = 1;
+        for (int ch = 0; ch < width; ch++) {
+            channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch" + ch));
+        }
+
+        // Run a copy of the pipeline on each channel; the trailing peek
+        // tells the splitter the channel is idle again.
+        List<TStream<R>> results = new ArrayList<>(width);
+        for (int ch = 0; ch < width; ch++) {
+            final int channel = ch;  // effectively final for the peek lambda
+            results.add(pipeline.apply(channels.get(ch), ch)
+                .tag("parallel-ch" + ch)
+                .peek(tuple -> splitter.channelDone(channel)));
+        }
+
+        // Merge the channel results, then isolate the merged stream so
+        // channel threads do only their own pipeline's processing.
+        TStream<R> merged = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
+        return isolate(merged, width);
+    }
+    
+    /**
+     * A round-robin splitter ToIntFunction
+     * <P>
+     * The splitter function cycles among the {@code width} channels
+     * on successive calls to {@code roundRobinSplitter.applyAsInt()},
+     * returning {@code 0, 1, ..., width-1, 0, 1, ..., width-1}.
+     * </P>
+     * @param <T> Tuple type
+     * @param width number of splitter channels
+     * @return the splitter
+     * @see TStream#split(int, ToIntFunction) TStream.split
+     * @see PlumbingStreams#parallel(TStream, int, ToIntFunction, BiFunction) parallel
+     */
+    public static <T> ToIntFunction<T> roundRobinSplitter(int width) {
+      if (width < 1)
+        throw new IllegalArgumentException("width");
+      AtomicInteger cnt = new AtomicInteger();
+      // Math.floorMod keeps the result in [0..width) even after the
+      // counter overflows to negative values; a plain % would yield
+      // negative channel indexes after ~2^31 tuples.
+      return tuple -> Math.floorMod(cnt.getAndIncrement(), width);
+    }
+    /**
+     * Control the flow of tuples to an output stream.
+     * <P>
+     * A {@link Semaphore}
+     * is used to control the flow of tuples
+     * through the {@code gate}
+     * . The gate acquires a permit from the
+     * semaphore to pass the tuple through, blocking until a permit is
+     * acquired (and applying backpressure upstream while blocked).
+     * Elsewhere, some code calls {@link Semaphore#release(int)}
+     * to make permits available.
+     * </P><P>
+     * If a TopologyProvider is used that can distribute a topology's
+     * streams to different JVM's the gate and the code releasing the
+     * permits must be in the same JVM.
+     * </P><P>
+     * Sample use:
+     * <BR>
+     * Suppose you wanted to control processing such that concurrent
+     * pipelines processed each tuple in lock-step.
+     * I.e., You want all of the pipelines to start processing a tuple
+     * at the same time and not start a new tuple until the current
+     * tuple had been fully processed by each of them:
+     * </P>
+     * <pre>{@code
+     * TStream<Integer> readings = ...;
+     * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
+     * TStream<Integer> gated = gate(readings, gateControl);
+     * 
+     * // Create the concurrent pipeline combiner and have it
+     * // signal that concurrent processing of the tuple has completed.
+     * // In this sample the combiner just returns the received list of
+     * // each pipeline result.
+     * 
+     * Function<List<Integer>,List<Integer>> combiner =
+     *   list -> { gateControl.release(); return list; };
+     *   
+     * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
+     * }</pre>
+     * @param <T> Tuple type
+     * @param stream the input stream
+     * @param semaphore gate control
+     * @return gated stream
+     */
+    public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
+        return stream.map(tuple -> {
+            // Block (applying backpressure upstream) until a permit is
+            // released elsewhere.
+            try {
+                semaphore.acquire();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("interrupted", e);
+            }
+            return tuple;
+        });
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/plumbing/Valve.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/plumbing/Valve.java b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/Valve.java
new file mode 100644
index 0000000..a7120cb
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/Valve.java
@@ -0,0 +1,105 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology.plumbing;
+
+import org.apache.edgent.function.Predicate;
+
+/**
+ * A generic "valve" {@link Predicate}.
+ * <p>
+ * A valve is either open or closed.
+ * When used as a Predicate to {@code TStream.filter()},
+ * filter passes tuples only when the valve is open.
+ * </p><p>
+ * A valve is typically used to dynamically control whether or not
+ * some downstream tuple processing is enabled.  A decision to change the
+ * state of the valve may be a result of local analytics or an external
+ * command.
+ * <br>
+ * E.g., in a simple case, a Valve might be used to control
+ * whether or not logging or publishing of tuples is enabled.
+ * </p>
+ * <pre>{@code
+ * TStream<JsonObject> stream = ...;
+ * 
+ * Valve<JsonObject> valve = new Valve<>(false);
+ * stream.filter(valve).sink(someTupleLoggingConsumer);
+ *                                 
+ * // from some analytic or device command handler...
+ *     valve.setOpen(true);
+ * }</pre>
+ *
+ * @param <T> tuple type
+ */
+public class Valve<T> implements Predicate<T> {
+    private static final long serialVersionUID = 1L;
+    private transient boolean isOpen;
+
+    /**
+     * Create a new Valve Predicate
+     * <p>
+     * Same as {@code Valve(true)}
+     */
+    public Valve() {
+        this(true);
+    }
+    
+    /**
+     * Create a new Valve Predicate
+     * <p>
+     * @param isOpen the initial state
+     */
+    public Valve(boolean isOpen) {
+        setOpen(isOpen);
+    }
+    
+    /**
+     * Set the valve state
+     * @param isOpen true to open the valve
+     */
+    public void setOpen(boolean isOpen) {
+        this.isOpen = isOpen;
+    }
+    
+    /**
+     * Get the valve state
+     * @return the state, true if the valve is open, false otherwise
+     */
+    public boolean isOpen() {
+        return isOpen;
+    }
+
+    /**
+     * Test the state of the valve, {@code value} is ignored.
+     * @return true when the valve is open, false otherwise
+     */
+    @Override
+    public boolean test(T value) {
+        return isOpen;
+    }
+
+    /**
+     * Returns a String for development/debug support.  Content subject to change.
+     */
+    @Override
+    public String toString() {
+        return "isOpen="+isOpen;
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/plumbing/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/plumbing/package-info.java b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/package-info.java
new file mode 100644
index 0000000..29952e5
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/plumbing/package-info.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Plumbing for a streaming topology.
+ * Methods that manipulate the flow of tuples in a streaming topology,
+ * but are not part of the logic of the application.
+ */
+package org.apache.edgent.topology.plumbing;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/services/ApplicationService.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/services/ApplicationService.java b/api/topology/src/main/java/org/apache/edgent/topology/services/ApplicationService.java
new file mode 100644
index 0000000..6a710e5
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/services/ApplicationService.java
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology.services;
+
+import java.util.Set;
+
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.mbeans.ApplicationServiceMXBean;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Application registration service.
+ * A service that allows registration of applications and
+ * the ability to submit them through a control MBean.
+ *
+ * @see ApplicationServiceMXBean
+ */
+public interface ApplicationService {
+    
+	/**
+	 * Default alias a service registers its control MBean as.
+	 * Value is {@value}.
+	 */
+    String ALIAS = "edgent";
+    
+    /**
+     * Prefix ({@value}) reserved for system application names.
+     */
+    String SYSTEM_APP_PREFIX = "edgent";
+    
+    /**
+     * Add a topology that can be started through a control MBean.
+     * Any registration replaces any existing application with the same name.
+     * <BR>
+     * When a {@link ApplicationServiceMXBean#submit(String, String) submit}
+     * is invoked {@code builder.accept(topology, config)} is called passing:
+     * <UL>
+     * <LI>
+     * {@code topology} - An empty topology with the name {@code applicationName}.
+     * </LI>
+     * <LI>
+     * {@code config} - JSON submission configuration from
+     * {@link ApplicationServiceMXBean#submit(String, String) submit}.
+     * </LI>
+     * </UL>
+     * Once {@code builder.accept(topology, config)} returns it is submitted
+     * to the {@link Submitter} associated with the implementation of this service.
+     * <P>
+     * Application names starting with {@link #SYSTEM_APP_PREFIX edgent} are reserved
+     * for system applications.
+     * </P>
+     * 
+     * @param applicationName Application name to register.
+     * @param builder How to build the topology for this application.
+     * 
+     * @see ApplicationServiceMXBean
+     */
+    void registerTopology(String applicationName, BiConsumer<Topology, JsonObject> builder);
+    
+    /**
+     * Register a jar file containing new applications.
+     * Any service provider within the jar of type {@link TopologyBuilder}
+     * will be {@link #registerTopology(String, BiConsumer) registered} as
+     * a topology.
+     * 
+     * The jar cannot have any new dependencies, its classpath will
+     * be the classpath of this service.
+     * 
+     * @param jarURL URL of Jar containing new applications.
+     * @param jsonConfig Configuration information, currently unused.
+     * @throws Exception if failure
+     */
+    void registerJar(String jarURL, String jsonConfig) throws Exception;
+    
+    /**
+     * Returns the names of applications registered with this service.
+     * 
+     * @return the names of applications registered with this service.
+     */
+    Set<String> getApplicationNames();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/services/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/services/TopologyBuilder.java b/api/topology/src/main/java/org/apache/edgent/topology/services/TopologyBuilder.java
new file mode 100644
index 0000000..be32b1c
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/services/TopologyBuilder.java
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology.services;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.topology.Topology;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Represents a topology that can be built.
+ * 
+ * A class implementing {@code TopologyBuilder} can
+ * be registered as a service provider in a jar file for
+ * automatic application registration using
+ * {@link ApplicationService#registerJar(String, String)}.
+ *
+ */
+public interface TopologyBuilder {
+    /**
+     * Name the application will be known as.
+     * @return Name the application will be known as.
+     */
+    String getName();
+    
+    /**
+     * How the application is built.
+     * @return Function that builds the application.
+     */
+    BiConsumer<Topology, JsonObject> getBuilder();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/services/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/services/package-info.java b/api/topology/src/main/java/org/apache/edgent/topology/services/package-info.java
new file mode 100644
index 0000000..7f45168
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/services/package-info.java
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Services for topologies.
+ *
+ */
+package org.apache.edgent.topology.services;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/tester/Condition.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/tester/Condition.java b/api/topology/src/main/java/org/apache/edgent/topology/tester/Condition.java
new file mode 100644
index 0000000..2a0c21b
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/tester/Condition.java
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+ */
+package org.apache.edgent.topology.tester;
+
+/**
+ * Function representing if a condition is valid or not.
+ * 
+ * @param <T> Condition's result type
+ */
+public interface Condition<T> {
+
+    boolean valid();
+
+    T getResult();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/tester/Tester.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/tester/Tester.java b/api/topology/src/main/java/org/apache/edgent/topology/tester/Tester.java
new file mode 100644
index 0000000..22fc52c
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/tester/Tester.java
@@ -0,0 +1,180 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+ */
+package org.apache.edgent.topology.tester;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyElement;
+
+import com.google.gson.JsonObject;
+
+/**
+ * A {@code Tester} adds the ability to test a topology in a test framework such
+ * as JUnit.
+ * 
+ * The main feature is the ability to capture tuples from a {@link TStream} in
+ * order to perform some form of verification on them. There are two mechanisms
+ * to perform verifications:
+ * <UL>
+ * <LI>did the stream produce the correct number of tuples.</LI>
+ * <LI>did the stream produce the correct tuples.</LI>
+ * </UL>
+ * Currently, only streams that are instances of
+ * {@code TStream<String>} can have conditions or handlers attached.
+ * <P>
+ * A {@code Tester} modifies its {@link Topology} to achieve the above purpose.
+ * </P>
+ */
+public interface Tester extends TopologyElement {
+
+    /**
+     * Return a condition that evaluates if {@code stream} has submitted exactly
+     * {@code expectedCount} number of tuples. The function may be evaluated
+     * after the {@link Submitter#submit(Object, JsonObject) submit}
+     * call has returned. <BR>
+     * The {@link Condition#getResult() result} of the returned
+     * {@code Condition} is the number of tuples seen on {@code stream} so far.
+     * <BR>
+     * If the topology is still executing then the returned values from
+     * {@link Condition#valid()} and {@link Condition#getResult()} may change as
+     * more tuples are seen on {@code stream}. <BR>
+     * 
+     * @param stream
+     *            Stream to be tested.
+     * @param expectedCount
+     *            Number of tuples expected on {@code stream}.
+     * @return True if the stream has submitted exactly {@code expectedCount}
+     *         number of tuples, false otherwise.
+     */
+    Condition<Long> tupleCount(TStream<?> stream, long expectedCount);
+
+    /**
+     * Return a condition that evaluates if {@code stream} has submitted at
+     * least {@code expectedCount} number of tuples. The function may be
+     * evaluated after the
+     * {@link Submitter#submit(Object, JsonObject) submit} call has returned. <BR>
+     * The {@link Condition#getResult() result} of the returned
+     * {@code Condition} is the number of tuples seen on {@code stream} so far.
+     * <BR>
+     * If the topology is still executing then the returned values from
+     * {@link Condition#valid()} and {@link Condition#getResult()} may change as
+     * more tuples are seen on {@code stream}. <BR>
+     * 
+     * @param stream
+     *            Stream to be tested.
+     * @param expectedCount
+     *            Number of tuples expected on {@code stream}.
+     * @return Condition that will return true the stream has submitted at least
+     *         {@code expectedCount} number of tuples, false otherwise.
+     */
+    Condition<Long> atLeastTupleCount(TStream<?> stream, long expectedCount);
+
+    /**
+     * Return a condition that evaluates if {@code stream} has submitted
+     * tuples matching {@code values} in the same order. <BR>
+     * The {@link Condition#getResult() result} of the returned
+     * {@code Condition} is the tuples seen on {@code stream} so far. <BR>
+     * If the topology is still executing then the returned values from
+     * {@link Condition#valid()} and {@link Condition#getResult()} may change as
+     * more tuples are seen on {@code stream}. <BR>
+     * 
+     * @param <T> Tuple type
+     * @param stream
+     *            Stream to be tested.
+     * @param values
+     *            Expected tuples on {@code stream}.
+     * @return Condition that will return true if the stream has submitted at
+     *         least tuples matching {@code values} in the same order, false
+     *         otherwise.
+     */
+    <T> Condition<List<T>> streamContents(TStream<T> stream, @SuppressWarnings("unchecked") T... values);
+
+    /**
+     * Return a condition that evaluates if {@code stream} has submitted
+     * tuples matching {@code values} in any order. <BR>
+     * The {@link Condition#getResult() result} of the returned
+     * {@code Condition} is the tuples seen on {@code stream} so far. <BR>
+     * If the topology is still executing then the returned values from
+     * {@link Condition#valid()} and {@link Condition#getResult()} may change as
+     * more tuples are seen on {@code stream}. <BR>
+     *
+     * @param <T> Tuple type
+     * @param stream
+     *            Stream to be tested.
+     * @param values
+     *            Expected tuples on {@code stream}.
+     * @return Condition that will return true if the stream has submitted at
+     *         least tuples matching {@code values} in any order, false
+     *         otherwise.
+     */
+    <T> Condition<List<T>> contentsUnordered(TStream<T> stream, @SuppressWarnings("unchecked") T... values);
+    
+    /**
+     * Return a condition that is valid only if all of {@code conditions} are valid.
+     * The result of the condition is {@link Condition#valid()}
+     * @param conditions Conditions to AND together.
+     * @return condition that is valid only if all of {@code conditions} are valid.
+     */
+    Condition<Boolean> and(final Condition<?>... conditions);
+
+    /**
+     * Submit the topology for this tester and wait for it to complete, or reach
+     * an end condition. If the topology does not complete or reach its end
+     * condition before {@code timeout} then it is terminated.
+     * <P>
+     * End condition is usually a {@link Condition} returned from
+     * {@link #atLeastTupleCount(TStream, long)} or
+     * {@link #tupleCount(TStream, long)} so that this method returns once the
+     * stream has submitted a sufficient number of tuples. <BR>
+     * Note that the condition will be only checked periodically up to
+     * {@code timeout}, so that if the condition is only valid for a brief
+     * period of time, then its valid state may not be seen, and thus this
+     * method will wait for the timeout period.
+     * </P>
+     * 
+     * @param submitter the {@link Submitter}
+     * @param config
+     *            submission configuration.
+     * @param endCondition
+     *            Condition that will cause this method to return if it is true.
+     * @param timeout
+     *            Maximum time to wait for the topology to complete or reach its
+     *            end condition.
+     * @param unit
+     *            Unit for {@code timeout}.
+     * @return The value of {@code endCondition.valid()}.
+     * 
+     * @throws Exception
+     *             Failure submitting or executing the topology.
+     */
+    boolean complete(Submitter<Topology, ? extends Job> submitter, JsonObject config, Condition<?> endCondition,
+            long timeout, TimeUnit unit) throws Exception;
+    
+    /**
+     * Get the {@code Job} reference for the topology submitted by {@code complete()}.
+     * @return {@code Job} reference for the topology submitted by {@code complete()}.
+     * Null if the {@code complete()} has not been called or the {@code Job} instance is not yet available.
+     */
+    Job getJob();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/tester/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/tester/package-info.java b/api/topology/src/main/java/org/apache/edgent/topology/tester/package-info.java
new file mode 100644
index 0000000..d160725
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/tester/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Testing for a streaming topology.
+ */
+package org.apache.edgent.topology.tester;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/edgent/test/topology/JsonFunctionsTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/edgent/test/topology/JsonFunctionsTest.java b/api/topology/src/test/java/edgent/test/topology/JsonFunctionsTest.java
deleted file mode 100644
index a76b5b6..0000000
--- a/api/topology/src/test/java/edgent/test/topology/JsonFunctionsTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.topology;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
-import edgent.function.Function;
-import edgent.topology.json.JsonFunctions;
-
-public class JsonFunctionsTest {
-    
-    private JsonObject newTestObject() {
-        // Just a mix of things so we have reasonable confidence
-        // the JsonFunctions are working.
-        JsonObject jo = new JsonObject();
-        jo.addProperty("boolean", true);
-        jo.addProperty("character", 'c');
-        jo.addProperty("short", (short)7);
-        jo.addProperty("int", 23);
-        jo.addProperty("long", 99L);
-        jo.addProperty("float", 3.0f);
-        jo.addProperty("double", 7.128d);
-        jo.addProperty("string", "a string value");
-        JsonArray ja = new JsonArray();
-        ja.add(new JsonPrimitive(123));
-        ja.add(new JsonPrimitive(456));
-        jo.add("array", ja);
-        JsonObject jo2 = new JsonObject();
-        jo2.addProperty("int", 789);
-        jo.add("object", jo2);
-        return jo;
-    }
-    
-    @Test
-    public void testStrings() {
-        JsonObject jo1 = newTestObject();
-        Function<JsonObject,String> asString = JsonFunctions.asString();
-        Function<String,JsonObject> fromString = JsonFunctions.fromString();
-        
-        String s1 = asString.apply(jo1);
-        JsonObject jo2 = fromString.apply(s1);
-        
-        assertEquals(jo2, jo1);
-    }
-    
-    @Test
-    public void testBytes() {
-        JsonObject jo1 = newTestObject();
-        Function<JsonObject,byte[]> asBytes = JsonFunctions.asBytes();
-        Function<byte[],JsonObject> fromBytes = JsonFunctions.fromBytes();
-        
-        byte[] b1 = asBytes.apply(jo1);
-        JsonObject jo2 = fromBytes.apply(b1);
-        
-        assertEquals(jo2, jo1);
-    }
-}