You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:42 UTC
[41/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/edgent/topology/plumbing/PlumbingStreams.java
deleted file mode 100644
index f34820e..0000000
--- a/api/topology/src/main/java/edgent/topology/plumbing/PlumbingStreams.java
+++ /dev/null
@@ -1,682 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.topology.plumbing;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edgent.function.BiFunction;
-import edgent.function.Function;
-import edgent.function.ToIntFunction;
-import edgent.oplet.plumbing.Barrier;
-import edgent.oplet.plumbing.Isolate;
-import edgent.oplet.plumbing.PressureReliever;
-import edgent.oplet.plumbing.UnorderedIsolate;
-import edgent.topology.TStream;
-import edgent.topology.TopologyProvider;
-
-/**
- * Plumbing utilities for {@link TStream}.
- * Methods that manipulate the flow of tuples in a streaming topology,
- * but are not part of the logic of the application.
- */
-public class PlumbingStreams {
-
- /**
- * Insert a blocking delay between tuples.
- * Returned stream is the input stream delayed by {@code delay}.
- * <p>
- * Delays less than 1msec are translated to a 0 delay.
- * <p>
- * This function always adds the {@code delay} amount after receiving
- * a tuple before forwarding it.
- * <p>
- * Downstream tuple processing delays will affect
- * the overall delay of a subsequent tuple.
- * <p>
- * e.g., the input stream contains two tuples t1 and t2 and
- * the delay is 100ms. The forwarding of t1 is delayed by 100ms.
- * Then if a downstream processing delay of 80ms occurs, this function
- * receives t2 80ms after it forwarded t1 and it will delay another
- * 100ms before forwarding t2. Hence the overall delay between forwarding
- * t1 and t2 is 180ms.
- * See {@link #blockingThrottle(TStream, long, TimeUnit) blockingThrottle}.
- *
- * @param <T> Tuple type
- * @param stream Stream to be delayed.
- * @param delay Amount of time to delay a tuple.
- * @param unit Time unit for {@code delay}.
- *
- * @return Stream that will be delayed.
- */
- public static <T> TStream<T> blockingDelay(TStream<T> stream, long delay, TimeUnit unit) {
- return stream.map(t -> {try {
- Thread.sleep(unit.toMillis(delay));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } return t;}) ;
- }
-
- /**
- * Maintain a constant blocking delay between tuples.
- * The returned stream is the input stream throttled by {@code delay}.
- * <p>
- * Delays less than 1msec are translated to a 0 delay.
- * <p>
- * Sample use:
- * <pre>{@code
- * TStream<String> stream = topology.strings("a", "b", "c");
- * // Create a stream with tuples throttled to 1 second intervals.
- * TStream<String> throttledStream = blockingThrottle(stream, 1, TimeUnit.SECONDS);
- * // print out the throttled tuples as they arrive
- * throttledStream.peek(t -> System.out.println(new Date() + " - " + t));
- * }</pre>
- * <p>
- * The function adjusts for downstream processing delays.
- * The first tuple is not delayed. If {@code delay} has already
- * elapsed since the prior tuple was forwarded, the tuple
- * is forwarded immediately.
- * Otherwise, forwarding the tuple is delayed to achieve
- * a {@code delay} amount since forwarding the prior tuple.
- * <p>
- * e.g., the input stream contains two tuples t1 and t2 and
- * the delay is 100ms. t1, being the first tuple, is forwarded without delay.
- * Then if a downstream processing delay of 80ms occurs, this function
- * receives t2 80ms after it forwarded t1 and it will only delay another
- * 20ms (100ms - 80ms) before forwarding t2.
- * Hence the overall delay between forwarding t1 and t2 remains 100ms.
- *
- * @param <T> tuple type
- * @param stream the stream to throttle
- * @param delay Amount of time to delay a tuple.
- * @param unit Time unit for {@code delay}.
- * @return the throttled stream
- */
- public static <T> TStream<T> blockingThrottle(TStream<T> stream, long delay, TimeUnit unit) {
- return stream.map( blockingThrottle(delay, unit) );
- }
-
- private static <T> Function<T,T> blockingThrottle(long delay, TimeUnit unit) {
- long[] nextTupleTime = { 0 };
- return t -> {
- long now = System.currentTimeMillis();
- if (nextTupleTime[0] != 0) {
- if (now < nextTupleTime[0]) {
- try {
- Thread.sleep(nextTupleTime[0] - now);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- now = System.currentTimeMillis();
- }
- }
- nextTupleTime[0] = now + unit.toMillis(delay);
- return t;
- };
- }
-
- /**
- * Insert a blocking delay before forwarding the first tuple and
- * no delay for subsequent tuples.
- * <p>
- * Delays less than 1msec are translated to a 0 delay.
- * <p>
- * Sample use:
- * <pre>{@code
- * TStream<String> stream = topology.strings("a", "b", "c");
- * // create a stream where the first tuple is delayed by 5 seconds.
- * TStream<String> oneShotDelayedStream =
- * blockingOneShotDelay(stream, 5, TimeUnit.SECONDS);
- * }</pre>
- *
- * @param <T> tuple type
- * @param stream input stream
- * @param delay Amount of time to delay a tuple.
- * @param unit Time unit for {@code delay}.
- * @return the delayed stream
- */
- public static <T> TStream<T> blockingOneShotDelay(TStream<T> stream, long delay, TimeUnit unit) {
- return stream.map( blockingOneShotDelay(delay, unit) );
- }
-
- private static <T> Function<T,T> blockingOneShotDelay(long delay, TimeUnit unit) {
- long[] initialDelay = { unit.toMillis(delay) };
- return t -> {
- if (initialDelay[0] != -1) {
- try {
- Thread.sleep(initialDelay[0]);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- initialDelay[0] = -1;
- }
- return t;
- };
- }
-
- /**
- * Relieve pressure on upstream processing by discarding tuples.
- * This method ensures that upstream processing is not
- * constrained by any delay in downstream processing,
- * for example by a connector not being able to connect
- * to its external system.
- * <P>
- * Any downstream processing of the returned stream is isolated
- * from {@code stream} so that any slow down does not affect {@code stream}.
- * When the downstream processing cannot keep up with rate of
- * {@code stream} tuples will be dropped from returned stream.
- * <BR>
- * Up to {@code count} of the most recent tuples per key from {@code stream}
- * are maintained when downstream processing is slow, any older tuples
- * that have not been submitted to the returned stream will be discarded.
- * <BR>
- * Tuple order is maintained within a partition but is not guaranteed to
- * be maintained across partitions.
- * </P>
- *
- * @param stream Stream to be isolated from downstream processing.
- * @param keyFunction Function defining the key of each tuple.
- * @param count Maximum number of tuples to maintain when downstream processing is backing up.
- * @return Stream that is isolated from and thus relieves pressure on {@code stream}.
- *
- * @param <T> Tuple type.
- * @param <K> Key type.
- * @see #isolate(TStream, int) isolate
- */
- public static <T,K> TStream<T> pressureReliever(TStream<T> stream, Function<T,K> keyFunction, int count) {
- return stream.pipe(new PressureReliever<>(count, keyFunction));
- }
-
- /**
- * Isolate upstream processing from downstream processing.
- * <BR>
- * Implementations may throw {@code OutOfMemoryError}
- * if the processing against returned stream cannot keep up
- * with the arrival rate of tuples on {@code stream}.
- *
- * @param <T> Tuple type
- * @param stream Stream to be isolated from downstream processing.
- * @param ordered {@code true} to maintain arrival order on the returned stream,
- * {@code false} to not guarantee arrival order.
- * @return Stream that is isolated from {@code stream}.
- */
- public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
- return stream.pipe(
- ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
- }
-
- /**
- * Isolate upstream processing from downstream processing.
- * <P>
- * If the processing against the returned stream cannot keep up
- * with the arrival rate of tuples on {@code stream}, upstream
- * processing will block until there is space in the queue between
- * the streams.
- * </P><P>
- * Processing of tuples occurs in the order they were received.
- * </P>
- *
- * @param <T> Tuple type
- * @param stream Stream to be isolated from downstream processing.
- * @param queueCapacity size of the queue between {@code stream} and
- * the returned stream.
- * @return Stream that is isolated from {@code stream}.
- * @see #pressureReliever(TStream, Function, int) pressureReliever
- */
- public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
- return stream.pipe(new Isolate<T>(queueCapacity));
- }
-
- /**
- * Perform analytics concurrently.
- * <P>
- * This is a convenience function that calls
- * {@link #concurrent(TStream, List, Function)} after
- * creating {@code pipeline} and {@code combiner} functions
- * from the supplied {@code mappers} and {@code combiner} arguments.
- * </P><P>
- * That is, it is logically, if not exactly, the same as:
- * </P>
- * <pre>{@code
- * List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
- * for (Function<T,U> mapper : mappers)
- * pipelines.add(s -> s.map(mapper));
- * concurrent(stream, pipelines, combiner);
- * }</pre>
- *
- * @param <T> Tuple type on input stream.
- * @param <U> Tuple type generated by mappers.
- * @param <R> Tuple type of the result.
- *
- * @param stream input stream
- * @param mappers functions to be run concurrently. Each mapper MUST
- * return a non-null result.
- * A runtime error will be generated if a null result
- * is returned.
- * @param combiner function to create a result tuple from the list of
- * results from {@code mappers}.
- * The input list order is 1:1 with the {@code mappers} list.
- * I.e., list entry [0] is the result from mappers[0],
- * list entry [1] is the result from mappers[1], etc.
- * @return result stream
- */
- public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
- Objects.requireNonNull(stream, "stream");
- Objects.requireNonNull(mappers, "mappers");
- Objects.requireNonNull(combiner, "combiner");
-
- List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
- for (Function<T,U> mapper : mappers) {
- pipelines.add(s -> s.map(mapper));
- }
-
- return concurrent(stream, pipelines, combiner);
- }
-
- /**
- * Perform analytics concurrently.
- * <P>
- * Process input tuples one at a time, invoking the specified
- * analytics ({@code pipelines}) concurrently, combine the results,
- * and then process the next input tuple in the same manner.
- * </P><P>
- * Logically, instead of doing this:
- * </P>
- * <pre>{@code
- * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
- * }</pre>
- * create a graph that's logically like this:
- * <pre>{@code
- * -
- * |-> A1 ->|
- * sensorReadings<T> -> |-> A2 ->| -> results<R>
- * |-> A3 ->|
- *
- * }</pre>
- * more specifically a graph like this:
- * <pre>{@code
- * -
- * |-> isolate(1) -> pipeline1 -> |
- * stream -> |-> isolate(1) -> pipeline2 -> |-> barrier(10) -> combiner
- * |-> isolate(1) -> pipeline3 -> |
- * . . .
- * }</pre>
- * <P>
- * The typical use case for this is when an application has a collection
- * of independent analytics to perform on each tuple and the analytics
- * are sufficiently long running such that performing them concurrently
- * is desired.
- * </P><P>
- * Note, this is in contrast to "parallel" stream processing,
- * which in Java8 Streams and other contexts means processing multiple
- * tuples in parallel, each on a replicated processing pipeline.
- * </P><P>
- * Threadsafety - one of the following must be true:
- * </P>
- * <ul>
- * <li>the tuples from {@code stream} are threadsafe</li>
- * <li>the {@code pipelines} do not modify the input tuples</li>
- * <li>the {@code pipelines} provide their own synchronization controls
- * to protect concurrent modifications of the input tuples</li>
- * </ul>
- * <P>
- * Logically, a thread is allocated for each of the {@code pipelines}.
- * The actual degree of concurrency may be {@link TopologyProvider} dependent.
- * </P>
- *
- * @param <T> Tuple type on input stream.
- * @param <U> Tuple type generated by pipelines.
- * @param <R> Tuple type of the result.
- *
- * @param stream input stream
- * @param pipelines a list of functions to add a pipeline to the topology.
- * Each {@code pipeline.apply()} is called with {@code stream}
- * as the input, yielding the pipeline's result stream.
- * For each input tuple, a pipeline MUST create exactly one output tuple.
- * Tuple flow into the pipelines will cease if that requirement
- * is not met.
- * @param combiner function to create a result tuple from the list of
- * results from {@code pipelines}.
- * The input tuple list's order is 1:1 with the {@code pipelines} list.
- * I.e., list entry [0] is the result from pipelines[0],
- * list entry [1] is the result from pipelines[1], etc.
- * @return result stream
- * @see #barrier(List, int) barrier
- */
- public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
- Objects.requireNonNull(stream, "stream");
- Objects.requireNonNull(pipelines, "pipelines");
- Objects.requireNonNull(combiner, "combiner");
-
- int barrierQueueCapacity = 10; // don't preclude pipelines from getting ahead some.
-
- // Add concurrent (isolated) fanouts
- List<TStream<T>> fanouts = new ArrayList<>(pipelines.size());
- for (int i = 0; i < pipelines.size(); i++)
- fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
-
- // Add pipelines
- List<TStream<U>> results = new ArrayList<>(pipelines.size());
- int ch = 0;
- for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
- results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
- ch++;
- }
-
- // Add the barrier
- TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
-
- // Add the combiner
- return barrier.map(combiner);
- }
-
- /**
- * A tuple synchronization barrier.
- * <P>
- * Same as {@code barrier(streams, 1)}
- * </P>
- * @param <T> Tuple type
- * @param streams input streams
- * @return the output stream
- * @see #barrier(List, int)
- */
- public static <T> TStream<List<T>> barrier(List<TStream<T>> streams) {
- return barrier(streams, 1);
- }
-
- /**
- * A tuple synchronization barrier.
- * <P>
- * A barrier has n input streams with tuple type {@code T}
- * and one output stream with tuple type {@code List<T>}.
- * Once the barrier receives one tuple on each of its input streams,
- * it generates an output tuple containing one tuple from each input stream.
- * It then waits until it has received another tuple from each input stream.
- * </P><P>
- * Input stream 0's tuple is in the output tuple's list[0],
- * stream 1's tuple in list[1], and so on.
- * </P><P>
- * The barrier's output stream is isolated from the input streams.
- * </P><P>
- * The barrier has a queue of size {@code queueCapacity} for each
- * input stream. When a tuple for an input stream is received it is
- * added to its queue. The stream will block if the queue is full.
- * </P>
- *
- * @param <T> Type of the tuple.
- *
- * @param streams the list of input streams
- * @param queueCapacity the size of each input stream's queue
- * @return the output stream
- * @see Barrier
- */
- public static <T> TStream<List<T>> barrier(List<TStream<T>> streams, int queueCapacity) {
- List<TStream<T>> others = new ArrayList<>(streams);
- TStream<T> s1 = others.remove(0);
- return s1.fanin(new Barrier<T>(queueCapacity), others);
- }
-
- /**
- * Perform an analytic function on tuples in parallel.
- * <P>
- * Same as {@code parallel(stream, width, splitter, (s,ch) -> s.map(t -> mapper.apply(t, ch)))}
- * </P>
- * @param <T> Input stream tuple type
- * @param <U> Result stream tuple type
- * @param stream input stream
- * @param splitter the tuple channel allocation function
- * @param mapper analytic function
- * @param width number of channels
- * @return the unordered result stream
- * @see #roundRobinSplitter(int) roundRobinSplitter
- * @see #concurrentMap(TStream, List, Function) concurrentMap
- */
- public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
- BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
- return parallel(stream, width, splitter, pipeline);
- }
-
- /**
- * Perform an analytic pipeline on tuples in parallel.
- * <P>
- * Splits {@code stream} into {@code width} parallel processing channels,
- * partitioning tuples among the channels using {@code splitter}.
- * Each channel runs a copy of {@code pipeline}.
- * The resulting stream is isolated from the upstream parallel channels.
- * </P><P>
- * The ordering of tuples in {@code stream} is not maintained in the
- * results from {@code parallel}.
- * </P><P>
- * {@code pipeline} is not required to yield a result for each input
- * tuple.
- * </P><P>
- * A common splitter function is a {@link #roundRobinSplitter(int) roundRobinSplitter}.
- * </P><P>
- * The generated graph looks like this:
- * </P>
- * <pre>{@code
- * -
- * |-> isolate(10) -> pipeline-ch1 -> |
- * stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch2 -> |-> union -> isolate(width)
- * |-> isolate(10) -> pipeline-ch3 -> |
- * . . .
- * }</pre>
- *
- * @param <T> Input stream tuple type
- * @param <R> Result stream tuple type
- *
- * @param stream the input stream
- * @param width number of parallel processing channels
- * @param splitter the tuple channel allocation function
- * @param pipeline the pipeline for each channel.
- * {@code pipeline.apply(inputStream,channel)}
- * is called to generate the pipeline for each channel.
- * @return the isolated unordered result from each parallel channel
- * @see #roundRobinSplitter(int) roundRobinSplitter
- * @see #concurrent(TStream, List, Function) concurrent
- */
- public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
- Objects.requireNonNull(stream, "stream");
- if (width < 1)
- throw new IllegalArgumentException("width");
- Objects.requireNonNull(splitter, "splitter");
- Objects.requireNonNull(pipeline, "pipeline");
-
- // Add the splitter
- List<TStream<T>> channels = stream.split(width, splitter);
- for (int ch = 0; ch < width; ch++)
- channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
-
- // Add concurrency (isolation) to the channels
- int chBufferSize = 10; // don't immediately block stream if channel is busy
- for (int ch = 0; ch < width; ch++)
- channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
-
- // Add pipelines
- List<TStream<R>> results = new ArrayList<>(width);
- for (int ch = 0; ch < width; ch++) {
- results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
- }
-
- // Add the Union
- TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
-
- // Add the isolate - keep channel threads to just their pipeline processing
- return isolate(result, width);
- }
-
- /**
- * Perform an analytic pipeline on tuples in parallel.
- * <P>
- * Splits {@code stream} into {@code width} parallel processing channels,
- * partitioning tuples among the channels in a load balanced fashion.
- * Each channel runs a copy of {@code pipeline}.
- * The resulting stream is isolated from the upstream parallel channels.
- * </P><P>
- * The ordering of tuples in {@code stream} is not maintained in the
- * results from {@code parallel}.
- * </P><P>
- * A {@code pipeline} <b>MUST</b> yield a result for each input
- * tuple. Failure to do so will result in the channel remaining
- * in a busy state and no longer available to process additional tuples.
- * </P><P>
- * A {@link LoadBalancedSplitter} is used to distribute tuples.
- * </P><P>
- * The generated graph looks like this:
- * </P>
- * <pre>{@code
- * -
- * |-> isolate(1) -> pipeline-ch1 -> peek(splitter.channelDone()) -> |
- * stream -> split(width,splitter) -> |-> isolate(1) -> pipeline-ch2 -> peek(splitter.channelDone()) -> |-> union -> isolate(width)
- * |-> isolate(1) -> pipeline-ch3 -> peek(splitter.channelDone()) -> |
- * . . .
- * }</pre>
- * <P>
- * Note, this implementation requires that the splitter is used from
- * only a single JVM. The {@link edgent.providers.direct.DirectProvider DirectProvider}
- * provider meets this requirement.
- * </P>
- *
- * @param <T> Input stream tuple type
- * @param <R> Result stream tuple type
- *
- * @param stream the input stream
- * @param width number of parallel processing channels
- * @param pipeline the pipeline for each channel.
- * {@code pipeline.apply(inputStream,channel)}
- * is called to generate the pipeline for each channel.
- * @return the isolated unordered result from each parallel channel
- * @see #parallel(TStream, int, ToIntFunction, BiFunction)
- * @see LoadBalancedSplitter
- */
- public static <T,R> TStream<R> parallelBalanced(TStream<T> stream, int width, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
- Objects.requireNonNull(stream, "stream");
- if (width < 1)
- throw new IllegalArgumentException("width");
- Objects.requireNonNull(pipeline, "pipeline");
-
- LoadBalancedSplitter<T> splitter = new LoadBalancedSplitter<>(width);
-
- // Add the splitter
- List<TStream<T>> channels = stream.split(width, splitter);
- for (int ch = 0; ch < width; ch++)
- channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
-
- // Add concurrency (isolation) to the channels
- int chBufferSize = 1; // 1 is enough with load balanced impl
- for (int ch = 0; ch < width; ch++)
- channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
-
- // Add pipelines
- List<TStream<R>> results = new ArrayList<>(width);
- for (int ch = 0; ch < width; ch++) {
- final int finalCh = ch;
- results.add(pipeline.apply(channels.get(ch), ch)
- .tag("parallel-ch"+ch)
- .peek(tuple -> splitter.channelDone(finalCh)));
- }
-
- // Add the Union
- TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
-
- // Add the isolate - keep channel threads to just their pipeline processing
- return isolate(result, width);
- }
-
- /**
- * A round-robin splitter ToIntFunction
- * <P>
- * The splitter function cycles among the {@code width} channels
- * on successive calls to {@code roundRobinSplitter.applyAsInt()},
- * returning {@code 0, 1, ..., width-1, 0, 1, ..., width-1}.
- * </P>
- * @param <T> Tuple type
- * @param width number of splitter channels
- * @return the splitter
- * @see TStream#split(int, ToIntFunction) TStream.split
- * @see PlumbingStreams#parallel(TStream, int, ToIntFunction, BiFunction) parallel
- */
- public static <T> ToIntFunction<T> roundRobinSplitter(int width) {
- AtomicInteger cnt = new AtomicInteger();
- return tuple -> cnt.getAndIncrement() % width;
- }
- /**
- * Control the flow of tuples to an output stream.
- * <P>
- * A {@link Semaphore}
- * is used to control the flow of tuples
- * through the {@code gate}.
- * The gate acquires a permit from the
- * semaphore to pass the tuple through, blocking until a permit is
- * acquired (and applying backpressure upstream while blocked).
- * Elsewhere, some code calls {@link Semaphore#release(int)}
- * to make permits available.
- * </P><P>
- * If a TopologyProvider is used that can distribute a topology's
- * streams to different JVMs, the gate and the code releasing the
- * permits must be in the same JVM.
- * </P><P>
- * Sample use:
- * <BR>
- * Suppose you wanted to control processing such that concurrent
- * pipelines processed each tuple in lock-step.
- * I.e., You want all of the pipelines to start processing a tuple
- * at the same time and not start a new tuple until the current
- * tuple had been fully processed by each of them:
- * </P>
- * <pre>{@code
- * TStream<Integer> readings = ...;
- * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
- * TStream<Integer> gated = gate(readings, gateControl);
- *
- * // Create the concurrent pipeline combiner and have it
- * // signal that concurrent processing of the tuple has completed.
- * // In this sample the combiner just returns the received list of
- * // each pipeline result.
- *
- * Function<List<Integer>,List<Integer>> combiner =
- * list -> { gateControl.release(); return list; };
- *
- * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
- * }</pre>
- * @param <T> Tuple type
- * @param stream the input stream
- * @param semaphore gate control
- * @return gated stream
- */
- public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
- return stream.map(tuple -> {
- try {
- semaphore.acquire();
- return tuple;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("interrupted", e);
- }
- });
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/plumbing/Valve.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/plumbing/Valve.java b/api/topology/src/main/java/edgent/topology/plumbing/Valve.java
deleted file mode 100644
index a56f234..0000000
--- a/api/topology/src/main/java/edgent/topology/plumbing/Valve.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.topology.plumbing;
-
-import edgent.function.Predicate;
-
-/**
- * A generic "valve" {@link Predicate}.
- * <p>
- * A valve is either open or closed.
- * When used as a Predicate to {@code TStream.filter()},
- * filter passes tuples only when the valve is open.
- * </p><p>
- * A valve is typically used to dynamically control whether or not
- * some downstream tuple processing is enabled. A decision to change the
- * state of the valve may be a result of local analytics or an external
- * command.
- * <br>
- * E.g., in a simple case, a Valve might be used to control
- * whether or not logging or publishing of tuples is enabled.
- * </p>
- * <pre>{@code
- * TStream<JsonObject> stream = ...;
- *
- * Valve<JsonObject> valve = new Valve<>(false);
- * stream.filter(valve).sink(someTupleLoggingConsumer);
- *
- * // from some analytic or device command handler...
- * valve.setOpen(true);
- * }</pre>
- *
- * @param <T> tuple type
- */
-public class Valve<T> implements Predicate<T> {
- private static final long serialVersionUID = 1L;
- private transient boolean isOpen;
-
- /**
- * Create a new Valve Predicate
- * <p>
- * Same as {@code Valve(true)}
- */
- public Valve() {
- this(true);
- }
-
- /**
- * Create a new Valve Predicate
- * <p>
- * @param isOpen the initial state
- */
- public Valve(boolean isOpen) {
- setOpen(isOpen);
- }
-
- /**
- * Set the valve state
- * @param isOpen true to open the valve
- */
- public void setOpen(boolean isOpen) {
- this.isOpen = isOpen;
- }
-
- /**
- * Get the valve state
- * @return the state, true if the valve is open, false otherwise
- */
- public boolean isOpen() {
- return isOpen;
- }
-
- /**
- * Test the state of the valve, {@code value} is ignored.
- * @return true when the valve is open, false otherwise
- */
- @Override
- public boolean test(T value) {
- return isOpen;
- }
-
- /**
- * Returns a String for development/debug support. Content subject to change.
- */
- @Override
- public String toString() {
- return "isOpen="+isOpen;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/plumbing/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/plumbing/package-info.java b/api/topology/src/main/java/edgent/topology/plumbing/package-info.java
deleted file mode 100644
index 8a1e287..0000000
--- a/api/topology/src/main/java/edgent/topology/plumbing/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Plumbing for a streaming topology.
- * Methods that manipulate the flow of tuples in a streaming topology,
- * but are not part of the logic of the application.
- */
-package edgent.topology.plumbing;
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/services/ApplicationService.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/services/ApplicationService.java b/api/topology/src/main/java/edgent/topology/services/ApplicationService.java
deleted file mode 100644
index 12262d7..0000000
--- a/api/topology/src/main/java/edgent/topology/services/ApplicationService.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.topology.services;
-
-import java.util.Set;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.Submitter;
-import edgent.function.BiConsumer;
-import edgent.topology.Topology;
-import edgent.topology.mbeans.ApplicationServiceMXBean;
-
-/**
- * Application registration service.
- * A service that allows registration of applications and
- * the ability to submit them through a control MBean.
- *
- * @see ApplicationServiceMXBean
- */
-public interface ApplicationService {
-
- /**
- * Default alias a service registers its control MBean as.
- * Value is {@value}.
- */
- String ALIAS = "edgent";
-
- /**
- * Prefix ({@value}) reserved for system application names.
- */
- String SYSTEM_APP_PREFIX = "edgent";
-
- /**
- * Add a topology that can be started though a control mbean.
- * Any registration replaces any existing application with the same name.
- * <BR>
- * When a {@link ApplicationServiceMXBean#submit(String, String) submit}
- * is invoked {@code builder.accept(topology, config)} is called passing:
- * <UL>
- * <LI>
- * {@code topology} - An empty topology with the name {@code applicationName}.
- * </LI>
- * <LI>
- * {@code config} - JSON submission configuration from
- * {@link ApplicationServiceMXBean#submit(String, String) submit}.
- * </LI>
- * </UL>
- * Once {@code builder.accept(topology, config)} returns it is submitted
- * to the {@link Submitter} associated with the implementation of this service.
- * <P>
- * Application names starting with {@link #SYSTEM_APP_PREFIX edgent} are reserved
- * for system applications.
- * </P>
- *
- * @param applicationName Application name to register.
- * @param builder How to build the topology for this application.
- *
- * @see ApplicationServiceMXBean
- */
- void registerTopology(String applicationName, BiConsumer<Topology, JsonObject> builder);
-
- /**
- * Register a jar file containing new applications.
- * Any service provider within the jar of type {@link TopologyBuilder}
- * will be {@link #registerTopology(String, BiConsumer) registered} as
- * a topology.
- *
- * The jar cannot have any new dependencies, its classpath will
- * be the classpath of this service.
- *
- * @param jarURL URL of Jar containing new applications.
- * @param jsonConfig Configuration information, currently unused.
- */
- void registerJar(String jarURL, String jsonConfig) throws Exception;
-
- /**
- * Returns the names of applications registered with this service.
- *
- * @return the names of applications registered with this service.
- */
- Set<String> getApplicationNames();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/services/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/services/TopologyBuilder.java b/api/topology/src/main/java/edgent/topology/services/TopologyBuilder.java
deleted file mode 100644
index 7c756db..0000000
--- a/api/topology/src/main/java/edgent/topology/services/TopologyBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.topology.services;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.BiConsumer;
-import edgent.topology.Topology;
-
-/**
- * Represents an topology that can be built.
- *
- * A class implementing {@code TopologyBuilder} can
- * be registered as a service provider in a jar file for
- * automatic application registration using
- * {@link ApplicationService#registerJar(String, String)}.
- *
- */
-public interface TopologyBuilder {
- /**
- * Name the application will be known as.
- * @return Name the application will be known as.
- */
- String getName();
-
- /**
- * How the application is built.
- * @return Function that builds the application.
- */
- BiConsumer<Topology, JsonObject> getBuilder();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/services/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/services/package-info.java b/api/topology/src/main/java/edgent/topology/services/package-info.java
deleted file mode 100644
index b647a1a..0000000
--- a/api/topology/src/main/java/edgent/topology/services/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-/**
- * Services for topologies.
- *
- */
-package edgent.topology.services;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/tester/Condition.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/tester/Condition.java b/api/topology/src/main/java/edgent/topology/tester/Condition.java
deleted file mode 100644
index eb5c2b3..0000000
--- a/api/topology/src/main/java/edgent/topology/tester/Condition.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
- */
-package edgent.topology.tester;
-
-/**
- * Function representing if a condition is valid or not.
- *
- * @param <T> Condition's result type
- */
-public interface Condition<T> {
-
- boolean valid();
-
- T getResult();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/tester/Tester.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/tester/Tester.java b/api/topology/src/main/java/edgent/topology/tester/Tester.java
deleted file mode 100644
index 4bf0fd6..0000000
--- a/api/topology/src/main/java/edgent/topology/tester/Tester.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
- */
-package edgent.topology.tester;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.Job;
-import edgent.execution.Submitter;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.TopologyElement;
-
-/**
- * A {@code Tester} adds the ability to test a topology in a test framework such
- * as JUnit.
- *
- * The main feature is the ability to capture tuples from a {@link TStream} in
- * order to perform some form of verification on them. There are two mechanisms
- * to perform verifications:
- * <UL>
- * <LI>did the stream produce the correct number of tuples.</LI>
- * <LI>did the stream produce the correct tuples.</LI>
- * </UL>
- * Currently, only streams that are instances of
- * {@code TStream<String>} can have conditions or handlers attached.
- * <P>
- * A {@code Tester} modifies its {@link Topology} to achieve the above purpose.
- * </P>
- */
-public interface Tester extends TopologyElement {
-
- /**
- * Return a condition that evaluates if {@code stream} has submitted exactly
- * {@code expectedCount} number of tuples. The function may be evaluated
- * after the {@link Submitter#submit(Object, JsonObject) submit}
- * call has returned. <BR>
- * The {@link Condition#getResult() result} of the returned
- * {@code Condition} is the number of tuples seen on {@code stream} so far.
- * <BR>
- * If the topology is still executing then the returned values from
- * {@link Condition#valid()} and {@link Condition#getResult()} may change as
- * more tuples are seen on {@code stream}. <BR>
- *
- * @param stream
- * Stream to be tested.
- * @param expectedCount
- * Number of tuples expected on {@code stream}.
- * @return True if the stream has submitted exactly {@code expectedCount}
- * number of tuples, false otherwise.
- */
- Condition<Long> tupleCount(TStream<?> stream, long expectedCount);
-
- /**
- * Return a condition that evaluates if {@code stream} has submitted at
- * least {@code expectedCount} number of tuples. The function may be
- * evaluated after the
- * {@link Submitter#submit(Object, JsonObject) submit} call has returned. <BR>
- * The {@link Condition#getResult() result} of the returned
- * {@code Condition} is the number of tuples seen on {@code stream} so far.
- * <BR>
- * If the topology is still executing then the returned values from
- * {@link Condition#valid()} and {@link Condition#getResult()} may change as
- * more tuples are seen on {@code stream}. <BR>
- *
- * @param stream
- * Stream to be tested.
- * @param expectedCount
- * Number of tuples expected on {@code stream}.
- * @return Condition that will return true the stream has submitted at least
- * {@code expectedCount} number of tuples, false otherwise.
- */
- Condition<Long> atLeastTupleCount(TStream<?> stream, long expectedCount);
-
- /**
- * Return a condition that evaluates if {@code stream} has submitted
- * tuples matching {@code values} in the same order. <BR>
- * The {@link Condition#getResult() result} of the returned
- * {@code Condition} is the tuples seen on {@code stream} so far. <BR>
- * If the topology is still executing then the returned values from
- * {@link Condition#valid()} and {@link Condition#getResult()} may change as
- * more tuples are seen on {@code stream}. <BR>
- *
- * @param <T> Tuple type
- * @param stream
- * Stream to be tested.
- * @param values
- * Expected tuples on {@code stream}.
- * @return Condition that will return true if the stream has submitted at
- * least tuples matching {@code values} in the same order, false
- * otherwise.
- */
- <T> Condition<List<T>> streamContents(TStream<T> stream, @SuppressWarnings("unchecked") T... values);
-
- /**
- * Return a condition that evaluates if {@code stream} has submitted
- * tuples matching {@code values} in any order. <BR>
- * The {@link Condition#getResult() result} of the returned
- * {@code Condition} is the tuples seen on {@code stream} so far. <BR>
- * If the topology is still executing then the returned values from
- * {@link Condition#valid()} and {@link Condition#getResult()} may change as
- * more tuples are seen on {@code stream}. <BR>
- *
- * @param <T> Tuple type
- * @param stream
- * Stream to be tested.
- * @param values
- * Expected tuples on {@code stream}.
- * @return Condition that will return true if the stream has submitted at
- * least tuples matching {@code values} in the any order, false
- * otherwise.
- */
- <T> Condition<List<T>> contentsUnordered(TStream<T> stream, @SuppressWarnings("unchecked") T... values);
-
- /**
- * Return a condition that is valid only if all of {@code conditions} are valid.
- * The result of the condition is {@link Condition#valid()}
- * @param conditions Conditions to AND together.
- * @return condition that is valid only if all of {@code conditions} are valid.
- */
- Condition<Boolean> and(final Condition<?>... conditions);
-
- /**
- * Submit the topology for this tester and wait for it to complete, or reach
- * an end condition. If the topology does not complete or reach its end
- * condition before {@code timeout} then it is terminated.
- * <P>
- * End condition is usually a {@link Condition} returned from
- * {@link #atLeastTupleCount(TStream, long)} or
- * {@link #tupleCount(TStream, long)} so that this method returns once the
- * stream has submitted a sufficient number of tuples. <BR>
- * Note that the condition will be only checked periodically up to
- * {@code timeout}, so that if the condition is only valid for a brief
- * period of time, then its valid state may not be seen, and thus this
- * method will wait for the timeout period.
- * </P>
- *
- * @param submitter the {@link Submitter}
- * @param config
- * submission configuration.
- * @param endCondition
- * Condition that will cause this method to return if it is true.
- * @param timeout
- * Maximum time to wait for the topology to complete or reach its
- * end condition.
- * @param unit
- * Unit for {@code timeout}.
- * @return The value of {@code endCondition.valid()}.
- *
- * @throws Exception
- * Failure submitting or executing the topology.
- */
- boolean complete(Submitter<Topology, ? extends Job> submitter, JsonObject config, Condition<?> endCondition,
- long timeout, TimeUnit unit) throws Exception;
-
- /**
- * Get the {@code Job} reference for the topology submitted by {@code complete()}.
- * @return {@code Job} reference for the topology submitted by {@code complete()}.
- * Null if the {@code complete()} has not been called or the {@code Job} instance is not yet available.
- */
- Job getJob();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/edgent/topology/tester/package-info.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/edgent/topology/tester/package-info.java b/api/topology/src/main/java/edgent/topology/tester/package-info.java
deleted file mode 100644
index b5e8aed..0000000
--- a/api/topology/src/main/java/edgent/topology/tester/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Testing for a streaming topology.
- */
-package edgent.topology.tester;
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/TSink.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/TSink.java b/api/topology/src/main/java/org/apache/edgent/topology/TSink.java
new file mode 100644
index 0000000..6bf7563
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/TSink.java
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology;
+
+/**
+ * Termination point (sink) for a stream.
+ *
+ * @param <T> Tuple type
+ */
+public interface TSink<T> extends TopologyElement {
+ /**
+ * Get the stream feeding this sink.
+ * The returned reference may be used for
+ * further processing of the feeder stream.
+ * <BR>
+ * For example, {@code s.print().filter(...)}
+ * <BR>
+ * Here the filter is applied
+ * to {@code s} so that {@code s} feeds
+ * the {@code print()} and {@code filter()}.
+ *
+ * @return stream feeding this sink.
+ */
+ public TStream<T> getFeed();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/TStream.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/TStream.java b/api/topology/src/main/java/org/apache/edgent/topology/TStream.java
new file mode 100644
index 0000000..e89746f
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/TStream.java
@@ -0,0 +1,543 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology;
+
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Predicate;
+import org.apache.edgent.function.ToIntFunction;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.oplet.core.FanIn;
+import org.apache.edgent.oplet.core.Pipe;
+import org.apache.edgent.oplet.core.Sink;
+
+/**
+ * A {@code TStream} is a declaration of a continuous sequence of tuples. A
+ * connected topology of streams and functional transformations is built using
+ * {@link Topology}. <BR>
+ * Generic methods on this interface provide the ability to
+ * {@link #filter(Predicate) filter}, {@link #map(Function)
+ * map (or transform)} or {@link #sink(Consumer) sink} this declared stream using a
+ * function.
+ * <P>
+ * {@code TStream} is not a runtime representation of a stream,
+ * it is a declaration used in building a topology.
+ * The actual runtime stream is created once the topology
+ * is {@link org.apache.edgent.execution.Submitter#submit(Object) submitted}
+ * to a runtime.
+ *
+ * </P>
+ * @param <T>
+ * Tuple type.
+ */
+public interface TStream<T> extends TopologyElement {
+
+ /**
+ * TYPE is used to identify {@link ControlService} mbeans registered
+ * for a TStream.
+ * The value is {@value}
+ */
+ public static final String TYPE = "stream";
+ // N.B. to avoid build problems due to topology <=> oplet,
+ // other code contain a copy of this value (ugh) as TSTREAM_TYPE
+
+ /**
+ * Declare a new stream that filters tuples from this stream. Each tuple
+ * {@code t} on this stream will appear in the returned stream if
+ * {@link Predicate#test(Object) predicate.test(t)} returns {@code true}. If
+ * {@code predicate.test(t)} returns {@code false} then {@code t} will not
+ * appear in the returned stream.
+ * <P>
+ * Examples of filtering out all empty strings from stream {@code s} of type
+ * {@code String}
+ * </P>
+ *
+ * <pre>
+ * <code>
+ * TStream<String> s = ...
+ * TStream<String> filtered = s.filter(t -> !t.isEmpty());
+ *
+ * </code>
+ * </pre>
+ *
+ * @param predicate
+ * Filtering logic to be executed against each tuple.
+ * @return Filtered stream
+ */
+ TStream<T> filter(Predicate<T> predicate);
+
+ /**
+ * Declare a new stream that maps (or transforms) each tuple from this stream into one
+ * (or zero) tuple of a different type {@code U}. For each tuple {@code t}
+ * on this stream, the returned stream will contain a tuple that is the
+ * result of {@code mapper.apply(t)} when the return is not {@code null}.
+ * If {@code mapper.apply(t)} returns {@code null} then no tuple
+ * is submitted to the returned stream for {@code t}.
+ *
+ * <P>
+ * Examples of transforming a stream containing numeric values as
+ * {@code String} objects into a stream of {@code Double} values.
+ * </P>
+ *
+ * <pre>
+ * <code>
+ * // Using lambda expression
+ * TStream<String> strings = ...
+ * TStream<Double> doubles = strings.map(v -> Double.valueOf(v));
+ *
+ * // Using method reference
+ * TStream<String> strings = ...
+ * TStream<Double> doubles = strings.map(Double::valueOf);
+ *
+ * </code>
+ * </pre>
+ *
+ * @param <U> Tuple type of output stream
+ * @param mapper
+ * Mapping logic to be executed against each tuple.
+ * @return Stream that will contain tuples of type {@code U} mapped from this
+ * stream's tuples.
+ */
+ <U> TStream<U> map(Function<T, U> mapper);
+
+ /**
+ * Declare a new stream that maps tuples from this stream into one or
+ * more (or zero) tuples of a different type {@code U}. For each tuple
+ * {@code t} on this stream, the returned stream will contain all non-null tuples in
+ * the {@code Iterable<U>} that is the result of {@code mapper.apply(t)}.
+ * Tuples will be added to the returned stream in the order the iterator
+ * returns them.
+ *
+ * <BR>
+ * If the return is null or an empty iterator then no tuples are added to
+ * the returned stream for input tuple {@code t}.
+ * <P>
+ * Examples of mapping a stream containing lines of text into a stream
+ * of words split out from each line. The order of the words in the stream
+ * will match the order of the words in the lines.
+ * </P>
+ *
+ * <pre>
+ * <code>
+ * TStream<String> lines = ...
+ * TStream<String> words = lines.flatMap(
+ * line -> Arrays.asList(line.split(" ")));
+ *
+ * </code>
+ * </pre>
+ *
+ * @param <U> Tuple type of the returned stream.
+ * @param mapper
+ * Mapper logic to be executed against each tuple.
+ * @return Stream that will contain tuples of type {@code U} mapped and flattened from this
+ * stream's tuples.
+ */
+ <U> TStream<U> flatMap(Function<T, Iterable<U>> mapper);
+
+ /**
+ * Split a stream's tuples among {@code n} streams as specified by
+ * {@code splitter}.
+ *
+ * <P>
+ * For each tuple on the stream, {@code splitter.applyAsInt(tuple)} is
+ * called. The return value {@code r} determines the destination stream:
+ * </P>
+ *
+ * <pre>
+ * if r < 0 the tuple is discarded
+ * else it is sent to the stream at position (r % n) in the returned array.
+ * </pre>
+ *
+ * <P>
+ * Each split {@code TStream} is exposed by the API. The user has full
+ * control over each stream's processing pipeline. Each stream's
+ * pipeline must be declared explicitly. Each stream can have different
+ * processing pipelines.
+ * </P>
+ * <P>
+ * An N-way {@code split()} is logically equivalent to a collection of N
+ * {@code filter()} invocations, each with a predicate to select the tuples
+ * for its stream. {@code split()} is more efficient. Each tuple is analyzed
+ * only once by a single {@code splitter} instance to identify the
+ * destination stream. For example, these are logically equivalent:
+ * </P>
+ * <pre>
+ * List<TStream<String>> streams = stream.split(2, tuple -> tuple.length());
+ *
+ * TStream<String> stream0 = stream.filter(tuple -> (tuple.length() % 2) == 0);
+ * TStream<String> stream1 = stream.filter(tuple -> (tuple.length() % 2) == 1);
+ * </pre>
+ * <P>
+ * Example of splitting a stream of log records by their level attribute:
+ * </P>
+ *
+ * <pre>
+ * <code>
+ * TStream<LogRecord> lrs = ...
+ * List<TStream<LogRecord>> splits = lrs.split(3, lr -> {
+ if (SEVERE.equals(lr.getLevel()))
+ return 0;
+ else if (WARNING.equals(lr.getLevel()))
+ return 1;
+ else
+ return 2;
+ });
+ * splits.get(0). ... // SEVERE log record processing pipeline
+ * splits.get(1). ... // WARNING log record processing pipeline
+ * splits.get(2). ... // "other" log record processing pipeline
+ * </code>
+ * </pre>
+ *
+ * @param n
+ * the number of output streams
+ * @param splitter
+ * the splitter function
+ * @return List of {@code n} streams
+ *
+ * @throws IllegalArgumentException
+ * if {@code n <= 0}
+ */
+ List<TStream<T>> split(int n, ToIntFunction<T> splitter);
+
+ /**
+ * Split a stream's tuples among {@code enumClass.size} streams as specified by
+ * {@code splitter}.
+ *
+ * @param <E> Enum type
+ * @param enumClass
+ * enum data to split
+ * @param splitter
+ * the splitter function
+ * @return EnumMap<E,TStream<T>>
+ * @throws IllegalArgumentException
+ * if {@code enumclass.size <= 0}
+ */
+ <E extends Enum<E>> EnumMap<E,TStream<T>> split(Class<E> enumClass, Function<T, E> splitter);
+
+ /**
+ * Declare a stream that contains the same contents as this stream while
+ * peeking at each element using {@code peeker}. <BR>
+ * For each tuple {@code t} on this stream, {@code peeker.accept(t)} will be
+ * called.
+ *
+ * @param peeker
+ * Function to be called for each tuple.
+ * @return {@code this}
+ */
+ TStream<T> peek(Consumer<T> peeker);
+
+ /**
+ * Sink (terminate) this stream using a function. For each tuple {@code t} on this stream
+ * {@link Consumer#accept(Object) sinker.accept(t)} will be called. This is
+ * typically used to send information to external systems, such as databases
+ * or dashboards.
+ * <p>
+ * If {@code sinker} implements {@link AutoCloseable}, its {@code close()}
+ * method will be called when the topology's execution is terminated.
+ * </P>
+ * <P>
+ * Example of terminating a stream of {@code String} tuples by printing them
+ * to {@code System.out}.
+ * </P>
+ *
+ * <pre>
+ * <code>
+ * TStream<String> values = ...
+ * values.sink(t -> System.out.println(t));
+ * </code>
+ * </pre>
+ *
+ * @param sinker
+ * Logic to be executed against each tuple on this stream.
+ * @return sink element representing termination of this stream.
+ */
+ TSink<T> sink(Consumer<T> sinker);
+
+ /**
+ * Sink (terminate) this stream using an oplet.
+ * This provides a richer api for a sink than
+ * {@link #sink(Consumer)} with a full life-cycle of
+ * the oplet as well as easy access to
+ * {@link org.apache.edgent.execution.services.RuntimeServices runtime services}.
+ *
+ * @param oplet Oplet processes each tuple without producing output.
+ * @return sink element representing termination of this stream.
+ */
+ TSink<T> sink(Sink<T> oplet);
+
+ /**
+ * Declare a stream that contains the output of the specified {@link Pipe}
+ * oplet applied to this stream.
+ *
+ * @param <U> Tuple type of the returned stream.
+ * @param pipe The {@link Pipe} oplet.
+ *
+ * @return Declared stream that contains the tuples emitted by the pipe
+ * oplet.
+ */
+ <U> TStream<U> pipe(Pipe<T, U> pipe);
+
+ /**
+ * Declare a stream that contains the output of the specified
+ * {@link FanIn} oplet applied to this stream and {@code others}.
+ *
+ * @param <U> Tuple type of the returned stream.
+ * @param fanin The {@link FanIn} oplet.
+ * @param others The other input streams.
+ * Must not be empty or contain duplicates or {@code this}
+ *
+ * @return a stream that contains the tuples emitted by the oplet.
+ * @see #union(Set)
+ * @see #pipe(Pipe)
+ * @see #sink(Sink)
+ */
+ <U> TStream<U> fanin(FanIn<T,U> fanin, List<TStream<T>> others);
+
+ /**
+ * Declare a new stream that modifies each tuple from this stream into one
+ * (or zero) tuple of the same type {@code T}. For each tuple {@code t}
+ * on this stream, the returned stream will contain a tuple that is the
+ * result of {@code modifier.apply(t)} when the return is not {@code null}.
+ * The function may return the same reference as its input {@code t} or
+ * a different object of the same type.
+ * If {@code modifier.apply(t)} returns {@code null} then no tuple
+ * is submitted to the returned stream for {@code t}.
+ *
+ * <P>
+ * Example of modifying a stream {@code String} values by adding the suffix '{@code extra}'.
+ * </P>
+ *
+ * <pre>
+ * <code>
+ * TStream<String> strings = ...
+ * TStream<String> modifiedStrings = strings.modify(t -> t.concat("extra"));
+ * </code>
+ * </pre>
+ *
+ * <P>
+ * This method is equivalent to
+ * {@code map(Function<T,T> modifier)}.
+ * </P>
+ *
+ * @param modifier
+ * Modifier logic to be executed against each tuple.
+ * @return Stream that will contain tuples of type {@code T} modified from this
+ * stream's tuples.
+ */
+ TStream<T> modify(UnaryOperator<T> modifier);
+
+ /**
+ * Convert this stream to a stream of {@code String} tuples by calling
+ * {@code toString()} on each tuple. This is equivalent to
+ * {@code map(Object::toString)}.
+ *
+ * @return Declared stream that will contain the string representation
+ * of each tuple on this stream.
+ */
+ TStream<String> asString();
+
+ /**
+ * Utility method that prints each tuple on this stream
+ * to {@code System.out} at runtime. Each tuple is printed
+ * using {@code System.out.println(tuple)}.
+ * @return {@code TSink} for the sink processing.
+ */
+ TSink<T> print();
+
+ /**
+ * Declare a partitioned window that continually represents the last {@code count}
+ * tuples on this stream for each partition. Each partition independently maintains the last
+ * {@code count} tuples seen on this stream for its key.
+ * If no tuples have been seen on the stream for a key then the corresponding partition will be empty.
+ * <BR>
+ * The window is partitioned by each tuple's key, obtained by {@code keyFunction}.
+ * For each tuple on the stream {@code keyFunction.apply(tuple)} is called
+ * and the returned value is the tuple's key. For any two tuples {@code ta,tb} in a partition
+ * {@code keyFunction.apply(ta).equals(keyFunction.apply(tb))} is true.
+ * <BR>
+ * The key function must return keys that implement {@code equals()} and {@code hashCode()} correctly.
+ * <P>
+ * To create a window partitioned using the tuple as the key use {@link org.apache.edgent.function.Functions#identity() identity()}
+ * as the key function.
+ * </P>
+ * <P>
+ * To create an unpartitioned window use a key function that returns a constant;
+ * by convention {@link org.apache.edgent.function.Functions#unpartitioned() unpartitioned()} is recommended.
+ * </P>
+ *
+ * @param <K> Key type.
+ *
+ * @param count Number of tuples to maintain in each partition.
+ * @param keyFunction Function that defines the key for each tuple.
+ * @return Window on this stream representing the last {@code count} tuples for each partition.
+ */
+ <K> TWindow<T, K> last(int count, Function<T, K> keyFunction);
+
+ /**
+ * Declare a partitioned window that continually represents the tuples seen on this
+ * stream during the last {@code time} {@code unit}s for each partition. If no tuples have been
+ * seen on the stream for a key in the last {@code time} {@code unit}s then the partition will be empty.
+ * Each partition independently maintains the tuples seen
+ * for its key on this stream during that period.
+ * <BR>
+ * The window is partitioned by each tuple's key, obtained by {@code keyFunction}.
+ * For each tuple on the stream {@code keyFunction.apply(tuple)} is called
+ * and the returned value is the tuple's key. For any two tuples {@code ta,tb} in a partition
+ * {@code keyFunction.apply(ta).equals(keyFunction.apply(tb))} is true.
+ * <BR>
+ * The key function must return keys that implement {@code equals()} and {@code hashCode()} correctly.
+ * <P>
+ * To create a window partitioned using the tuple as the key use {@link org.apache.edgent.function.Functions#identity() identity()}
+ * as the key function.
+ * </P>
+ * <P>
+ * To create an unpartitioned window use a key function that returns a constant;
+ * by convention {@link org.apache.edgent.function.Functions#unpartitioned() unpartitioned()} is recommended.
+ * </P>
+ *
+ * @param <K> Key type.
+ *
+ * @param time Time to retain a tuple in a partition.
+ * @param unit Unit for {@code time}.
+ * @param keyFunction Function that defines the key for each tuple.
+ * @return Partitioned window on this stream representing the last {@code time} {@code unit}s of tuples for each partition.
+ */
+ <K> TWindow<T, K> last(long time, TimeUnit unit, Function<T, K> keyFunction);
+
+ /**
+ * Declare a stream that will contain all tuples from this stream and
+ * {@code other}. A stream cannot be unioned with itself; if {@code other}
+ * is {@code this} then {@code this} will be returned.
+ *
+ * @param other the other stream
+ * @return A stream that is the union of {@code this} and {@code other}.
+ */
+ TStream<T> union(TStream<T> other);
+
+ /**
+ * Declare a stream that will contain all tuples from this stream and all the
+ * streams in {@code others}. A stream cannot be unioned with itself; if {@code others}
+ * contains {@code this} the union will only contain tuples from this stream once. If
+ * {@code others} is empty or only contains {@code this} then {@code this}
+ * is returned.
+ *
+ * @param others
+ * Streams to union with this stream.
+ * @return A stream that is the union of {@code this} and {@code others}.
+ */
+ TStream<T> union(Set<TStream<T>> others);
+
+ /**
+ * Adds the specified tags to the stream. Adding the same tag to
+ * a stream multiple times has no effect beyond the
+ * initial application.
+ *
+ * @param values
+ * Tag values to add.
+ * @return The tagged stream.
+ */
+ TStream<T> tag(String... values);
+
+ /**
+ * Returns the set of tags associated with this stream.
+ *
+ * @return the set of tags associated with this stream
+ */
+ Set<String> getTags();
+
+ /**
+ * Set an alias for the stream.
+ * <p>
+ * The alias must be unique within the topology.
+ * The alias may be used in various contexts:
+ * </p>
+ * <ul>
+ * <li>Runtime control services for the stream are registered with this alias.</li>
+ * </ul>
+ *
+ * @param alias an alias for the stream.
+ * @return this
+ * @throws IllegalStateException if an alias has already been set.
+ * @see ControlService
+ */
+ TStream<T> alias(String alias);
+
+ /**
+ * Returns the stream's alias, if any.
+ * @return the alias, or {@code null} if one has not been set.
+ */
+ String getAlias();
+
+ /**
+ * Join this stream with a partitioned window of type {@code U} with key type {@code K}.
+ * Each tuple on this stream is joined with the contents of {@code window}
+ * for the key {@code keyer.apply(tuple)}. The tuple and those window contents are
+ * passed into {@code joiner} and the return value is submitted to the
+ * returned stream. If {@code joiner} returns {@code null} then no tuple is submitted.
+ *
+ * @param <J> Tuple type of result stream
+ * @param <U> Tuple type of window to join with
+ * @param <K> Key type
+ * @param keyer Key function for this stream to match the window's key.
+ * @param window Keyed window to join this stream with.
+ * @param joiner Join function.
+ * @return A stream that is the result of joining this stream with
+ * {@code window}.
+ */
+ <J, U, K> TStream<J> join(Function<T, K> keyer, TWindow<U, K> window, BiFunction<T, List<U>, J> joiner);
+
+ /**
+ * Join this stream with the last tuple seen on a stream of type {@code U}
+ * with partitioning.
+ * Each tuple on this
+ * stream is joined with the last tuple seen on {@code lastStream}
+ * with a matching key (of type {@code K}).
+ * <BR>
+ * Each tuple {@code t} on this stream will match the last tuple
+ * {@code u} on {@code lastStream} if
+ * {@code keyer.apply(t).equals(lastStreamKeyer.apply(u))}
+ * is true.
+ * <BR>
+ * The assumption is made that
+ * the key classes correctly implement the contract for {@code equals()} and
+ * {@code hashCode()}.
+ * <P>Each tuple, together with its matching tuple from {@code lastStream}, is
+ * passed into {@code joiner} and the return value is submitted to the
+ * returned stream. If {@code joiner} returns {@code null} then no tuple is submitted.
+ * </P>
+ * @param <J> Tuple type of result stream
+ * @param <U> Tuple type of stream to join with
+ * @param <K> Key type
+ * @param keyer Key function for this stream
+ * @param lastStream Stream to join with.
+ * @param lastStreamKeyer Key function for {@code lastStream}
+ * @param joiner Join function.
+ * @return A stream that is the result of joining this stream with
+ * {@code lastStream}.
+ */
+ <J, U, K> TStream<J> joinLast(Function<T, K> keyer, TStream<U> lastStream, Function<U, K> lastStreamKeyer, BiFunction<T, U, J> joiner);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/main/java/org/apache/edgent/topology/TWindow.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/org/apache/edgent/topology/TWindow.java b/api/topology/src/main/java/org/apache/edgent/topology/TWindow.java
new file mode 100644
index 0000000..2665a76
--- /dev/null
+++ b/api/topology/src/main/java/org/apache/edgent/topology/TWindow.java
@@ -0,0 +1,120 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.topology;
+
+import java.util.List;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+
+/**
+ * Partitioned window of tuples. Logically a window
+ * represents a continuously updated ordered list of tuples according to the
+ * criteria that created it. For example {@link TStream#last(int, Function) s.last(10, zero())}
+ * declares a window with a single partition that at any time contains the last ten tuples seen on
+ * stream {@code s}.
+ * <P>
+ * Windows are partitioned, which means the window's configuration
+ * is independently maintained for each key seen on the stream.
+ * For example with a window created using {@link TStream#last(int, Function) last(3, tuple -> tuple.getId())}
+ * each key has its own window containing the last
+ * three tuples with the same key, where the key is obtained from each tuple using {@code getId()}.
+ * </P>
+ *
+ * @param <T> Tuple type
+ * @param <K> Partition key type
+ *
+ * @see TStream#last(int, Function) Count based window
+ * @see TStream#last(long, java.util.concurrent.TimeUnit, Function) Time based window
+ */
+public interface TWindow<T, K> extends TopologyElement {
+ /**
+ * Declares a stream that is a continuous, sliding, aggregation of
+ * partitions in this window.
+ * <P>
+ * Changes in a partition's contents trigger an invocation of
+ * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest. The list is stable
+ * during the aggregator invocation.
+ * <UL>
+ * <LI>Count-based window: the aggregator is called after each
+ * tuple added to a partition. When an addition results in a tuple
+ * being evicted, the eviction occurs before the aggregator is called.</LI>
+ * <LI>Time-based window: the aggregator is called after each tuple
+ * added to a partition. The aggregator is also called
+ * each time one or more tuples are evicted from a partition
+ * (multiple tuples may be evicted at once). The list will be
+ * empty if the eviction results in an empty partition.</LI>
+ * </UL>
+ * A non-null {@code aggregator} result is added to the returned stream.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up to date aggregation of a
+ * partition.
+ * </P>
+ * @param <U> Tuple type
+ * @param aggregator
+ * Logic to aggregate a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ */
+ <U> TStream<U> aggregate(BiFunction<List<T>, K, U> aggregator);
+
+ /**
+ * Declares a stream that represents a batched aggregation of
+ * partitions in this window.
+ * <P>
+ * Each partition "batch" triggers an invocation of
+ * {@code batcher.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest. The list is stable
+ * during the batcher invocation.
+ * <UL>
+ * <LI>Count-based window: a batch occurs when the partition is full.</LI>
+ * <LI>Time-based window: a batch occurs every "time" period units. The
+ * list will be empty if no tuples have been received during the period.</LI>
+ * </UL>
+ * A non-null {@code batcher} result is added to the returned stream.
+ * The partition's contents are cleared after a batch is processed.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up to date aggregation of a
+ * partition.
+ * </P>
+ * @param <U> Tuple type
+ * @param batcher
+ * Logic to aggregate a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ */
+ <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
+
+ /**
+ * Returns the key function used to map tuples to partitions.
+ * @return Key function used to map tuples to partitions.
+ */
+ Function<T, K> getKeyFunction();
+
+ /**
+ * Get the stream that feeds this window.
+ * @return the stream that feeds this window.
+ */
+ TStream<T> feeder();
+}