You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:42 UTC

[41/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/edgent/topology/plumbing/PlumbingStreams.java
deleted file mode 100644
index f34820e..0000000
--- a/api/topology/src/main/java/edgent/topology/plumbing/PlumbingStreams.java
+++ /dev/null
@@ -1,682 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.topology.plumbing;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edgent.function.BiFunction;
-import edgent.function.Function;
-import edgent.function.ToIntFunction;
-import edgent.oplet.plumbing.Barrier;
-import edgent.oplet.plumbing.Isolate;
-import edgent.oplet.plumbing.PressureReliever;
-import edgent.oplet.plumbing.UnorderedIsolate;
-import edgent.topology.TStream;
-import edgent.topology.TopologyProvider;
-
-/**
- * Plumbing utilities for {@link TStream}.
- * Methods that manipulate the flow of tuples in a streaming topology,
- * but are not part of the logic of the application.
- */
-public class PlumbingStreams {
-  
-    /**
-     * Insert a blocking delay between tuples.
-     * Returned stream is the input stream delayed by {@code delay}.
-     * <p>
-     * Delays less than 1msec are translated to a 0 delay.
-     * <p>
-     * This function always adds the {@code delay} amount after receiving
-     * a tuple before forwarding it.  
-     * <p>
-     * Downstream tuple processing delays will affect
-     * the overall delay of a subsequent tuple.
-     * <p>
-     * e.g., the input stream contains two tuples t1 and t2 and
-     * the delay is 100ms.  The forwarding of t1 is delayed by 100ms.
-     * Then if a downstream processing delay of 80ms occurs, this function
-     * receives t2 80ms after it forwarded t1 and it will delay another
-     * 100ms before forwarding t2.  Hence the overall delay between forwarding
-     * t1 and t2 is 180ms.
-     * See {@link #blockingThrottle(TStream, long, TimeUnit) blockingThrottle}.
-     *
-     * @param <T> Tuple type
-     * @param stream Stream whose tuples are to be delayed.
-     * @param delay Amount of time to delay a tuple.
-     * @param unit Time unit for {@code delay}.
-     * 
-     * @return Stream that will be delayed.
-     */
-    public static <T> TStream<T> blockingDelay(TStream<T> stream, long delay, TimeUnit unit) {
-        return stream.map(t -> {try {
-            Thread.sleep(unit.toMillis(delay));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        } return t;}) ;
-    }
-    
-    /**
-     * Maintain a constant blocking delay between tuples.
-     * The returned stream is the input stream throttled by {@code delay}.
-     * <p>
-     * Delays less than 1msec are translated to a 0 delay.
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * TStream<String> stream = topology.strings("a", "b", "c");
-     * // Create a stream with tuples throttled to 1 second intervals.
-     * TStream<String> throttledStream = blockingThrottle(stream, 1, TimeUnit.SECONDS);
-     * // print out the throttled tuples as they arrive
-     * throttledStream.peek(t -> System.out.println(new Date() + " - " + t));
-     * }</pre>
-     * <p>
-     * The function adjusts for downstream processing delays.
-     * The first tuple is not delayed.  If {@code delay} has already
-     * elapsed since the prior tuple was forwarded, the tuple 
-     * is forwarded immediately.
-     * Otherwise, forwarding the tuple is delayed to achieve
-     * a {@code delay} amount since forwarding the prior tuple.
-     * <p>
-     * e.g., the input stream contains two tuples t1 and t2 and
-     * the delay is 100ms.  t1, being the first tuple, is forwarded immediately.
-     * Then if a downstream processing delay of 80ms occurs, this function
-     * receives t2 80ms after it forwarded t1 and it will only delay another
-     * 20ms (100ms - 80ms) before forwarding t2.  
-     * Hence the overall delay between forwarding t1 and t2 remains 100ms.
-     * 
-     * @param <T> tuple type
-     * @param stream the stream to throttle
-     * @param delay Amount of time to delay a tuple.
-     * @param unit Time unit for {@code delay}.
-     * @return the throttled stream
-     */
-    public static <T> TStream<T> blockingThrottle(TStream<T> stream, long delay, TimeUnit unit) {
-        return stream.map( blockingThrottle(delay, unit) );
-    }
-
-    private static <T> Function<T,T> blockingThrottle(long delay, TimeUnit unit) {
-        long[] nextTupleTime = { 0 };
-        return t -> {
-            long now = System.currentTimeMillis();
-            if (nextTupleTime[0] != 0) {
-                if (now < nextTupleTime[0]) {
-                    try {
-                        Thread.sleep(nextTupleTime[0] - now);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    }
-                    now = System.currentTimeMillis();
-                }
-            }
-            nextTupleTime[0] = now + unit.toMillis(delay);
-            return t;
-        };
-    }
-    
-    /**
-     * Insert a blocking delay before forwarding the first tuple and
-     * no delay for subsequent tuples.
-     * <p>
-     * Delays less than 1msec are translated to a 0 delay.
-     * <p>
-     * Sample use:
-     * <pre>{@code
-     * TStream<String> stream = topology.strings("a", "b", "c");
-     * // create a stream where the first tuple is delayed by 5 seconds.
-     * TStream<String> oneShotDelayedStream =
-     *      blockingOneShotDelay(stream, 5, TimeUnit.SECONDS);
-     * }</pre>
-     * 
-     * @param <T> tuple type
-     * @param stream input stream
-     * @param delay Amount of time to delay a tuple.
-     * @param unit Time unit for {@code delay}.
-     * @return the delayed stream
-     */
-    public static <T> TStream<T> blockingOneShotDelay(TStream<T> stream, long delay, TimeUnit unit) {
-        return stream.map( blockingOneShotDelay(delay, unit) );
-    }
-
-    private static <T> Function<T,T> blockingOneShotDelay(long delay, TimeUnit unit) {
-        long[] initialDelay = { unit.toMillis(delay) };
-        return t -> {
-            if (initialDelay[0] != -1) {
-                try {
-                    Thread.sleep(initialDelay[0]);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException(e);
-                }
-                initialDelay[0] = -1;
-            }
-            return t;
-            };
-    }
-    
-    /**
-     * Relieve pressure on upstream processing by discarding tuples.
-     * This method ensures that upstream processing is not
-     * constrained by any delay in downstream processing,
-     * for example by a connector not being able to connect
-     * to its external system.
-     * <P>
-     * Any downstream processing of the returned stream is isolated
-     * from {@code stream} so that any slow down does not affect {@code stream}.
-     * When the downstream processing cannot keep up with the rate of
-     * {@code stream}, tuples will be dropped from the returned stream.
-     * <BR>
-     * Up to {@code count} of the most recent tuples per key from {@code stream}
-     * are maintained when downstream processing is slow, any older tuples
-     * that have not been submitted to the returned stream will be discarded.
-     * <BR>
-     * Tuple order is maintained within a partition but is not guaranteed to
-     * be maintained across partitions.
-     * </P>
-     * 
-     * @param stream Stream to be isolated from downstream processing.
-     * @param keyFunction Function defining the key of each tuple.
-     * @param count Maximum number of tuples to maintain when downstream processing is backing up.
-     * @return Stream that is isolated from and thus relieves pressure on {@code stream}.
-     * 
-     * @param <T> Tuple type.
-     * @param <K> Key type.
-     * @see #isolate(TStream, int) isolate
-     */
-    public static <T,K> TStream<T> pressureReliever(TStream<T> stream, Function<T,K> keyFunction, int count) {
-        return stream.pipe(new PressureReliever<>(count, keyFunction));
-    }
-    
-    /**
-     * Isolate upstream processing from downstream processing.
-     * <BR>
-     * Implementations may throw {@code OutOfMemoryError}
-     * if the processing against returned stream cannot keep up
-     * with the arrival rate of tuples on {@code stream}.
-     *
-     * @param <T> Tuple type
-     * @param stream Stream to be isolated from downstream processing.
-     * @param ordered {@code true} to maintain arrival order on the returned stream,
-     * {@code false} to not guarantee arrival order.
-     * @return Stream that is isolated from {@code stream}.
-     */
-    public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
-        return stream.pipe(
-                ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
-    }
-    
-    /**
-     * Isolate upstream processing from downstream processing.
-     * <P>
-     * If the processing against the returned stream cannot keep up
-     * with the arrival rate of tuples on {@code stream}, upstream
-     * processing will block until there is space in the queue between
-     * the streams.
-     * </P><P>
-     * Processing of tuples occurs in the order they were received.
-     * </P>
-     * 
-     * @param <T> Tuple type
-     * @param stream Stream to be isolated from downstream processing.
-     * @param queueCapacity size of the queue between {@code stream} and
-     *        the returned stream.
-     * @return Stream that is isolated from {@code stream}.
-     * @see #pressureReliever(TStream, Function, int) pressureReliever
-     */
-    public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
-      return stream.pipe(new Isolate<T>(queueCapacity));
-    }
-    
-    /**
-     * Perform analytics concurrently.
-     * <P>
-     * This is a convenience function that calls
-     * {@link #concurrent(TStream, List, Function)} after
-     * creating {@code pipeline} and {@code combiner} functions
-     * from the supplied {@code mappers} and {@code combiner} arguments.
-     * </P><P>
-     * That is, it is logically, if not exactly, the same as:
-     * </P>
-     * <pre>{@code
-     * List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
-     * for (Function<T,U> mapper : mappers)
-     *   pipelines.add(s -> s.map(mapper));
-     * concurrent(stream, pipelines, combiner);
-     * }</pre>
-     * 
-     * @param <T> Tuple type on input stream.
-     * @param <U> Tuple type generated by mappers.
-     * @param <R> Tuple type of the result.
-     * 
-     * @param stream input stream
-     * @param mappers functions to be run concurrently.  Each mapper MUST
-     *                 return a non-null result.
-     *                 A runtime error will be generated if a null result
-     *                 is returned.
-     * @param combiner function to create a result tuple from the list of
-     *                 results from {@code mappers}.
-     *                 The input list order is 1:1 with the {@code mappers} list.
-     *                 I.e., list entry [0] is the result from mappers[0],
-     *                 list entry [1] is the result from mappers[1], etc.
-     * @return result stream
-     */
-    public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
-      Objects.requireNonNull(stream, "stream");
-      Objects.requireNonNull(mappers, "mappers");
-      Objects.requireNonNull(combiner, "combiner");
-      
-      List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
-      for (Function<T,U> mapper : mappers) {
-        pipelines.add(s -> s.map(mapper));
-      }
-      
-      return concurrent(stream, pipelines, combiner);
-    }
-
-    /**
-     * Perform analytics concurrently.
-     * <P>
-     * Process input tuples one at a time, invoking the specified
-     * analytics ({@code pipelines}) concurrently, combine the results,
-     * and then process the next input tuple in the same manner.
-     * </P><P>
-     * Logically, instead of doing this:
-     * </P>
-     * <pre>{@code
-     * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
-     * }</pre>
-     * create a graph that's logically like this:
-     * <pre>{@code
-     * - 
-     *                      |-> A1 ->|
-     * sensorReadings<T> -> |-> A2 ->| -> results<R>
-     *                      |-> A3 ->|
-     * 
-     * }</pre>
-     * more specifically a graph like this:
-     * <pre>{@code
-     * -
-     *           |-> isolate(1) -> pipeline1 -> |
-     * stream -> |-> isolate(1) -> pipeline2 -> |-> barrier(10) -> combiner 
-     *           |-> isolate(1) -> pipeline3 -> |
-     *                . . .
-     * }</pre>
-     * <P>
-     * The typical use case for this is when an application has a collection
-     * of independent analytics to perform on each tuple and the analytics
-     * are sufficiently long running such that performing them concurrently
-     * is desired.
-     * </P><P>
-     * Note, this is in contrast to "parallel" stream processing,
-     * which in Java8 Streams and other contexts means processing multiple
-     * tuples in parallel, each on a replicated processing pipeline.
-     * </P><P>
-     * Threadsafety - one of the following must be true:
-     * </P>
-     * <ul>
-     * <li>the tuples from {@code stream} are threadsafe</li>
-     * <li>the {@code pipelines} do not modify the input tuples</li>
-     * <li>the {@code pipelines} provide their own synchronization controls
-     *     to protect concurrent modifications of the input tuples</li>
-     * </ul>
-     * <P>
-     * Logically, a thread is allocated for each of the {@code pipelines}.
-     * The actual degree of concurrency may be {@link TopologyProvider} dependent.
-     * </P>
-     * 
-     * @param <T> Tuple type on input stream.
-     * @param <U> Tuple type generated by pipelines.
-     * @param <R> Tuple type of the result.
-     * 
-     * @param stream input stream
-     * @param pipelines a list of functions to add a pipeline to the topology.
-     *                 Each {@code pipeline.apply()} is called with {@code stream}
-     *                 as the input, yielding the pipeline's result stream.
-     *                 For each input tuple, a pipeline MUST create exactly one output tuple.
-     *                 Tuple flow into the pipelines will cease if that requirement
-     *                 is not met.
-     * @param combiner function to create a result tuple from the list of
-     *                 results from {@code pipelines}.
-     *                 The input tuple list's order is 1:1 with the {@code pipelines} list.
-     *                 I.e., list entry [0] is the result from pipelines[0],
-     *                 list entry [1] is the result from pipelines[1], etc.
-     * @return result stream
-     * @see #barrier(List, int) barrier
-     */
-    public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
-      Objects.requireNonNull(stream, "stream");
-      Objects.requireNonNull(pipelines, "pipelines");
-      Objects.requireNonNull(combiner, "combiner");
-      
-      int barrierQueueCapacity = 10;  // don't preclude pipelines from getting ahead some.
-      
-      // Add concurrent (isolated) fanouts
-      List<TStream<T>> fanouts = new ArrayList<>(pipelines.size());
-      for (int i = 0; i < pipelines.size(); i++)
-        fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
-      
-      // Add pipelines
-      List<TStream<U>> results = new ArrayList<>(pipelines.size());
-      int ch = 0;
-      for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
-        results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
-        ch++;
-      }
-      
-      // Add the barrier
-      TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
-      
-      // Add the combiner
-      return barrier.map(combiner);
-    }
-
-    /**
-     * A tuple synchronization barrier.
-     * <P>
-     * Same as {@code barrier(streams, 1)}
-     * </P>
-     * @param <T> Tuple type
-     * @param streams input streams
-     * @return the output stream
-     * @see #barrier(List, int)
-     */
-    public static <T> TStream<List<T>> barrier(List<TStream<T>> streams) {
-      return barrier(streams, 1);
-    }
-
-    /**
-     * A tuple synchronization barrier.
-     * <P>
-     * A barrier has n input streams with tuple type {@code T}
-     * and one output stream with tuple type {@code List<T>}.
-     * Once the barrier receives one tuple on each of its input streams,
-     * it generates an output tuple containing one tuple from each input stream.
-     * It then waits until it has received another tuple from each input stream.
-     * </P><P>
-     * Input stream 0's tuple is in the output tuple's list[0],
-     * stream 1's tuple in list[1], and so on.
-     * </P><P>
-     * The barrier's output stream is isolated from the input streams.
-     * </P><P>
-     * The barrier has a queue of size {@code queueCapacity} for each
-     * input stream.  When a tuple for an input stream is received it is
-     * added to its queue.  The stream will block if the queue is full.
-     * </P>
-     *
-     * @param <T> Type of the tuple.
-     * 
-     * @param streams the list of input streams
-     * @param queueCapacity the size of each input stream's queue
-     * @return the output stream
-     * @see Barrier
-     */
-    public static <T> TStream<List<T>> barrier(List<TStream<T>> streams, int queueCapacity) {
-      List<TStream<T>> others = new ArrayList<>(streams);
-      TStream<T> s1 = others.remove(0);
-      return s1.fanin(new Barrier<T>(queueCapacity), others);
-    }
-
-    /**
-     * Perform an analytic function on tuples in parallel.
-     * <P>
-     * Same as {@code parallel(stream, width, splitter, (s,ch) -> s.map(t -> mapper.apply(t, ch)))}
-     * </P>
-     * @param <T> Input stream tuple type
-     * @param <U> Result stream tuple type
-     * @param stream input stream
-     * @param splitter the tuple channel allocation function
-     * @param mapper analytic function
-     * @param width number of channels
-     * @return the unordered result stream
-     * @see #roundRobinSplitter(int) roundRobinSplitter
-     * @see #concurrentMap(TStream, List, Function) concurrentMap
-     */
-    public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
-      BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
-      return parallel(stream, width, splitter, pipeline);
-    }
-    
-    /**
-     * Perform an analytic pipeline on tuples in parallel.
-     * <P>
-     * Splits {@code stream} into {@code width} parallel processing channels,
-     * partitioning tuples among the channels using {@code splitter}.
-     * Each channel runs a copy of {@code pipeline}.
-     * The resulting stream is isolated from the upstream parallel channels.
-     * </P><P>
-     * The ordering of tuples in {@code stream} is not maintained in the
-     * results from {@code parallel}.
-     * </P><P>
-     * {@code pipeline} is not required to yield a result for each input
-     * tuple.
-     * </P><P>
-     * A common splitter function is a {@link #roundRobinSplitter(int) roundRobinSplitter}.
-     * </P><P>
-     * The generated graph looks like this:
-     * </P>
-     * <pre>{@code
-     * -
-     *                                    |-> isolate(10) -> pipeline-ch1 -> |
-     * stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch2 -> |-> union -> isolate(width)
-     *                                    |-> isolate(10) -> pipeline-ch3 -> |
-     *                                                . . .
-     * }</pre>
-     * 
-     * @param <T> Input stream tuple type
-     * @param <R> Result stream tuple type
-     * 
-     * @param stream the input stream
-     * @param width number of parallel processing channels
-     * @param splitter the tuple channel allocation function
-     * @param pipeline the pipeline for each channel.  
-     *        {@code pipeline.apply(inputStream,channel)}
-     *        is called to generate the pipeline for each channel.
-     * @return the isolated unordered result from each parallel channel
-     * @see #roundRobinSplitter(int) roundRobinSplitter
-     * @see #concurrent(TStream, List, Function) concurrent
-     */
-    public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
-      Objects.requireNonNull(stream, "stream");
-      if (width < 1)
-        throw new IllegalArgumentException("width");
-      Objects.requireNonNull(splitter, "splitter");
-      Objects.requireNonNull(pipeline, "pipeline");
-      
-      // Add the splitter
-      List<TStream<T>> channels = stream.split(width, splitter);
-      for (int ch = 0; ch < width; ch++)
-        channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
-      
-      // Add concurrency (isolation) to the channels
-      int chBufferSize = 10; // don't immediately block stream if channel is busy
-      for (int ch = 0; ch < width; ch++)
-        channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
-      
-      // Add pipelines
-      List<TStream<R>> results = new ArrayList<>(width);
-      for (int ch = 0; ch < width; ch++) {
-        results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
-      }
-      
-      // Add the Union
-      TStream<R> result =  results.get(0).union(new HashSet<>(results)).tag("parallel.union");
-      
-      // Add the isolate - keep channel threads to just their pipeline processing
-      return isolate(result, width);
-    }
-
-    /**
-     * Perform an analytic pipeline on tuples in parallel.
-     * <P>
-     * Splits {@code stream} into {@code width} parallel processing channels,
-     * partitioning tuples among the channels in a load balanced fashion.
-     * Each channel runs a copy of {@code pipeline}.
-     * The resulting stream is isolated from the upstream parallel channels.
-     * </P><P>
-     * The ordering of tuples in {@code stream} is not maintained in the
-     * results from {@code parallel}.
-     * </P><P>
-     * A {@code pipeline} <b>MUST</b> yield a result for each input
-     * tuple.  Failure to do so will result in the channel remaining
-     * in a busy state and no longer available to process additional tuples.
-     * </P><P>
-     * A {@link LoadBalancedSplitter} is used to distribute tuples.
-     * </P><P>
-     * The generated graph looks like this:
-     * </P>
-     * <pre>{@code
-     * -
-     *                                    |-> isolate(1) -> pipeline-ch1 -> peek(splitter.channelDone()) -> |
-     * stream -> split(width,splitter) -> |-> isolate(1) -> pipeline-ch2 -> peek(splitter.channelDone()) -> |-> union -> isolate(width)
-     *                                    |-> isolate(1) -> pipeline-ch3 -> peek(splitter.channelDone()) -> |
-     *                                                . . .
-     * }</pre>
-     * <P>
-     * Note, this implementation requires that the splitter is used from
-     * only a single JVM.  The {@link edgent.providers.direct.DirectProvider DirectProvider}
-     * provider meets this requirement.
-     * </P>
-     * 
-     * @param <T> Input stream tuple type
-     * @param <R> Result stream tuple type
-     * 
-     * @param stream the input stream
-     * @param width number of parallel processing channels
-     * @param pipeline the pipeline for each channel.  
-     *        {@code pipeline.apply(inputStream,channel)}
-     *        is called to generate the pipeline for each channel.
-     * @return the isolated unordered result from each parallel channel
-     * @see #parallel(TStream, int, ToIntFunction, BiFunction)
-     * @see LoadBalancedSplitter
-     */
-    public static <T,R> TStream<R> parallelBalanced(TStream<T> stream, int width, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
-      Objects.requireNonNull(stream, "stream");
-      if (width < 1)
-        throw new IllegalArgumentException("width");
-      Objects.requireNonNull(pipeline, "pipeline");
-      
-      LoadBalancedSplitter<T> splitter = new LoadBalancedSplitter<>(width);
-      
-      // Add the splitter
-      List<TStream<T>> channels = stream.split(width, splitter);
-      for (int ch = 0; ch < width; ch++)
-        channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
-      
-      // Add concurrency (isolation) to the channels
-      int chBufferSize = 1; // 1 is enough with load balanced impl
-      for (int ch = 0; ch < width; ch++)
-        channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
-      
-      // Add pipelines
-      List<TStream<R>> results = new ArrayList<>(width);
-      for (int ch = 0; ch < width; ch++) {
-        final int finalCh = ch;
-        results.add(pipeline.apply(channels.get(ch), ch)
-            .tag("parallel-ch"+ch)
-            .peek(tuple -> splitter.channelDone(finalCh)));
-      }
-      
-      // Add the Union
-      TStream<R> result =  results.get(0).union(new HashSet<>(results)).tag("parallel.union");
-      
-      // Add the isolate - keep channel threads to just their pipeline processing
-      return isolate(result, width);
-    }
-    
-    /**
-     * A round-robin splitter ToIntFunction
-     * <P>
-     * The splitter function cycles among the {@code width} channels
-     * on successive calls to {@code roundRobinSplitter.applyAsInt()},
-     * returning {@code 0, 1, ..., width-1, 0, 1, ..., width-1}.
-     * </P>
-     * @param <T> Tuple type
-     * @param width number of splitter channels
-     * @return the splitter
-     * @see TStream#split(int, ToIntFunction) TStream.split
-     * @see PlumbingStreams#parallel(TStream, int, ToIntFunction, BiFunction) parallel
-     */
-    public static <T> ToIntFunction<T> roundRobinSplitter(int width) {
-      AtomicInteger cnt = new AtomicInteger();
-      return tuple -> cnt.getAndIncrement() % width;
-    }
-    /**
-     * Control the flow of tuples to an output stream.
-     * <P>
-     * A {@link Semaphore}
-     * is used to control the flow of tuples
-     * through the {@code gate}.
-     * The gate acquires a permit from the
-     * semaphore to pass the tuple through, blocking until a permit is
-     * acquired (and applying backpressure upstream while blocked).
-     * Elsewhere, some code calls {@link Semaphore#release(int)}
-     * to make permits available.
-     * </P><P>
-     * If a TopologyProvider is used that can distribute a topology's
-     * streams to different JVMs, the gate and the code releasing the
-     * permits must be in the same JVM.
-     * </P><P>
-     * Sample use:
-     * <BR>
-     * Suppose you wanted to control processing such that concurrent
-     * pipelines processed each tuple in lock-step.
-     * I.e., You want all of the pipelines to start processing a tuple
-     * at the same time and not start a new tuple until the current
-     * tuple had been fully processed by each of them:
-     * </P>
-     * <pre>{@code
-     * TStream<Integer> readings = ...;
-     * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
-     * TStream<Integer> gated = gate(readings, gateControl);
-     * 
-     * // Create the concurrent pipeline combiner and have it
-     * // signal that concurrent processing of the tuple has completed.
-     * // In this sample the combiner just returns the received list of
-     * // each pipeline result.
-     * 
-     * Function<List<Integer>,List<Integer>> combiner =
-     *   list -> { gateControl.release(); return list; };
-     *   
-     * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
-     * }</pre>
-     * @param <T> Tuple type
-     * @param stream the input stream
-     * @param semaphore gate control
-     * @return gated stream
-     */
-    public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
-        return stream.map(tuple -> {
-            try {
-                semaphore.acquire();
-                return tuple;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException("interrupted", e);
-            }
-        });
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/plumbing/Valve.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/plumbing/Valve.java b/api/topology/src/main/java/edgent/topology/plumbing/Valve.java
deleted file mode 100644
index a56f234..0000000
--- a/api/topology/src/main/java/edgent/topology/plumbing/Valve.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.topology.plumbing;
-
-import edgent.function.Predicate;
-
-/**
- * A generic "valve" {@link Predicate}.
- * <p>
- * A valve is either open or closed.
- * When used as a Predicate to {@code TStream.filter()},
- * filter passes tuples only when the valve is open.
- * </p><p>
- * A valve is typically used to dynamically control whether or not
- * some downstream tuple processing is enabled.  A decision to change the
- * state of the valve may be a result of local analytics or an external
- * command.
- * <br>
- * E.g., in a simple case, a Valve might be used to control
- * whether or not logging or publishing of tuples is enabled.
- * </p>
- * <pre>{@code
- * TStream<JsonObject> stream = ...;
- * 
- * Valve<JsonObject> valve = new Valve<>(false);
- * stream.filter(valve).sink(someTupleLoggingConsumer);
- *                                 
- * // from some analytic or device command handler...
- *     valve.setOpen(true);
- * }</pre>
- *
- * @param <T> tuple type
- */
-public class Valve<T> implements Predicate<T> {
-    private static final long serialVersionUID = 1L;
-    private transient boolean isOpen;
-
-    /**
-     * Create a new Valve Predicate
-     * <p>
-     * Same as {@code Valve(true)}
-     */
-    public Valve() {
-        this(true);
-    }
-    
-    /**
-     * Create a new Valve Predicate
-     * <p>
-     * @param isOpen the initial state
-     */
-    public Valve(boolean isOpen) {
-        setOpen(isOpen);
-    }
-    
-    /**
-     * Set the valve state
-     * @param isOpen true to open the valve
-     */
-    public void setOpen(boolean isOpen) {
-        this.isOpen = isOpen;
-    }
-    
-    /**
-     * Get the valve state
-     * @return the state, true if the valve is open, false otherwise
-     */
-    public boolean isOpen() {
-        return isOpen;
-    }
-
-    /**
-     * Test the state of the valve, {@code value} is ignored.
-     * @return true when the valve is open, false otherwise
-     */
-    @Override
-    public boolean test(T value) {
-        return isOpen;
-    }
-
-    /**
-     * Returns a String for development/debug support.  Content subject to change.
-     */
-    @Override
-    public String toString() {
-        return "isOpen="+isOpen;
-    }
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/plumbing/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/plumbing/package-info.java b/api/topology/src/main/java/edgent/topology/plumbing/package-info.java
deleted file mode 100644
index 8a1e287..0000000
--- a/api/topology/src/main/java/edgent/topology/plumbing/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Plumbing for a streaming topology.
- * Methods that manipulate the flow of tuples in a streaming topology,
- * but are not part of the logic of the application.
- */
-package edgent.topology.plumbing;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/services/ApplicationService.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/services/ApplicationService.java b/api/topology/src/main/java/edgent/topology/services/ApplicationService.java
deleted file mode 100644
index 12262d7..0000000
--- a/api/topology/src/main/java/edgent/topology/services/ApplicationService.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.topology.services;
-
-import java.util.Set;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.Submitter;
-import edgent.function.BiConsumer;
-import edgent.topology.Topology;
-import edgent.topology.mbeans.ApplicationServiceMXBean;
-
-/**
- * Application registration service.
- * A service that allows registration of applications and
- * the ability to submit them through a control MBean.
- *
- * @see ApplicationServiceMXBean
- */
-public interface ApplicationService {
-    
-	/**
-	 * Default alias a service registers its control MBean as.
-	 * Value is {@value}.
-	 */
-    String ALIAS = "edgent";
-    
-    /**
-     * Prefix ({@value}) reserved for system application names.
-     */
-    String SYSTEM_APP_PREFIX = "edgent";
-    
-    /**
-     * Add a topology that can be started through a control mbean.
-     * Any registration replaces any existing application with the same name.
-     * <BR>
-     * When a {@link ApplicationServiceMXBean#submit(String, String) submit}
-     * is invoked {@code builder.accept(topology, config)} is called passing:
-     * <UL>
-     * <LI>
-     * {@code topology} - An empty topology with the name {@code applicationName}.
-     * </LI>
-     * <LI>
-     * {@code config} - JSON submission configuration from
-     * {@link ApplicationServiceMXBean#submit(String, String) submit}.
-     * </LI>
-     * </UL>
-     * Once {@code builder.accept(topology, config)} returns it is submitted
-     * to the {@link Submitter} associated with the implementation of this service.
-     * <P>
-     * Application names starting with {@link #SYSTEM_APP_PREFIX edgent} are reserved
-     * for system applications.
-     * </P>
-     * 
-     * @param applicationName Application name to register.
-     * @param builder How to build the topology for this application.
-     * 
-     * @see ApplicationServiceMXBean
-     */
-    void registerTopology(String applicationName, BiConsumer<Topology, JsonObject> builder);
-    
-    /**
-     * Register a jar file containing new applications.
-     * Any service provider within the jar of type {@link TopologyBuilder}
-     * will be {@link #registerTopology(String, BiConsumer) registered} as
-     * a topology.
-     * 
-     * The jar cannot have any new dependencies, its classpath will
-     * be the classpath of this service.
-     * 
-     * @param jarURL URL of Jar containing new applications.
-     * @param jsonConfig Configuration information, currently unused.
-     */
-    void registerJar(String jarURL, String jsonConfig) throws Exception;
-    
-    /**
-     * Returns the names of applications registered with this service.
-     * 
-     * @return the names of applications registered with this service.
-     */
-    Set<String> getApplicationNames();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/services/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/services/TopologyBuilder.java b/api/topology/src/main/java/edgent/topology/services/TopologyBuilder.java
deleted file mode 100644
index 7c756db..0000000
--- a/api/topology/src/main/java/edgent/topology/services/TopologyBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.topology.services;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.BiConsumer;
-import edgent.topology.Topology;
-
-/**
- * Represents a topology that can be built.
- * 
- * A class implementing {@code TopologyBuilder} can
- * be registered as a service provider in a jar file for
- * automatic application registration using
- * {@link ApplicationService#registerJar(String, String)}.
- *
- */
-public interface TopologyBuilder {
-    /**
-     * Name the application will be known as.
-     * @return Name the application will be known as.
-     */
-    String getName();
-    
-    /**
-     * How the application is built.
-     * @return Function that builds the application.
-     */
-    BiConsumer<Topology, JsonObject> getBuilder();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/services/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/services/package-info.java b/api/topology/src/main/java/edgent/topology/services/package-info.java
deleted file mode 100644
index b647a1a..0000000
--- a/api/topology/src/main/java/edgent/topology/services/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Services for topologies.
- *
- */
-package edgent.topology.services;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/tester/Condition.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/tester/Condition.java b/api/topology/src/main/java/edgent/topology/tester/Condition.java
deleted file mode 100644
index eb5c2b3..0000000
--- a/api/topology/src/main/java/edgent/topology/tester/Condition.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
- */
-package edgent.topology.tester;
-
-/**
- * Function representing if a condition is valid or not.
- * 
- * @param <T> Condition's result type
- */
-public interface Condition<T> {
-
-    boolean valid();
-
-    T getResult();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/tester/Tester.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/tester/Tester.java b/api/topology/src/main/java/edgent/topology/tester/Tester.java
deleted file mode 100644
index 4bf0fd6..0000000
--- a/api/topology/src/main/java/edgent/topology/tester/Tester.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
- */
-package edgent.topology.tester;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.Job;
-import edgent.execution.Submitter;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.TopologyElement;
-
-/**
- * A {@code Tester} adds the ability to test a topology in a test framework such
- * as JUnit.
- * 
- * The main feature is the ability to capture tuples from a {@link TStream} in
- * order to perform some form of verification on them. There are two mechanisms
- * to perform verifications:
- * <UL>
- * <LI>did the stream produce the correct number of tuples.</LI>
- * <LI>did the stream produce the correct tuples.</LI>
- * </UL>
- * Currently, only streams that are instances of
- * {@code TStream<String>} can have conditions or handlers attached.
- * <P>
- * A {@code Tester} modifies its {@link Topology} to achieve the above purpose.
- * </P>
- */
-public interface Tester extends TopologyElement {
-
-    /**
-     * Return a condition that evaluates if {@code stream} has submitted exactly
-     * {@code expectedCount} number of tuples. The function may be evaluated
-     * after the {@link Submitter#submit(Object, JsonObject) submit}
-     * call has returned. <BR>
-     * The {@link Condition#getResult() result} of the returned
-     * {@code Condition} is the number of tuples seen on {@code stream} so far.
-     * <BR>
-     * If the topology is still executing then the returned values from
-     * {@link Condition#valid()} and {@link Condition#getResult()} may change as
-     * more tuples are seen on {@code stream}. <BR>
-     * 
-     * @param stream
-     *            Stream to be tested.
-     * @param expectedCount
-     *            Number of tuples expected on {@code stream}.
-     * @return Condition that will return true if the stream has submitted
-     *         exactly {@code expectedCount} number of tuples, false otherwise.
-     */
-    Condition<Long> tupleCount(TStream<?> stream, long expectedCount);
-
-    /**
-     * Return a condition that evaluates if {@code stream} has submitted at
-     * least {@code expectedCount} number of tuples. The function may be
-     * evaluated after the
-     * {@link Submitter#submit(Object, JsonObject) submit} call has returned. <BR>
-     * The {@link Condition#getResult() result} of the returned
-     * {@code Condition} is the number of tuples seen on {@code stream} so far.
-     * <BR>
-     * If the topology is still executing then the returned values from
-     * {@link Condition#valid()} and {@link Condition#getResult()} may change as
-     * more tuples are seen on {@code stream}. <BR>
-     * 
-     * @param stream
-     *            Stream to be tested.
-     * @param expectedCount
-     *            Number of tuples expected on {@code stream}.
-     * @return Condition that will return true if the stream has submitted at least
-     *         {@code expectedCount} number of tuples, false otherwise.
-     */
-    Condition<Long> atLeastTupleCount(TStream<?> stream, long expectedCount);
-
-    /**
-     * Return a condition that evaluates if {@code stream} has submitted
-     * tuples matching {@code values} in the same order. <BR>
-     * The {@link Condition#getResult() result} of the returned
-     * {@code Condition} is the tuples seen on {@code stream} so far. <BR>
-     * If the topology is still executing then the returned values from
-     * {@link Condition#valid()} and {@link Condition#getResult()} may change as
-     * more tuples are seen on {@code stream}. <BR>
-     * 
-     * @param <T> Tuple type
-     * @param stream
-     *            Stream to be tested.
-     * @param values
-     *            Expected tuples on {@code stream}.
-     * @return Condition that will return true if the stream has submitted at
-     *         least tuples matching {@code values} in the same order, false
-     *         otherwise.
-     */
-    <T> Condition<List<T>> streamContents(TStream<T> stream, @SuppressWarnings("unchecked") T... values);
-
-    /**
-     * Return a condition that evaluates if {@code stream} has submitted
-     * tuples matching {@code values} in any order. <BR>
-     * The {@link Condition#getResult() result} of the returned
-     * {@code Condition} is the tuples seen on {@code stream} so far. <BR>
-     * If the topology is still executing then the returned values from
-     * {@link Condition#valid()} and {@link Condition#getResult()} may change as
-     * more tuples are seen on {@code stream}. <BR>
-     *
-     * @param <T> Tuple type
-     * @param stream
-     *            Stream to be tested.
-     * @param values
-     *            Expected tuples on {@code stream}.
-     * @return Condition that will return true if the stream has submitted at
-     *         least tuples matching {@code values} in any order, false
-     *         otherwise.
-     */
-    <T> Condition<List<T>> contentsUnordered(TStream<T> stream, @SuppressWarnings("unchecked") T... values);
-    
-    /**
-     * Return a condition that is valid only if all of {@code conditions} are valid.
-     * The result of the condition is {@link Condition#valid()}
-     * @param conditions Conditions to AND together.
-     * @return condition that is valid only if all of {@code conditions} are valid.
-     */
-    Condition<Boolean> and(final Condition<?>... conditions);
-
-    /**
-     * Submit the topology for this tester and wait for it to complete, or reach
-     * an end condition. If the topology does not complete or reach its end
-     * condition before {@code timeout} then it is terminated.
-     * <P>
-     * End condition is usually a {@link Condition} returned from
-     * {@link #atLeastTupleCount(TStream, long)} or
-     * {@link #tupleCount(TStream, long)} so that this method returns once the
-     * stream has submitted a sufficient number of tuples. <BR>
-     * Note that the condition will be only checked periodically up to
-     * {@code timeout}, so that if the condition is only valid for a brief
-     * period of time, then its valid state may not be seen, and thus this
-     * method will wait for the timeout period.
-     * </P>
-     * 
-     * @param submitter the {@link Submitter}
-     * @param config
-     *            submission configuration.
-     * @param endCondition
-     *            Condition that will cause this method to return if it is true.
-     * @param timeout
-     *            Maximum time to wait for the topology to complete or reach its
-     *            end condition.
-     * @param unit
-     *            Unit for {@code timeout}.
-     * @return The value of {@code endCondition.valid()}.
-     * 
-     * @throws Exception
-     *             Failure submitting or executing the topology.
-     */
-    boolean complete(Submitter<Topology, ? extends Job> submitter, JsonObject config, Condition<?> endCondition,
-            long timeout, TimeUnit unit) throws Exception;
-    
-    /**
-     * Get the {@code Job} reference for the topology submitted by {@code complete()}.
-     * @return {@code Job} reference for the topology submitted by {@code complete()}.
-     * Null if {@code complete()} has not been called or the {@code Job} instance is not yet available.
-     */
-    Job getJob();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/tester/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/tester/package-info.java b/api/topology/src/main/java/edgent/topology/tester/package-info.java
deleted file mode 100644
index b5e8aed..0000000
--- a/api/topology/src/main/java/edgent/topology/tester/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Testing for a streaming topology.
- */
-package edgent.topology.tester;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/TSink.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/TSink.java b/api/topology/src/main/java/org/apache/edgent/topology/TSink.java
new file mode 100644
index 0000000..6bf7563
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/TSink.java
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology;
+
+/**
+ * Termination point (sink) for a stream.
+ *
+ * @param <T> Tuple type
+ */
+public interface TSink<T> extends TopologyElement {
+    /**
+     * Get the stream feeding this sink.
+     * The returned reference may be used for
+     * further processing of the feeder stream.
+     * <BR>
+     * For example, {@code s.print().filter(...)}
+     * <BR>
+     * Here the filter is applied
+     * to {@code s} so that {@code s} feeds
+     * the {@code print()} and {@code filter()}.
+     * 
+     * @return stream feeding this sink.
+     */
+    public TStream<T> getFeed();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/TStream.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/TStream.java b/api/topology/src/main/java/org/apache/edgent/topology/TStream.java
new file mode 100644
index 0000000..e89746f
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/TStream.java
@@ -0,0 +1,543 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology;
+
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Predicate;
+import org.apache.edgent.function.ToIntFunction;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.oplet.core.FanIn;
+import org.apache.edgent.oplet.core.Pipe;
+import org.apache.edgent.oplet.core.Sink;
+
+/**
+ * A {@code TStream} is a declaration of a continuous sequence of tuples. A
+ * connected topology of streams and functional transformations is built using
+ * {@link Topology}. <BR>
+ * Generic methods on this interface provide the ability to
+ * {@link #filter(Predicate) filter}, {@link #map(Function)
+ * map (or transform)} or {@link #sink(Consumer) sink} this declared stream using a
+ * function.
+ * <P>
+ * {@code TStream} is not a runtime representation of a stream,
+ * it is a declaration used in building a topology.
+ * The actual runtime stream is created once the topology
+ * is {@link org.apache.edgent.execution.Submitter#submit(Object) submitted}
+ * to a runtime.
+ * 
+ * </P>
+ * @param <T>
+ *            Tuple type.
+ */
+public interface TStream<T> extends TopologyElement {
+  
+    /**
+     * TYPE is used to identify {@link ControlService} mbeans registered
+     * for a TStream.
+     * The value is {@value} 
+     */
+    public static final String TYPE = "stream";
+    // N.B. to avoid build problems due to topology <=> oplet, 
+    // other code contains a copy of this value (ugh) as TSTREAM_TYPE
+
+    /**
+     * Declare a new stream that filters tuples from this stream. Each tuple
+     * {@code t} on this stream will appear in the returned stream if
+     * {@link Predicate#test(Object) predicate.test(t)} returns {@code true}. If
+     * {@code predicate.test(t)} returns {@code false} then {@code t} will not
+     * appear in the returned stream.
+     * <P>
+     * Example of filtering out all empty strings from stream {@code s} of type
+     * {@code String}
+     * </P>
+     * 
+     * <pre>
+     * <code>
+     * TStream&lt;String&gt; s = ...
+     * TStream&lt;String&gt; filtered = s.filter(t -&gt; !t.isEmpty());
+     *             
+     * </code>
+     * </pre>
+     * 
+     * @param predicate
+     *            Filtering logic to be executed against each tuple.
+     * @return Filtered stream
+     */
+    TStream<T> filter(Predicate<T> predicate);
+
+    /**
+     * Declare a new stream that maps (or transforms) each tuple from this stream into one
+     * (or zero) tuple of a different type {@code U}. For each tuple {@code t}
+     * on this stream, the returned stream will contain a tuple that is the
+     * result of {@code mapper.apply(t)} when the return is not {@code null}.
+     * If {@code mapper.apply(t)} returns {@code null} then no tuple
+     * is submitted to the returned stream for {@code t}.
+     * 
+     * <P>
+     * Examples of transforming a stream containing numeric values as
+     * {@code String} objects into a stream of {@code Double} values.
+     * </P>
+     * 
+     * <pre>
+     * <code>
+     * // Using lambda expression
+     * TStream&lt;String&gt; strings = ...
+     * TStream&lt;Double&gt; doubles = strings.map(v -&gt; Double.valueOf(v));
+     * 
+     * // Using method reference
+     * TStream&lt;String&gt; strings = ...
+     * TStream&lt;Double&gt; doubles = strings.map(Double::valueOf);
+     * 
+     * </code>
+     * </pre>
+     * 
+     * @param <U> Tuple type of output stream
+     * @param mapper
+     *            Mapping logic to be executed against each tuple.
+     * @return Stream that will contain tuples of type {@code U} mapped from this
+     *         stream's tuples.
+     */
+    <U> TStream<U> map(Function<T, U> mapper);
+    
+    /**
+     * Declare a new stream that maps tuples from this stream into one or
+     * more (or zero) tuples of a different type {@code U}. For each tuple
+     * {@code t} on this stream, the returned stream will contain all non-null tuples in
+     * the {@code Iterable<U>} that is the result of {@code mapper.apply(t)}.
+     * Tuples will be added to the returned stream in the order the iteration
+     * returns them.
+     * 
+     * <BR>
+     * If the return is null or an empty iterable then no tuples are added to
+     * the returned stream for input tuple {@code t}.
+     * <P>
+     * Example of mapping a stream containing lines of text into a stream
+     * of words split out from each line. The order of the words in the stream
+     * will match the order of the words in the lines.
+     * </P>
+     * 
+     * <pre>
+     * <code>
+     * TStream&lt;String&gt; lines = ...
+     * TStream&lt;String&gt; words = lines.flatMap(
+     *                     line -&gt; Arrays.asList(line.split(" ")));
+     *             
+     * </code>
+     * </pre>
+     * 
+     * @param <U> Tuple type of output stream
+     * @param mapper
+     *            Mapper logic to be executed against each tuple.     
+     * @return Stream that will contain tuples of type {@code U} mapped and flattened from this
+     *         stream's tuples.
+     */
+    <U> TStream<U> flatMap(Function<T, Iterable<U>> mapper);
+
+    /**
+     * Split a stream's tuples among {@code n} streams as specified by
+     * {@code splitter}.
+     * 
+     * <P>
+     * For each tuple on the stream, {@code splitter.applyAsInt(tuple)} is
+     * called. The return value {@code r} determines the destination stream:
+     * </P>
+     * 
+     * <pre>
+     * if r &lt; 0 the tuple is discarded
+     * else it is sent to the stream at position (r % n) in the returned list.
+     * </pre>
+     *
+     * <P>
+     * Each split {@code TStream} is exposed by the API. The user has full
+     * control over each stream's processing pipeline. Each stream's
+     * pipeline must be declared explicitly. Each stream can have different
+     * processing pipelines.
+     * </P>
+     * <P>
+     * An N-way {@code split()} is logically equivalent to a collection of N
+     * {@code filter()} invocations, each with a predicate to select the tuples
+     * for its stream. {@code split()} is more efficient. Each tuple is analyzed
+     * only once by a single {@code splitter} instance to identify the
+     * destination stream. For example, these are logically equivalent:
+     * </P>
+     * <pre>
+     * List&lt;TStream&lt;String&gt;&gt; streams = stream.split(2, tuple -&gt; tuple.length());
+     * 
+     * TStream&lt;String&gt; stream0 = stream.filter(tuple -&gt; (tuple.length() % 2) == 0);
+     * TStream&lt;String&gt; stream1 = stream.filter(tuple -&gt; (tuple.length() % 2) == 1);
+     * </pre>
+     * <P>
+     * Example of splitting a stream of log records by their level attribute:
+     * </P>
+     * 
+     * <pre>
+     * <code>
+     * TStream&lt;LogRecord&gt; lrs = ...
+     * List&lt;TStream&lt;LogRecord&gt;&gt; splits = lrs.split(3, lr -&gt; {
+     *     if (SEVERE.equals(lr.getLevel()))
+     *         return 0;
+     *     else if (WARNING.equals(lr.getLevel()))
+     *         return 1;
+     *     else
+     *         return 2;
+     * });
+     * splits.get(0). ... // SEVERE log record processing pipeline
+     * splits.get(1). ... // WARNING log record processing pipeline
+     * splits.get(2). ... // "other" log record processing pipeline
+     * </code>
+     * </pre>
+     * 
+     * @param n
+     *            the number of output streams
+     * @param splitter
+     *            the splitter function
+     * @return List of {@code n} streams
+     * 
+     * @throws IllegalArgumentException
+     *             if {@code n <= 0}
+     */
+    List<TStream<T>> split(int n, ToIntFunction<T> splitter);
+
+    /**
+     * Split a stream's tuples among {@code enumClass.size} streams as specified by
+     * {@code splitter}.
+     *
+     * @param <E> Enum type
+     * @param enumClass
+     *            enum data to split
+     * @param splitter
+     *            the splitter function
+     * @return EnumMap&lt;E,TStream&lt;T&gt;&gt;
+     * @throws IllegalArgumentException
+     * if {@code enumClass.size <= 0}
+     */
+    <E extends Enum<E>> EnumMap<E,TStream<T>> split(Class<E> enumClass, Function<T, E> splitter);
+
+    /**
+     * Declare a stream that contains the same contents as this stream while
+     * peeking at each element using {@code peeker}. <BR>
+     * For each tuple {@code t} on this stream, {@code peeker.accept(t)} will be
+     * called.
+     * 
+     * @param peeker
+     *            Function to be called for each tuple.
+     * @return {@code this}
+     */
+    TStream<T> peek(Consumer<T> peeker);
+
+    /**
+     * Sink (terminate) this stream using a function. For each tuple {@code t} on this stream
+     * {@link Consumer#accept(Object) sinker.accept(t)} will be called. This is
+     * typically used to send information to external systems, such as databases
+     * or dashboards.
+     * <P>
+     * If {@code sinker} implements {@link AutoCloseable}, its {@code close()}
+     * method will be called when the topology's execution is terminated.
+     * </P>
+     * <P>
+     * Example of terminating a stream of {@code String} tuples by printing them
+     * to {@code System.out}.
+     * </P>
+     * 
+     * <pre>
+     * <code>
+     * TStream&lt;String&gt; values = ...
+     * values.sink(t -&gt; System.out.println(t));
+     * </code>
+     * </pre>
+     * 
+     * @param sinker
+     *            Logic to be executed against each tuple on this stream.
+     * @return sink element representing termination of this stream.
+     */
+    TSink<T> sink(Consumer<T> sinker);
+    
+    /**
+     * Sink (terminate) this stream using an oplet.
+     * This provides a richer api for a sink than
+     * {@link #sink(Consumer)} with a full life-cycle of
+     * the oplet as well as easy access to
+     * {@link org.apache.edgent.execution.services.RuntimeServices runtime services}.
+     * 
+     * @param oplet Oplet processes each tuple without producing output.
+     * @return sink element representing termination of this stream.
+     */
+    TSink<T> sink(Sink<T> oplet);
+
+    /**
+     * Declare a stream that contains the output of the specified {@link Pipe}
+     * oplet applied to this stream.
+     * 
+     * @param <U> Tuple type of the returned stream.
+     * @param pipe The {@link Pipe} oplet.
+     * 
+     * @return Declared stream that contains the tuples emitted by the pipe
+     *      oplet. 
+     */
+    <U> TStream<U> pipe(Pipe<T, U> pipe);
+
+    /**
+     * Declare a stream that contains the output of the specified 
+     * {@link FanIn} oplet applied to this stream and {@code others}.
+     * 
+     * @param <U> Tuple type of the returned stream.
+     * @param fanin The {@link FanIn} oplet.
+     * @param others The other input streams. 
+     *        Must not be empty or contain duplicates or {@code this}
+     * 
+     * @return a stream that contains the tuples emitted by the oplet.
+     * @see #union(Set)
+     * @see #pipe(Pipe)
+     * @see #sink(Sink)
+     */
+    <U> TStream<U> fanin(FanIn<T,U> fanin, List<TStream<T>> others);
+
+    /**
+     * Declare a new stream that modifies each tuple from this stream into one
+     * (or zero) tuple of the same type {@code T}. For each tuple {@code t}
+     * on this stream, the returned stream will contain a tuple that is the
+     * result of {@code modifier.apply(t)} when the return is not {@code null}.
+     * The function may return the same reference as its input {@code t} or
+     * a different object of the same type.
+     * If {@code modifier.apply(t)} returns {@code null} then no tuple
+     * is submitted to the returned stream for {@code t}.
+     * 
+     * <P>
+     * Example of modifying a stream of {@code String} values by adding the suffix '{@code extra}'.
+     * </P>
+     * 
+     * <pre>
+     * <code>
+     * TStream&lt;String&gt; strings = ...
+     * TStream&lt;String&gt; modifiedStrings = strings.modify(t -&gt; t.concat("extra"));
+     * </code>
+     * </pre>
+     * 
+     * <P>
+     * This method is equivalent to
+     * {@code map(Function<T,T> modifier)}.
+     * </P>
+     * 
+     * @param modifier
+     *            Modifier logic to be executed against each tuple.
+     * @return Stream that will contain tuples of type {@code T} modified from this
+     *         stream's tuples.
+     */
+    TStream<T> modify(UnaryOperator<T> modifier);
+
+    /**
+     * Convert this stream to a stream of {@code String} tuples by calling
+     * {@code toString()} on each tuple. This is equivalent to
+     * {@code map(Object::toString)}.
+     * 
+     * @return Declared stream that will contain the string representation
+     *         of each tuple on this stream.
+     */
+    TStream<String> asString();
+
+    /**
+     * Utility method to print the contents of this stream
+     * to {@code System.out} at runtime. Each tuple is printed
+     * using {@code System.out.println(tuple)}.
+     * @return {@code TSink} for the sink processing.
+     */
+    TSink<T> print();
+    
+    /**
+     * Declare a partitioned window that continually represents the last {@code count}
+     * tuples on this stream for each partition. Each partition independently maintains the last
+     * {@code count} tuples for each key seen on this stream.
+     * If no tuples have been seen on the stream for a key then the corresponding partition will be empty.
+     * <BR>
+     * The window is partitioned by each tuple's key, obtained by {@code keyFunction}.
+     * For each tuple on the stream {@code keyFunction.apply(tuple)} is called
+     * and the returned value is the tuple's key. For any two tuples {@code ta,tb} in a partition
+     * {@code keyFunction.apply(ta).equals(keyFunction.apply(tb))} is true.
+     * <BR>
+     * The key function must return keys that implement {@code equals()} and {@code hashCode()} correctly.
+     * <P>
+     * To create a window partitioned using the tuple as the key use {@link org.apache.edgent.function.Functions#identity() identity()}
+     * as the key function.
+     * </P>
+     * <P>
+     * To create an unpartitioned window use a key function that returns a constant,
+     * by convention {@link org.apache.edgent.function.Functions#unpartitioned() unpartitioned()} is recommended.
+     * </P>
+     * 
+     * @param <K> Key type.
+     * 
+     * @param count Number of tuples to maintain in each partition.
+     * @param keyFunction Function that defines the key for each tuple.
+     * @return Window on this stream representing the last {@code count} tuples for each partition.
+     */
+    <K> TWindow<T, K> last(int count, Function<T, K> keyFunction);
+    
+    /**
+     * Declare a partitioned window that continually represents the tuples seen on this stream
+     * in the last {@code time} (measured in {@code unit}) for each partition. If no tuples have been
+     * seen on the stream for a key in that period then the partition will be empty.
+     * Each partition independently maintains the tuples from that period
+     * for each key seen on this stream.
+     * <BR>
+     * The window is partitioned by each tuple's key, obtained by {@code keyFunction}.
+     * For each tuple on the stream {@code keyFunction.apply(tuple)} is called
+     * and the returned value is the tuple's key. For any two tuples {@code ta,tb} in a partition
+     * {@code keyFunction.apply(ta).equals(keyFunction.apply(tb))} is true.
+     * <BR>
+     * The key function must return keys that implement {@code equals()} and {@code hashCode()} correctly.
+     * <P>
+     * To create a window partitioned using the tuple as the key use {@link org.apache.edgent.function.Functions#identity() identity()}
+     * as the key function.
+     * </P>
+     * <P>
+     * To create an unpartitioned window use a key function that returns a constant,
+     * by convention {@link org.apache.edgent.function.Functions#unpartitioned() unpartitioned()} is recommended.
+     * </P>
+     * 
+     * @param <K> Key type.
+     * 
+     * @param time Time to retain a tuple in a partition.
+     * @param unit Unit for {@code time}.
+     * @param keyFunction Function that defines the key for each tuple.
+     * @return Partitioned window on this stream representing the tuples of the last {@code time} (in {@code unit}) for each partition.
+     */
+    <K> TWindow<T, K> last(long time, TimeUnit unit, Function<T, K> keyFunction);
+    
+    /**
+     * Declare a stream that will contain all tuples from this stream and
+     * {@code other}. A stream cannot be unioned with itself, in this case
+     * {@code this} will be returned.
+     * 
+     * @param other the other stream
+     * @return A stream that is the union of {@code this} and {@code other}.
+     */
+    TStream<T> union(TStream<T> other);
+
+    /**
+     * Declare a stream that will contain all tuples from this stream and all the
+     * streams in {@code others}. A stream cannot be unioned with itself, in
+     * this case the union will only contain tuples from this stream once. If
+     * {@code others} is empty or only contains {@code this} then {@code this}
+     * is returned.
+     * 
+     * @param others
+     *            Streams to union with this stream.
+     * @return A stream that is the union of {@code this} and {@code others}.
+     */
+    TStream<T> union(Set<TStream<T>> others);
+    
+    /**
+     * Adds the specified tags to the stream.  Adding the same tag to 
+     * a stream multiple times will not change the result beyond the 
+     * initial application.
+     * 
+     * @param values
+     *            Tag values.
+     * @return The tagged stream.
+     */
+    TStream<T> tag(String... values);
+
+    /**
+     * Returns the set of tags associated with this stream.
+     * 
+     * @return set of tags
+     */
+    Set<String> getTags(); 
+    
+    /**
+     * Set an alias for the stream.
+     * <p>
+     * The alias must be unique within the topology.
+     * The alias may be used in various contexts:
+     * </p>
+     * <ul>
+     * <li>Runtime control services for the stream are registered with this alias.</li>
+     * </ul>
+     * 
+     * @param alias an alias for the stream.
+     * @return this
+     * @throws IllegalStateException if an alias has already been set.
+     * @see ControlService
+     */
+    TStream<T> alias(String alias);
+    
+    /**
+     * Returns the stream's alias if any.
+     * @return the alias. null if one has not been set.
+     */
+    String getAlias();
+    
+    /**
+     * Join this stream with a partitioned window of type {@code U} with key type {@code K}.
+     * For each tuple on this stream, it is joined with the contents of {@code window}
+     * for the key {@code keyer.apply(tuple)}. Each tuple is
+     * passed into {@code joiner} and the return value is submitted to the
+     * returned stream. If the call returns null then no tuple is submitted.
+     * 
+     * @param <J> Tuple type of result stream
+     * @param <U> Tuple type of window to join with
+     * @param <K> Key type
+     * @param keyer Key function for this stream to match the window's key.
+     * @param window Keyed window to join this stream with.
+     * @param joiner Join function.
+     * @return A stream that is the results of joining this stream with
+     *         {@code window}.
+     */ 
+    <J, U, K> TStream<J> join(Function<T, K> keyer, TWindow<U, K> window, BiFunction<T, List<U>, J> joiner);
+    
+    /**
+     * Join this stream with the last tuple seen on a stream of type {@code U}
+     * with partitioning.
+     * For each tuple on this
+     * stream, it is joined with the last tuple seen on {@code lastStream}
+     * with a matching key (of type {@code K}).
+     * <BR>
+     * Each tuple {@code t} on this stream will match the last tuple
+     * {@code u} on {@code lastStream} if
+     * {@code keyer.apply(t).equals(lastStreamKeyer.apply(u))}
+     * is true.
+     * <BR>
+     * The assumption is made that
+     * the key classes correctly implement the contract for {@code equals} and
+     * {@code hashCode()}.
+     * <P>Each tuple is
+     * passed into {@code joiner} and the return value is submitted to the
+     * returned stream. If the call returns null then no tuple is submitted.
+     * </P>
+     * @param <J> Tuple type of result stream
+     * @param <U> Tuple type of stream to join with
+     * @param <K> Key type
+     * @param keyer Key function for this stream
+     * @param lastStream Stream to join with.
+     * @param lastStreamKeyer Key function for {@code lastStream}
+     * @param joiner Join function.
+     * @return A stream that is the results of joining this stream with
+     *         {@code lastStream}.
+     */
+    <J, U, K> TStream<J> joinLast(Function<T, K> keyer, TStream<U> lastStream, Function<U, K> lastStreamKeyer, BiFunction<T, U, J> joiner);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/TWindow.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/TWindow.java b/api/topology/src/main/java/org/apache/edgent/topology/TWindow.java
new file mode 100644
index 0000000..2665a76
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/TWindow.java
@@ -0,0 +1,120 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology;
+
+import java.util.List;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+
+/**
+ * Partitioned window of tuples. Logically a window
+ * represents a continuously updated ordered list of tuples according to the
+ * criteria that created it. For example {@link TStream#last(int, Function) s.last(10, zero())}
+ * declares a window with a single partition that at any time contains the last ten tuples seen on
+ * stream {@code s}.
+ * <P>
+ * Windows are partitioned which means the window's configuration
+ * is independently maintained for each key seen on the stream.
+ * For example with a window created using {@link TStream#last(int, Function) last(3, tuple -&gt; tuple.getId())}
+ * then each key has its own window containing the last
+ * three tuples with the same key obtained from the tuple's identity using {@code getId()}.
+ * </P>
+ *
+ * @param <T> Tuple type
+ * @param <K> Partition key type
+ * 
+ * @see TStream#last(int, Function) Count based window
+ * @see TStream#last(long, java.util.concurrent.TimeUnit, Function) Time based window
+ */
+public interface TWindow<T, K> extends TopologyElement {
+    /**
+     * Declares a stream that is a continuous, sliding, aggregation of
+     * partitions in this window.
+     * <P>
+     * Changes in a partition's contents trigger an invocation of
+     * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
+     * a {@code List<T>} containing all the tuples in the partition in
+     * insertion order from oldest to newest.  The list is stable
+     * during the aggregator invocation.</P>
+     * <UL>
+     * <LI>Count-based window: the aggregator is called after each
+     * tuple added to a partition.  When an addition results in a tuple
+     * being evicted, the eviction occurs before the aggregator is called.</LI>
+     * <LI>Time-based window: the aggregator is called after each tuple
+     * added to a partition. The aggregator is also called
+     * each time one or more tuples are evicted from a partition 
+     * (multiple tuples may be evicted at once).  The list will be
+     * empty if the eviction results in an empty partition.</LI>
+     * </UL>
+     * <P>A non-null {@code aggregator} result is added to the returned stream.
+     * </P>
+     * <P>
+     * Thus the returned stream will contain a sequence of tuples where the
+     * most recent tuple represents the most up to date aggregation of a
+     * partition.
+     * </P>
+     * @param <U> Tuple type
+     * @param aggregator
+     *            Logic to aggregate a partition.
+     * @return A stream that contains the latest aggregations of partitions in this window.
+     */
+    <U> TStream<U> aggregate(BiFunction<List<T>, K, U> aggregator);
+    
+    /**
+     * Declares a stream that represents a batched aggregation of
+     * partitions in this window. 
+     * <P>
+     * Each partition "batch" triggers an invocation of
+     * {@code batcher.apply(tuples, key)}, where {@code tuples} is
+     * a {@code List<T>} containing all the tuples in the partition in
+     * insertion order from oldest to newest.  The list is stable
+     * during the batcher invocation.</P>
+     * <UL>
+     * <LI>Count-based window: a batch occurs when the partition is full.</LI>
+     * <LI>Time-based window: a batch occurs every "time" period units.  The
+     * list will be empty if no tuples have been received during the period.</LI>
+     * </UL>
+     * <P>A non-null {@code batcher} result is added to the returned stream.
+     * The partition's contents are cleared after a batch is processed.
+     * </P>
+     * <P>
+     * Thus the returned stream will contain a sequence of tuples where the
+     * most recent tuple represents the most up to date aggregation of a
+     * partition.
+     * </P>
+     * @param <U> Tuple type
+     * @param batcher
+     *            Logic to aggregate a partition.
+     * @return A stream that contains the latest aggregations of partitions in this window.
+     */
+    <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
+    
+    /**
+     * Returns the key function used to map tuples to partitions.
+     * @return Key function used to map tuples to partitions.
+     */
+    Function<T, K> getKeyFunction();
+    
+    /**
+     * Get the stream that feeds this window.
+     * @return stream that feeds this window.
+     */
+    TStream<T> feeder();
+}