You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:23 UTC

[18/34] incubator-flink git commit: [streaming] Windowing API update + package refactor

[streaming] Windowing API update + package refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2d73c3f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2d73c3f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2d73c3f1

Branch: refs/heads/master
Commit: 2d73c3f168b4006f3100941dc8121c4e4c534c48
Parents: 55ee64f
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Nov 24 19:44:01 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |   2 +-
 .../streaming/api/datastream/DataStream.java    | 722 +++++++++----------
 .../api/datastream/WindowedDataStream.java      | 255 ++++---
 .../operator/GroupedWindowInvokable.java        | 322 +++++++++
 .../operator/GroupedWindowingInvokable.java     | 322 ---------
 .../operator/WindowGroupReduceInvokable.java    |  43 ++
 .../api/invokable/operator/WindowInvokable.java | 368 ++++++++++
 .../operator/WindowReduceInvokable.java         |  59 ++
 .../operator/WindowingGroupInvokable.java       |  43 --
 .../invokable/operator/WindowingInvokable.java  | 368 ----------
 .../operator/WindowingReduceInvokable.java      |  59 --
 .../policy/CloneableEvictionPolicy.java         |   4 +-
 .../policy/CloneableTriggerPolicy.java          |   4 +-
 .../operator/GroupedWindowingInvokableTest.java |  10 +-
 .../operator/WindowingInvokableTest.java        |   8 +-
 15 files changed, 1315 insertions(+), 1274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index a8002ff..a0bc1d6 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -257,7 +257,7 @@ The Flink Streaming API supports different types of aggregation operators simila
 
 Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`, `minBy(fieldPosition, first)`, `maxBy(fieldPosition, first)`
 
-With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, position `0` is used as default. 
+With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. 
 
 With `minBy` and `maxBy` the output of the operator is the element with the current minimal or maximal value at the given fieldposition. If more components share the minimum or maximum value, the user can decide if the operator should return the first or last element. This can be set by the `first` boolean parameter.
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 2ce28d9..49981a5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -64,6 +64,8 @@ import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.partitioner.FieldsPartitioner;
@@ -194,67 +196,6 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Gets the class of the field at the given position
-	 * 
-	 * @param pos
-	 *            Position of the field
-	 * @return The class of the field
-	 */
-	@SuppressWarnings("rawtypes")
-	protected Class<?> getClassAtPos(int pos) {
-		Class<?> type;
-		TypeInformation<OUT> outTypeInfo = outTypeWrapper.getTypeInfo();
-		if (outTypeInfo.isTupleType()) {
-			type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
-
-		} else if (outTypeInfo instanceof BasicArrayTypeInfo) {
-
-			type = ((BasicArrayTypeInfo) outTypeInfo).getComponentTypeClass();
-
-		} else if (outTypeInfo instanceof PrimitiveArrayTypeInfo) {
-			Class<?> clazz = outTypeInfo.getTypeClass();
-			if (clazz == boolean[].class) {
-				type = Boolean.class;
-			} else if (clazz == short[].class) {
-				type = Short.class;
-			} else if (clazz == int[].class) {
-				type = Integer.class;
-			} else if (clazz == long[].class) {
-				type = Long.class;
-			} else if (clazz == float[].class) {
-				type = Float.class;
-			} else if (clazz == double[].class) {
-				type = Double.class;
-			} else if (clazz == char[].class) {
-				type = Character.class;
-			} else {
-				throw new IndexOutOfBoundsException("Type could not be determined for array");
-			}
-
-		} else if (pos == 0) {
-			type = outTypeInfo.getTypeClass();
-		} else {
-			throw new IndexOutOfBoundsException("Position is out of range");
-		}
-		return type;
-	}
-
-	/**
-	 * Checks if the given field position is allowed for the output type
-	 * 
-	 * @param pos
-	 *            Position to check
-	 */
-	protected void checkFieldRange(int pos) {
-		try {
-			getClassAtPos(pos);
-		} catch (IndexOutOfBoundsException e) {
-			throw new RuntimeException("Selected field is out of range");
-
-		}
-	}
-
-	/**
 	 * Creates a new {@link DataStream} by merging {@link DataStream} outputs of
 	 * the same type with each other. The DataStreams merged using this operator
 	 * will be transformed simultaneously.
@@ -275,14 +216,6 @@ public class DataStream<OUT> {
 		return returnStream;
 	}
 
-	private void validateMerge(String id) {
-		for (DataStream<OUT> ds : this.mergedStreams) {
-			if (ds.getId().equals(id)) {
-				throw new RuntimeException("A DataStream cannot be merged with itself");
-			}
-		}
-	}
-
 	/**
 	 * Creates a new {@link ConnectedDataStream} by connecting
 	 * {@link DataStream} outputs of different type with each other. The
@@ -297,152 +230,51 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Creates a cross (Cartesian product) of a data stream window. The user can
-	 * implement their own time stamps or use the system time by default.
-	 * 
-	 * @param windowSize
-	 *            Size of the windows that will be aligned for both streams in
-	 *            milliseconds.
-	 * @param slideInterval
-	 *            After every function call the windows will be slid by this
-	 *            interval.
-	 * @param dataStreamToCross
-	 * @param windowSize
-	 * @param slideInterval
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowCross(
-			DataStream<IN2> dataStreamToCross, long windowSize, long slideInterval) {
-		return this.windowCross(dataStreamToCross, windowSize, slideInterval,
-				new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>());
-	}
-
-	/**
-	 * Creates a cross (Cartesian product) of a data stream window.
-	 * 
-	 * @param dataStreamToCross
-	 *            {@link DataStream} to cross with.
-	 * @param windowSize
-	 *            Size of the windows that will be aligned for both streams in
-	 *            milliseconds.
-	 * @param slideInterval
-	 *            After every function call the windows will be slid by this
-	 *            interval.
-	 * @param timestamp1
-	 *            User defined time stamps for the first input.
-	 * @param timestamp2
-	 *            User defined time stamps for the second input.
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowCross(
-			DataStream<IN2> dataStreamToCross, long windowSize, long slideInterval,
-			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2) {
-		return this.connect(dataStreamToCross).windowCross(windowSize, slideInterval, timestamp1,
-				timestamp2);
-	}
-
-	/**
-	 * Creates a join of a data stream based on the given positions.
-	 * 
-	 * @param dataStreamToJoin
-	 *            {@link DataStream} to join with.
-	 * @param windowSize
-	 *            Size of the windows that will be aligned for both streams in
-	 *            milliseconds.
-	 * @param slideInterval
-	 *            After every function call the windows will be slid by this
-	 *            interval.
-	 * @param fieldIn1
-	 *            The field in the first stream to be matched.
-	 * @param fieldIn2
-	 *            The field in the second stream to be matched.
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
-			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, int fieldIn1,
-			int fieldIn2) {
-		return this.windowJoin(dataStreamToJoin, windowSize, slideInterval,
-				new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
-	}
-
-	/**
-	 * Creates a join of a data stream based on the given positions.
+	 * Groups the elements of a {@link DataStream} by the given key positions to
+	 * be used with grouped operators like
+	 * {@link GroupedDataStream#reduce(ReduceFunction)}
 	 * 
-	 * @param dataStreamToJoin
-	 *            {@link DataStream} to join with.
-	 * @param windowSize
-	 *            Size of the windows that will be aligned for both streams in
-	 *            milliseconds.
-	 * @param slideInterval
-	 *            After every function call the windows will be slid by this
-	 *            interval.
-	 * @param fieldIn1
-	 *            The field in the first stream to be matched.
-	 * @param fieldIn2
-	 *            The field in the second stream to be matched.
-	 * @return The transformed {@link DataStream}.
+	 * @param fields
+	 *            The position of the fields on which the {@link DataStream}
+	 *            will be grouped.
+	 * @return The grouped {@link DataStream}
 	 */
-	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
-			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, String fieldIn1,
-			String fieldIn2) {
-		return this.windowJoin(dataStreamToJoin, windowSize, slideInterval,
-				new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+	public GroupedDataStream<OUT> groupBy(int... fields) {
+	
+		return groupBy(FieldsKeySelector.getSelector(getOutputType(), fields));
+	
 	}
 
 	/**
-	 * Creates a join of a data stream based on the given positions.
+	 * Groups a {@link DataStream} using field expressions. A field expression
+	 * is either the name of a public field or a getter method with parentheses
+	 * of the {@link DataStream}'s underlying type. A dot can be used to drill
+	 * down into objects, as in {@code "field1.getInnerField2()" }. This method
+	 * returns an {@link GroupedDataStream}.
 	 * 
-	 * @param dataStreamToJoin
-	 *            {@link DataStream} to join with.
-	 * @param windowSize
-	 *            Size of the windows that will be aligned for both streams in
-	 *            milliseconds.
-	 * @param slideInterval
-	 *            After every function call the windows will be slid by this
-	 *            interval.
-	 * @param timestamp1
-	 *            User defined time stamps for the first input.
-	 * @param timestamp2
-	 *            User defined time stamps for the second input.
-	 * @param fieldIn1
-	 *            The field in the first stream to be matched.
-	 * @param fieldIn2
-	 *            The field in the second stream to be matched.
-	 * @return The transformed {@link DataStream}.
-	 */
-	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
-			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
-			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
-		return this.connect(dataStreamToJoin).windowJoin(windowSize, slideInterval, timestamp1,
-				timestamp2, fieldIn1, fieldIn2);
+	 * @param fields
+	 *            One or more field expressions on which the DataStream will be
+	 *            grouped.
+	 * @return The grouped {@link DataStream}
+	 **/
+	public GroupedDataStream<OUT> groupBy(String... fields) {
+	
+		return groupBy(new PojoKeySelector<OUT>(getOutputType(), fields));
+	
 	}
 
 	/**
-	 * Creates a join of a data stream based on the given positions.
+	 * Groups the elements of a {@link DataStream} by the key extracted by the
+	 * {@link KeySelector} to be used with grouped operators like
+	 * {@link GroupedDataStream#reduce(ReduceFunction)}
 	 * 
-	 * @param dataStreamToJoin
-	 *            {@link DataStream} to join with.
-	 * @param windowSize
-	 *            Size of the windows that will be aligned for both streams in
-	 *            milliseconds.
-	 * @param slideInterval
-	 *            After every function call the windows will be slid by this
-	 *            interval.
-	 * @param timestamp1
-	 *            User defined time stamps for the first input.
-	 * @param timestamp2
-	 *            User defined time stamps for the second input.
-	 * @param fieldIn1
-	 *            The field in the first stream to be matched.
-	 * @param fieldIn2
-	 *            The field in the second stream to be matched.
-	 * @return The transformed {@link DataStream}.
+	 * @param keySelector
+	 *            The {@link KeySelector} that will be used to extract keys for
+	 *            the values
+	 * @return The grouped {@link DataStream}
 	 */
-	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
-			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
-			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
-		return this.connect(dataStreamToJoin).windowJoin(windowSize, slideInterval, timestamp1,
-				timestamp2, fieldIn1, fieldIn2);
+	public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
+		return new GroupedDataStream<OUT>(this, keySelector);
 	}
 
 	/**
@@ -454,7 +286,7 @@ public class DataStream<OUT> {
 	 * @return The DataStream with fields partitioning set.
 	 */
 	public DataStream<OUT> partitionBy(int... fields) {
-
+	
 		return setConnectionType(new FieldsPartitioner<OUT>(FieldsKeySelector.getSelector(
 				getOutputType(), fields)));
 	}
@@ -468,7 +300,7 @@ public class DataStream<OUT> {
 	 * @return The DataStream with fields partitioning set.
 	 */
 	public DataStream<OUT> partitionBy(String... fields) {
-
+	
 		return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(
 				getOutputType(), fields)));
 	}
@@ -526,6 +358,32 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Initiates an iterative part of the program that executes multiple times
+	 * and feeds back data streams. The iterative part needs to be closed by
+	 * calling {@link IterativeDataStream#closeWith(DataStream)}. The
+	 * transformation of this IterativeDataStream will be the iteration head.
+	 * The data stream given to the {@code closeWith(DataStream)} method is the
+	 * data stream that will be fed back and used as the input for the iteration
+	 * head. Unlike in batch processing by default the output of the iteration
+	 * stream is directed both to the iteration head and the next component.
+	 * To direct tuples to the iteration head or the output specifically one can
+	 * use the {@code split(OutputSelector)} on the iteration tail while
+	 * referencing the iteration head as 'iterate'.
+	 * <p>
+	 * The iteration edge will be partitioned the same way as the first input of
+	 * the iteration head.
+	 * <p>
+	 * By default a DataStream with iteration will never terminate, but the user
+	 * can use the {@link IterativeDataStream#setMaxWaitTime} call to set a max
+	 * waiting time for the iteration.
+	 * 
+	 * @return The iterative data stream created.
+	 */
+	public IterativeDataStream<OUT> iterate() {
+		return new IterativeDataStream<OUT>(this);
+	}
+
+	/**
 	 * Applies a Map transformation on a {@link DataStream}. The transformation
 	 * calls a {@link MapFunction} for each element of the DataStream. Each
 	 * MapFunction call returns exactly one element. The user can also extend
@@ -593,6 +451,28 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Applies a Filter transformation on a {@link DataStream}. The
+	 * transformation calls a {@link FilterFunction} for each element of the
+	 * DataStream and retains only those elements for which the function returns
+	 * true. Elements for which the function returns false are filtered. The
+	 * user can also extend {@link RichFilterFunction} to gain access to other
+	 * features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * 
+	 * @param filter
+	 *            The FilterFunction that is called for each element of the
+	 *            DataStream.
+	 * @return The filtered DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
+		FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(filter,
+				FilterFunction.class, 0);
+	
+		return addFunction("filter", filter, typeWrapper, typeWrapper, new FilterInvokable<OUT>(
+				filter));
+	}
+
+	/**
 	 * Initiates a Project transformation on a {@link Tuple} {@link DataStream}.<br/>
 	 * <b>Note: Only Tuple DataStreams can be projected.</b></br> The
 	 * transformation projects each Tuple of the DataSet onto a (sub)set of
@@ -616,69 +496,6 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Groups the elements of a {@link DataStream} by the given key positions to
-	 * be used with grouped operators like
-	 * {@link GroupedDataStream#reduce(ReduceFunction)}
-	 * 
-	 * @param fields
-	 *            The position of the fields on which the {@link DataStream}
-	 *            will be grouped.
-	 * @return The grouped {@link DataStream}
-	 */
-	public GroupedDataStream<OUT> groupBy(int... fields) {
-
-		return groupBy(FieldsKeySelector.getSelector(getOutputType(), fields));
-
-	}
-
-	/**
-	 * Groups a {@link DataStream} using field expressions. A field expression
-	 * is either the name of a public field or a getter method with parentheses
-	 * of the {@link DataStream}S underlying type. A dot can be used to drill
-	 * down into objects, as in {@code "field1.getInnerField2()" }. This method
-	 * returns an {@link GroupedDataStream}.
-	 * 
-	 * @param fields
-	 *            One or more field expressions on which the DataStream will be
-	 *            grouped.
-	 * @return The grouped {@link DataStream}
-	 **/
-	public GroupedDataStream<OUT> groupBy(String... fields) {
-
-		return groupBy(new PojoKeySelector<OUT>(getOutputType(), fields));
-
-	}
-
-	/**
-	 * Groups the elements of a {@link DataStream} by the key extracted by the
-	 * {@link KeySelector} to be used with grouped operators like
-	 * {@link GroupedDataStream#reduce(ReduceFunction)}
-	 * 
-	 * @param keySelector
-	 *            The {@link KeySelector} that will be used to extract keys for
-	 *            the values
-	 * @return The grouped {@link DataStream}
-	 */
-	public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
-		return new GroupedDataStream<OUT>(this, keySelector);
-	}
-
-	/**
-	 * This allows you to set up windowing through a nice API using
-	 * {@link WindowingHelper} such as {@link Time}, {@link Count} and
-	 * {@link Delta}.
-	 * 
-	 * @param policyHelpers
-	 *            Any {@link WindowingHelper} such as {@link Time},
-	 *            {@link Count} and {@link Delta}.
-	 * @return A {@link WindowedDataStream} providing further operations.
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) {
-		return new WindowedDataStream<OUT>(this, policyHelpers);
-	}
-
-	/**
 	 * Applies an aggregation that sums the data stream at the given position.
 	 * 
 	 * @param positionToSum
@@ -709,15 +526,6 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Syntactic sugar for sum(0)
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> sum() {
-		return sum(0);
-	}
-
-	/**
 	 * Applies an aggregation that that gives the minimum of the data stream at
 	 * the given position.
 	 * 
@@ -839,15 +647,6 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Syntactic sugar for min(0)
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> min() {
-		return min(0);
-	}
-
-	/**
 	 * Applies an aggregation that gives the maximum of the data stream at the
 	 * given position.
 	 * 
@@ -876,76 +675,222 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * maximum value at the given position, if more elements have the maximum
-	 * value at the given position, the operator returns either the first or
-	 * last one, depending on the parameter set.
+	 * Applies an aggregation that gives the current element with the
+	 * maximum value at the given position, if more elements have the maximum
+	 * value at the given position, the operator returns either the first or
+	 * last one, depending on the parameter set.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize.
+	 * @param first
+	 *            If true, then the operator returns the first element with the
+	 *            maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
+		checkFieldRange(positionToMaxBy);
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+				AggregationType.MAXBY, first));
+	}
+
+	/**
+	 * Applies an aggregation that gives the count of the values.
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<Long, ?> count() {
+		TypeWrapper<OUT> inTypeWrapper = outTypeWrapper;
+		TypeWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(Long.valueOf(0));
+
+		return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
+				new CounterInvokable<OUT>());
+	}
+
+	/**
+	 * This allows you to set up windowing through a nice API using
+	 * {@link WindowingHelper} such as {@link Time}, {@link Count} and
+	 * {@link Delta}. Windowing allows the user to apply different user defined
+	 * functions on predefined chunks of the data stream. For example a reducer
+	 * could be applied on every 5 seconds of data to count the elements in that
+	 * time window.
+	 * 
+	 * @param policyHelpers
+	 *            Any {@link WindowingHelper} such as {@link Time},
+	 *            {@link Count} and {@link Delta}.
+	 * @return A {@link WindowedDataStream} providing further operations.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) {
+		return new WindowedDataStream<OUT>(this, policyHelpers);
+	}
+
+	/**
+	 * Set up a windowed data stream using the given {@link TriggerPolicy}s and
+	 * {@link EvictionPolicy}s. Windowing allows the user to apply different
+	 * user defined functions on predefined chunks of the data stream. For
+	 * example a reducer could be applied on every 5 seconds of data to count
+	 * the elements in that time window.
+	 * 
+	 * @param triggers
+	 *            The list of {@link TriggerPolicy}s that will determine how
+	 *            often the user function is called on the window.
+	 * @param evicters
+	 *            The list of {@link EvictionPolicy}s that will determine the
+	 *            number of elements in each time window.
+	 * @return A {@link WindowedDataStream} providing further operations.
+	 */
+	public WindowedDataStream<OUT> window(List<TriggerPolicy<OUT>> triggers,
+			List<EvictionPolicy<OUT>> evicters) {
+		return new WindowedDataStream<OUT>(this, triggers, evicters);
+	}
+
+	/**
+	 * Creates a cross (Cartesian product) of a data stream window. The user can
+	 * implement their own time stamps or use the system time by default.
+	 * 
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param dataStreamToCross
+	 * @param windowSize
+	 * @param slideInterval
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowCross(
+			DataStream<IN2> dataStreamToCross, long windowSize, long slideInterval) {
+		return this.windowCross(dataStreamToCross, windowSize, slideInterval,
+				new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>());
+	}
+
+	/**
+	 * Creates a cross (Cartesian product) of a data stream window.
 	 * 
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize.
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            maximum value, otherwise returns the last
-	 * @return The transformed DataStream.
+	 * @param dataStreamToCross
+	 *            {@link DataStream} to cross with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param timestamp1
+	 *            User defined time stamps for the first input.
+	 * @param timestamp2
+	 *            User defined time stamps for the second input.
+	 * @return The transformed {@link DataStream}.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
-		checkFieldRange(positionToMaxBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
-				AggregationType.MAXBY, first));
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowCross(
+			DataStream<IN2> dataStreamToCross, long windowSize, long slideInterval,
+			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2) {
+		return this.connect(dataStreamToCross).windowCross(windowSize, slideInterval, timestamp1,
+				timestamp2);
 	}
 
 	/**
-	 * Syntactic sugar for max(0)
+	 * Creates a join of a data stream based on the given positions.
 	 * 
-	 * @return The transformed DataStream.
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> max() {
-		return max(0);
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, int fieldIn1,
+			int fieldIn2) {
+		return this.windowJoin(dataStreamToJoin, windowSize, slideInterval,
+				new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
 	}
 
 	/**
-	 * Applies an aggregation that gives the count of the values.
+	 * Creates a join of a data stream based on the given positions.
 	 * 
-	 * @return The transformed DataStream.
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
 	 */
-	public SingleOutputStreamOperator<Long, ?> count() {
-		TypeWrapper<OUT> inTypeWrapper = outTypeWrapper;
-		TypeWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(Long.valueOf(0));
-
-		return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
-				new CounterInvokable<OUT>());
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, String fieldIn1,
+			String fieldIn2) {
+		return this.windowJoin(dataStreamToJoin, windowSize, slideInterval,
+				new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
 	}
 
-	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
-
-		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
-
-		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
-				outTypeWrapper, outTypeWrapper, invokable);
-
-		return returnStream;
+	/**
+	 * Creates a join of a data stream based on the given positions.
+	 * 
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param timestamp1
+	 *            User defined time stamps for the first input.
+	 * @param timestamp2
+	 *            User defined time stamps for the second input.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
+			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
+		return this.connect(dataStreamToJoin).windowJoin(windowSize, slideInterval, timestamp1,
+				timestamp2, fieldIn1, fieldIn2);
 	}
 
 	/**
-	 * Applies a Filter transformation on a {@link DataStream}. The
-	 * transformation calls a {@link FilterFunction} for each element of the
-	 * DataStream and retains only those element for which the function returns
-	 * true. Elements for which the function returns false are filtered. The
-	 * user can also extend {@link RichFilterFunction} to gain access to other
-	 * features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * Creates a join of a data stream based on the given positions.
 	 * 
-	 * @param filter
-	 *            The FilterFunction that is called for each element of the
-	 *            DataSet.
-	 * @return The filtered DataStream.
+	 * @param dataStreamToJoin
+	 *            {@link DataStream} to join with.
+	 * @param windowSize
+	 *            Size of the windows that will be aligned for both streams in
+	 *            milliseconds.
+	 * @param slideInterval
+	 *            After every function call the windows will be slid by this
+	 *            interval.
+	 * @param timestamp1
+	 *            User defined time stamps for the first input.
+	 * @param timestamp2
+	 *            User defined time stamps for the second input.
+	 * @param fieldIn1
+	 *            The field in the first stream to be matched.
+	 * @param fieldIn2
+	 *            The field in the second stream to be matched.
+	 * @return The transformed {@link DataStream}.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
-		FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(filter,
-				FilterFunction.class, 0);
-
-		return addFunction("filter", filter, typeWrapper, typeWrapper, new FilterInvokable<OUT>(
-				filter));
+	public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+			DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
+			TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
+		return this.connect(dataStreamToJoin).windowJoin(windowSize, slideInterval, timestamp1,
+				timestamp2, fieldIn1, fieldIn2);
 	}
 
 	/**
@@ -1262,30 +1207,14 @@ public class DataStream<OUT> {
 		return returnStream;
 	}
 
-	/**
-	 * Initiates an iterative part of the program that executes multiple times
-	 * and feeds back data streams. The iterative part needs to be closed by
-	 * calling {@link IterativeDataStream#closeWith(DataStream)}. The
-	 * transformation of this IterativeDataStream will be the iteration head.
-	 * The data stream given to the {@code closeWith(DataStream)} method is the
-	 * data stream that will be fed back and used as the input for the iteration
-	 * head. Unlike in batch processing by default the output of the iteration
-	 * stream is directed to both to the iteration head and the next component.
-	 * To direct tuples to the iteration head or the output specifically one can
-	 * use the {@code split(OutputSelector)} on the iteration tail while
-	 * referencing the iteration head as 'iterate'.
-	 * <p>
-	 * The iteration edge will be partitioned the same way as the first input of
-	 * the iteration head.
-	 * <p>
-	 * By default a DataStream with iteration will never terminate, but the user
-	 * can use the {@link IterativeDataStream#setMaxWaitTime} call to set a max
-	 * waiting time for the iteration.
-	 * 
-	 * @return The iterative data stream created.
-	 */
-	public IterativeDataStream<OUT> iterate() {
-		return new IterativeDataStream<OUT>(this);
+	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
+	
+		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
+	
+		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
+				outTypeWrapper, outTypeWrapper, invokable);
+	
+		return returnStream;
 	}
 
 	protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {
@@ -1414,6 +1343,75 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Gets the class of the field at the given position
+	 * 
+	 * @param pos
+	 *            Position of the field
+	 * @return The class of the field
+	 */
+	@SuppressWarnings("rawtypes")
+	protected Class<?> getClassAtPos(int pos) {
+		Class<?> type;
+		TypeInformation<OUT> outTypeInfo = outTypeWrapper.getTypeInfo();
+		if (outTypeInfo.isTupleType()) {
+			type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
+
+		} else if (outTypeInfo instanceof BasicArrayTypeInfo) {
+
+			type = ((BasicArrayTypeInfo) outTypeInfo).getComponentTypeClass();
+
+		} else if (outTypeInfo instanceof PrimitiveArrayTypeInfo) {
+			Class<?> clazz = outTypeInfo.getTypeClass();
+			if (clazz == boolean[].class) {
+				type = Boolean.class;
+			} else if (clazz == short[].class) {
+				type = Short.class;
+			} else if (clazz == int[].class) {
+				type = Integer.class;
+			} else if (clazz == long[].class) {
+				type = Long.class;
+			} else if (clazz == float[].class) {
+				type = Float.class;
+			} else if (clazz == double[].class) {
+				type = Double.class;
+			} else if (clazz == char[].class) {
+				type = Character.class;
+			} else {
+				throw new IndexOutOfBoundsException("Type could not be determined for array");
+			}
+
+		} else if (pos == 0) {
+			type = outTypeInfo.getTypeClass();
+		} else {
+			throw new IndexOutOfBoundsException("Position is out of range");
+		}
+		return type;
+	}
+
+	/**
+	 * Checks if the given field position is allowed for the output type
+	 * 
+	 * @param pos
+	 *            Position to check
+	 */
+	protected void checkFieldRange(int pos) {
+		try {
+			getClassAtPos(pos);
+		} catch (IndexOutOfBoundsException e) {
+			throw new RuntimeException("Selected field is out of range");
+
+		}
+	}
+
+	private void validateMerge(String id) {
+		for (DataStream<OUT> ds : this.mergedStreams) {
+			if (ds.getId().equals(id)) {
+				throw new RuntimeException("A DataStream cannot be merged with itself");
+			}
+		}
+	}
+
+	/**
 	 * Creates a copy of the {@link DataStream}
 	 * 
 	 * @return The copy

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index dd26e64..622c03d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -30,9 +30,9 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.A
 import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowingInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowingGroupInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowingReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -57,14 +57,17 @@ public class WindowedDataStream<OUT> {
 	protected boolean isGrouped;
 	protected KeySelector<OUT, ?> keySelector;
 
-	protected List<WindowingHelper<OUT>> triggerPolicies;
-	protected List<WindowingHelper<OUT>> evictionPolicies;
+	protected List<WindowingHelper<OUT>> triggerHelpers;
+	protected List<WindowingHelper<OUT>> evictionHelpers;
+
+	protected LinkedList<TriggerPolicy<OUT>> userTriggers;
+	protected LinkedList<EvictionPolicy<OUT>> userEvicters;
 
 	protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT>... policyHelpers) {
 		this.dataStream = dataStream.copy();
-		this.triggerPolicies = new ArrayList<WindowingHelper<OUT>>();
+		this.triggerHelpers = new ArrayList<WindowingHelper<OUT>>();
 		for (WindowingHelper<OUT> helper : policyHelpers) {
-			this.triggerPolicies.add(helper);
+			this.triggerHelpers.add(helper);
 		}
 
 		if (dataStream instanceof GroupedDataStream) {
@@ -76,86 +79,49 @@ public class WindowedDataStream<OUT> {
 		}
 	}
 
-	protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
-		this.dataStream = windowedDataStream.dataStream.copy();
-		this.isGrouped = windowedDataStream.isGrouped;
-		this.keySelector = windowedDataStream.keySelector;
-		this.triggerPolicies = windowedDataStream.triggerPolicies;
-		this.evictionPolicies = windowedDataStream.evictionPolicies;
-	}
-
-	protected LinkedList<TriggerPolicy<OUT>> getTriggers() {
-		LinkedList<TriggerPolicy<OUT>> triggerPolicyList = new LinkedList<TriggerPolicy<OUT>>();
+	protected WindowedDataStream(DataStream<OUT> dataStream, List<TriggerPolicy<OUT>> triggers,
+			List<EvictionPolicy<OUT>> evicters) {
+		this.dataStream = dataStream.copy();
 
-		for (WindowingHelper<OUT> helper : triggerPolicies) {
-			triggerPolicyList.add(helper.toTrigger());
+		if (triggers != null) {
+			this.userTriggers = new LinkedList<TriggerPolicy<OUT>>();
+			this.userTriggers.addAll(triggers);
 		}
 
-		return triggerPolicyList;
-	}
+		if (evicters != null) {
+			this.userEvicters = new LinkedList<EvictionPolicy<OUT>>();
+			this.userEvicters.addAll(evicters);
+		}
 
-	protected LinkedList<EvictionPolicy<OUT>> getEvicters() {
-		LinkedList<EvictionPolicy<OUT>> evictionPolicyList = new LinkedList<EvictionPolicy<OUT>>();
+		if (dataStream instanceof GroupedDataStream) {
+			this.isGrouped = true;
+			this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
 
-		if (evictionPolicies != null) {
-			for (WindowingHelper<OUT> helper : evictionPolicies) {
-				evictionPolicyList.add(helper.toEvict());
-			}
 		} else {
-			evictionPolicyList.add(new TumblingEvictionPolicy<OUT>());
+			this.isGrouped = false;
 		}
-
-		return evictionPolicyList;
 	}
 
-	protected LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
-		LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
-		// Add Time triggers to central triggers
-		for (TriggerPolicy<OUT> trigger : getTriggers()) {
-			if (trigger instanceof TimeTriggerPolicy) {
-				cTriggers.add(trigger);
-			}
-		}
-		return cTriggers;
+	protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
+		this.dataStream = windowedDataStream.dataStream.copy();
+		this.isGrouped = windowedDataStream.isGrouped;
+		this.keySelector = windowedDataStream.keySelector;
+		this.triggerHelpers = windowedDataStream.triggerHelpers;
+		this.evictionHelpers = windowedDataStream.evictionHelpers;
+		this.userTriggers = windowedDataStream.userTriggers;
+		this.userEvicters = windowedDataStream.userEvicters;
 	}
 
-	protected LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
-		LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
-
-		// Everything except Time triggers are distributed
-		for (TriggerPolicy<OUT> trigger : getTriggers()) {
-			if (!(trigger instanceof TimeTriggerPolicy)) {
-				dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
-			}
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public WindowedDataStream<OUT> every(WindowingHelper... policyHelpers) {
+		WindowedDataStream<OUT> ret = this.copy();
+		if (ret.evictionHelpers == null) {
+			ret.evictionHelpers = ret.triggerHelpers;
+			ret.triggerHelpers = new ArrayList<WindowingHelper<OUT>>();
 		}
-
-		return dTriggers;
-	}
-
-	protected LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
-		LinkedList<CloneableEvictionPolicy<OUT>> evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
-
-		for (EvictionPolicy<OUT> evicter : getEvicters()) {
-			evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+		for (WindowingHelper<OUT> helper : policyHelpers) {
+			ret.triggerHelpers.add(helper);
 		}
-
-		return evicters;
-	}
-
-	/**
-	 * Groups the elements of the {@link WindowedDataStream} by the given
-	 * {@link KeySelector} to be used with grouped operators.
-	 * 
-	 * @param keySelector
-	 *            The specification of the key on which the
-	 *            {@link WindowedDataStream} will be grouped.
-	 * @return The transformed {@link WindowedDataStream}
-	 */
-	public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
-		WindowedDataStream<OUT> ret = this.copy();
-		ret.dataStream = ret.dataStream.groupBy(keySelector);
-		ret.isGrouped = true;
-		ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
 		return ret;
 	}
 
@@ -192,12 +158,29 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
+	 * Groups the elements of the {@link WindowedDataStream} by the given
+	 * {@link KeySelector} to be used with grouped operators.
+	 * 
+	 * @param keySelector
+	 *            The specification of the key on which the
+	 *            {@link WindowedDataStream} will be grouped.
+	 * @return The transformed {@link WindowedDataStream}
+	 */
+	public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
+		WindowedDataStream<OUT> ret = this.copy();
+		ret.dataStream = ret.dataStream.groupBy(keySelector);
+		ret.isGrouped = true;
+		ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
+		return ret;
+	}
+
+	/**
 	 * This is a prototype implementation for new windowing features based on
 	 * trigger and eviction policies
 	 * 
-	 * @param triggerPolicies
+	 * @param triggerHelpers
 	 *            A list of trigger policies
-	 * @param evictionPolicies
+	 * @param evictionHelpers
 	 *            A list of eviction policies
 	 * @param sample
 	 *            A sample of the OUT data type required to gather type
@@ -442,63 +425,123 @@ public class WindowedDataStream<OUT> {
 				AggregationType.MAXBY, first));
 	}
 
-	/**
-	 * Gets the output type.
-	 * 
-	 * @return The output type.
-	 */
-	public TypeInformation<OUT> getOutputType() {
-		return dataStream.getOutputType();
+	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) {
+		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
+
+		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
+				aggregator, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
+
+		return returnStream;
 	}
 
-	protected WindowedDataStream<OUT> copy() {
-		return new WindowedDataStream<OUT>(this);
+	private LinkedList<TriggerPolicy<OUT>> getTriggers() {
+
+		LinkedList<TriggerPolicy<OUT>> triggers = new LinkedList<TriggerPolicy<OUT>>();
+
+		if (triggerHelpers != null) {
+			for (WindowingHelper<OUT> helper : triggerHelpers) {
+				triggers.add(helper.toTrigger());
+			}
+		}
+
+		if (userTriggers != null) {
+			triggers.addAll(userTriggers);
+		}
+
+		return triggers;
+
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public WindowedDataStream<OUT> every(WindowingHelper... policyHelpers) {
-		WindowedDataStream<OUT> ret = this.copy();
-		if (ret.evictionPolicies == null) {
-			ret.evictionPolicies = ret.triggerPolicies;
-			ret.triggerPolicies = new ArrayList<WindowingHelper<OUT>>();
+	private LinkedList<EvictionPolicy<OUT>> getEvicters() {
+
+		LinkedList<EvictionPolicy<OUT>> evicters = new LinkedList<EvictionPolicy<OUT>>();
+
+		if (evictionHelpers != null) {
+			for (WindowingHelper<OUT> helper : evictionHelpers) {
+				evicters.add(helper.toEvict());
+			}
+		} else {
+			if (userEvicters == null) {
+				evicters.add(new TumblingEvictionPolicy<OUT>());
+			}
 		}
-		for (WindowingHelper<OUT> helper : policyHelpers) {
-			ret.triggerPolicies.add(helper);
+
+		if (userEvicters != null) {
+			evicters.addAll(userEvicters);
 		}
-		return ret;
+
+		return evicters;
 	}
 
-	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) {
-		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
+	private LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
+		LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
+		// Add Time triggers to central triggers
+		for (TriggerPolicy<OUT> trigger : getTriggers()) {
+			if (trigger instanceof TimeTriggerPolicy) {
+				cTriggers.add(trigger);
+			}
+		}
+		return cTriggers;
+	}
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
-				aggregator, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
+	private LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
+		LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
 
-		return returnStream;
+		// Everything except Time triggers are distributed
+		for (TriggerPolicy<OUT> trigger : getTriggers()) {
+			if (!(trigger instanceof TimeTriggerPolicy)) {
+				dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
+			}
+		}
+
+		return dTriggers;
 	}
 
-	protected <R> StreamInvokable<OUT, R> getReduceGroupInvokable(
-			GroupReduceFunction<OUT, R> reducer) {
+	private LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
+		LinkedList<CloneableEvictionPolicy<OUT>> evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
+
+		for (EvictionPolicy<OUT> evicter : getEvicters()) {
+			evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+		}
+
+		return evicters;
+	}
+
+	private <R> StreamInvokable<OUT, R> getReduceGroupInvokable(GroupReduceFunction<OUT, R> reducer) {
 		StreamInvokable<OUT, R> invokable;
 		if (isGrouped) {
-			invokable = new GroupedWindowingInvokable<OUT, R>(reducer, keySelector,
+			invokable = new GroupedWindowInvokable<OUT, R>(reducer, keySelector,
 					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
 
 		} else {
-			invokable = new WindowingGroupInvokable<OUT, R>(reducer, getTriggers(), getEvicters());
+			invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, getTriggers(),
+					getEvicters());
 		}
 		return invokable;
 	}
 
-	protected StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
+	private StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
 		StreamInvokable<OUT, OUT> invokable;
 		if (isGrouped) {
-			invokable = new GroupedWindowingInvokable<OUT, OUT>(reducer, keySelector,
+			invokable = new GroupedWindowInvokable<OUT, OUT>(reducer, keySelector,
 					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
 
 		} else {
-			invokable = new WindowingReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());
+			invokable = new WindowReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());
 		}
 		return invokable;
 	}
+
+	/**
+	 * Gets the output type.
+	 * 
+	 * @return The output type.
+	 */
+	public TypeInformation<OUT> getOutputType() {
+		return dataStream.getOutputType();
+	}
+
+	protected WindowedDataStream<OUT> copy() {
+		return new WindowedDataStream<OUT>(this);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
new file mode 100644
index 0000000..844a488
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This invokable allows windowing based on {@link TriggerPolicy} and
+ * {@link EvictionPolicy} instances including their active and cloneable
+ * versions. It is additionally aware of the creation of windows per group.
+ * 
+ * A {@link KeySelector} is used to specify the key position or key extraction.
+ * The {@link ReduceFunction} will be executed on each group separately. Trigger
+ * policies might either be centralized or distributed. Eviction policies are
+ * always distributed. A distributed policy has to be a
+ * {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it will
+ * be cloned to have separated instances for each group. At the startup time the
+ * distributed policies will be stored as sample, and only clones of them will
+ * be used to maintain the groups. Therefore, each group starts with the initial
+ * policy states.
+ * 
+ * While a distributed policy only gets notified with the elements belonging to
+ * the respective group, a centralized policy gets notified with all arriving
+ * elements. When a centralized trigger occurred, all groups get triggered. This
+ * is done by submitting the element which caused the trigger as real element to
+ * the groups it belongs to and as fake element to all other groups. Within the
+ * groups the element might be further processed, causing more triggers,
+ * prenotifications of active distributed policies and evictions like usual.
+ * 
+ * Central policies can be instance of {@link ActiveTriggerPolicy} and also
+ * implement the
+ * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
+ * method. Fake elements created on prenotification will be forwarded to all
+ * groups. The {@link ActiveTriggerCallback} is also implemented in a way, that
+ * it forwards/distributes calls to all groups.
+ *
+ * @param <IN>
+ *            The type of input elements handled by this operator invokable.
+ */
+public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
+
+	/**
+	 * Auto-generated serial version UID
+	 */
+	private static final long serialVersionUID = -3469545957144404137L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(GroupedWindowInvokable.class);
+
+	private KeySelector<IN, ?> keySelector;
+	private Configuration parameters;
+	private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
+	private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+	private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
+	private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
+	private Map<Object, WindowInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>();
+	private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
+	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+
+	/**
+	 * This constructor creates an instance of the grouped windowing invokable.
+	 * A {@link KeySelector} is used to specify the key position or key
+	 * extraction. The {@link ReduceFunction} will be executed on each group
+	 * separately. Trigger policies might either be centralized or distributed.
+	 * Eviction policies are always distributed. A distributed policy has to be
+	 * a {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it
+	 * will be cloned to have separated instances for each group. At the startup
+	 * time the distributed policies will be stored as sample, and only clones
+	 * of them will be used to maintain the groups. Therefore, each group starts
+	 * with the initial policy states.
+	 * 
+	 * While a distributed policy only gets notified with the elements belonging
+	 * to the respective group, a centralized policy gets notified with all
+	 * arriving elements. When a centralized trigger occurred, all groups get
+	 * triggered. This is done by submitting the element which caused the
+	 * trigger as real element to the groups it belongs to and as fake element
+	 * to all other groups. Within the groups the element might be further
+	 * processed, causing more triggers, prenotifications of active distributed
+	 * policies and evictions like usual.
+	 * 
+	 * Central policies can be instance of {@link ActiveTriggerPolicy} and also
+	 * implement the
+	 * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
+	 * method. Fake elements created on prenotification will be forwarded to all
+	 * groups. The {@link ActiveTriggerCallback} is also implemented in a way,
+	 * that it forwards/distributes calls to all groups.
+	 * 
+	 * @param userFunction
+	 *            The user defined function.
+	 * @param keySelector
+	 *            A key selector to extract the key for the groups from the
+	 *            input data.
+	 * @param distributedTriggerPolicies
+	 *            Trigger policies to be distributed and maintained individually
+	 *            within each group.
+	 * @param distributedEvictionPolicies
+	 *            Eviction policies to be distributed and maintained
+	 *            individually within each group. There are no central eviction
+	 *            policies because there is no central element buffer but only a
+	 *            buffer per group. Therefore evictions might always be done per
+	 *            group.
+	 * @param centralTriggerPolicies
+	 *            Trigger policies which will only exist once at a central
+	 *            place. In case a central policy triggers, it will cause all
+	 *            groups to be emitted. (Remark: Empty groups cannot be emitted.
+	 *            If only one element is contained in a group, this element itself
+	 *            is returned as aggregated result.)
+	 */
+	public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
+			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
+			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
+			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
+
+		super(userFunction);
+		this.keySelector = keySelector;
+		this.centralTriggerPolicies = centralTriggerPolicies;
+		this.distributedTriggerPolicies = distributedTriggerPolicies;
+		this.distributedEvictionPolicies = distributedEvictionPolicies;
+
+		for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
+			if (trigger instanceof ActiveTriggerPolicy) {
+				this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+			}
+		}
+	}
+
+	@Override
+	protected void immutableInvoke() throws Exception {
+		// Prevent empty data streams
+		if ((reuse = recordIterator.next(reuse)) == null) {
+			throw new RuntimeException("DataStream must not be empty");
+		}
+
+		// Continuously run
+		while (reuse != null) {
+			WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
+					.getKey(reuse.getObject()));
+			if (groupInvokable == null) {
+				groupInvokable = makeNewGroup(reuse);
+			}
+
+			// Run the precalls for central active triggers
+			for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
+				Object[] result = trigger.preNotifyTrigger(reuse.getObject());
+				for (Object in : result) {
+					for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
+						group.processFakeElement(in, trigger);
+					}
+				}
+			}
+
+			// Process non-active central triggers
+			for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) {
+				if (triggerPolicy.notifyTrigger(reuse.getObject())) {
+					currentTriggerPolicies.add(triggerPolicy);
+				}
+			}
+
+			if (currentTriggerPolicies.isEmpty()) {
+				// only add the element to its group
+				groupInvokable.processRealElement(reuse.getObject());
+			} else {
+				// call user function for all groups
+				for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
+					if (group == groupInvokable) {
+						// process real with initialized policies
+						group.processRealElement(reuse.getObject(), currentTriggerPolicies);
+					} else {
+						// process like a fake but also initialized with
+						// policies
+						group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
+					}
+				}
+			}
+
+			// clear current trigger list
+			currentTriggerPolicies.clear();
+
+			// Recreate the reuse-StreamRecord object and load next StreamRecord
+			resetReuse();
+			reuse = recordIterator.next(reuse);
+		}
+
+		// Stop all remaining threads from policies
+		for (Thread t : activePolicyThreads) {
+			t.interrupt();
+		}
+
+		// finally trigger the buffer.
+		for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
+			group.emitFinalWindow(centralTriggerPolicies);
+		}
+
+	}
+
+	/**
+	 * This method creates a new group. The method gets called in case an
+	 * element arrives which has a key which was not seen before. The method
+	 * creates a nested {@link WindowInvokable} and therefore creates clones
+	 * of all distributed trigger and eviction policies.
+	 * 
+	 * @param element
+	 *            The element which leads to the generation of a new group
+	 *            (previously unseen key)
+	 * @throws Exception
+	 *             In case the {@link KeySelector} throws an exception in
+	 *             {@link KeySelector#getKey(Object)}, the exception is not
+	 *             caught by this method.
+	 */
+	@SuppressWarnings("unchecked")
+	private WindowInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) throws Exception {
+		// clone the policies
+		LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+		LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
+		for (CloneableTriggerPolicy<IN> trigger : this.distributedTriggerPolicies) {
+			clonedDistributedTriggerPolicies.add(trigger.clone());
+		}
+		for (CloneableEvictionPolicy<IN> eviction : this.distributedEvictionPolicies) {
+			clonedDistributedEvictionPolicies.add(eviction.clone());
+		}
+
+		WindowInvokable<IN, OUT> groupInvokable;
+		if (userFunction instanceof ReduceFunction) {
+			groupInvokable = (WindowInvokable<IN, OUT>) new WindowReduceInvokable<IN>(
+					(ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
+					clonedDistributedEvictionPolicies);
+		} else {
+			groupInvokable = new WindowGroupReduceInvokable<IN, OUT>(
+					(GroupReduceFunction<IN, OUT>) userFunction, clonedDistributedTriggerPolicies,
+					clonedDistributedEvictionPolicies);
+		}
+
+		groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable);
+		groupInvokable.open(this.parameters);
+		windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
+
+		return groupInvokable;
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		if (LOG.isInfoEnabled()) {
+			LOG.info("There is currently no mutable implementation of this operator. Immutable version is used.");
+		}
+		immutableInvoke();
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		// This method never gets called directly. The user function calls are
+		// all delegated to the invokable instances which handle/represent the
+		// groups.
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.parameters = parameters;
+		for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) {
+			Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
+			if (target != null) {
+				Thread thread = new Thread(target);
+				activePolicyThreads.add(thread);
+				thread.start();
+			}
+		}
+	};
+
+	/**
+	 * This callback class allows handling the callbacks done by threads
+	 * defined in active trigger policies
+	 * 
+	 * @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
+	 */
+	private class WindowingCallback implements ActiveTriggerCallback {
+		private ActiveTriggerPolicy<IN> policy;
+
+		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
+			this.policy = policy;
+		}
+
+		@Override
+		public void sendFakeElement(Object datapoint) {
+			for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
+				group.processFakeElement(datapoint, policy);
+			}
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
deleted file mode 100644
index c341e56..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This invokable allows windowing based on {@link TriggerPolicy} and
- * {@link EvictionPolicy} instances including their active and cloneable
- * versions. It is additionally aware of the creation of windows per group.
- * 
- * A {@link KeySelector} is used to specify the key position or key extraction.
- * The {@link ReduceFunction} will be executed on each group separately. Trigger
- * policies might either be centralized or distributed. Eviction policies are
- * always distributed. A distributed policy have to be a
- * {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it will
- * be cloned to have separated instances for each group. At the startup time the
- * distributed policies will be stored as sample, and only clones of them will
- * be used to maintain the groups. Therefore, each group starts with the initial
- * policy states.
- * 
- * While a distributed policy only gets notified with the elements belonging to
- * the respective group, a centralized policy get notified with all arriving
- * elements. When a centralized trigger occurred, all groups get triggered. This
- * is done by submitting the element which caused the trigger as real element to
- * the groups it belongs to and as fake element to all other groups. Within the
- * groups the element might be further processed, causing more triggered,
- * prenotifications of active distributed policies and evictions like usual.
- * 
- * Central policies can be instance of {@link ActiveTriggerPolicy} and also
- * implement the
- * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
- * method. Fake elements created on prenotification will be forwarded to all
- * groups. The {@link ActiveTriggerCallback} is also implemented in a way, that
- * it forwards/distributed calls all groups.
- *
- * @param <IN>
- *            The type of input elements handled by this operator invokable.
- */
-public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
-
-	/**
-	 * Auto-generated serial version UID
-	 */
-	private static final long serialVersionUID = -3469545957144404137L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(GroupedWindowingInvokable.class);
-
-	private KeySelector<IN, ?> keySelector;
-	private Configuration parameters;
-	private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
-	private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-	private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
-	private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
-	private Map<Object, WindowingInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowingInvokable<IN, OUT>>();
-	private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
-	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-
-	/**
-	 * This constructor creates an instance of the grouped windowing invokable.
-	 * A {@link KeySelector} is used to specify the key position or key
-	 * extraction. The {@link ReduceFunction} will be executed on each group
-	 * separately. Trigger policies might either be centralized or distributed.
-	 * Eviction policies are always distributed. A distributed policy have to be
-	 * a {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it
-	 * will be cloned to have separated instances for each group. At the startup
-	 * time the distributed policies will be stored as sample, and only clones
-	 * of them will be used to maintain the groups. Therefore, each group starts
-	 * with the initial policy states.
-	 * 
-	 * While a distributed policy only gets notified with the elements belonging
-	 * to the respective group, a centralized policy get notified with all
-	 * arriving elements. When a centralized trigger occurred, all groups get
-	 * triggered. This is done by submitting the element which caused the
-	 * trigger as real element to the groups it belongs to and as fake element
-	 * to all other groups. Within the groups the element might be further
-	 * processed, causing more triggered, prenotifications of active distributed
-	 * policies and evictions like usual.
-	 * 
-	 * Central policies can be instance of {@link ActiveTriggerPolicy} and also
-	 * implement the
-	 * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
-	 * method. Fake elements created on prenotification will be forwarded to all
-	 * groups. The {@link ActiveTriggerCallback} is also implemented in a way,
-	 * that it forwards/distributed calls all groups.
-	 * 
-	 * @param userFunction
-	 *            The user defined function.
-	 * @param keySelector
-	 *            A key selector to extract the key for the groups from the
-	 *            input data.
-	 * @param distributedTriggerPolicies
-	 *            Trigger policies to be distributed and maintained individually
-	 *            within each group.
-	 * @param distributedEvictionPolicies
-	 *            Eviction policies to be distributed and maintained
-	 *            individually within each group. There are no central eviction
-	 *            policies because there is no central element buffer but only a
-	 *            buffer per group. Therefore evictions might always be done per
-	 *            group.
-	 * @param centralTriggerPolicies
-	 *            Trigger policies which will only exist once at a central
-	 *            place. In case a central policy triggers, it will cause all
-	 *            groups to be emitted. (Remark: Empty groups cannot be emitted.
-	 *            If only one element is contained a group, this element itself
-	 *            is returned as aggregated result.)
-	 */
-	public GroupedWindowingInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
-			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
-			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
-			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
-
-		super(userFunction);
-		this.keySelector = keySelector;
-		this.centralTriggerPolicies = centralTriggerPolicies;
-		this.distributedTriggerPolicies = distributedTriggerPolicies;
-		this.distributedEvictionPolicies = distributedEvictionPolicies;
-
-		for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
-			if (trigger instanceof ActiveTriggerPolicy) {
-				this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
-			}
-		}
-	}
-
-	@Override
-	protected void immutableInvoke() throws Exception {
-		// Prevent empty data streams
-		if ((reuse = recordIterator.next(reuse)) == null) {
-			throw new RuntimeException("DataStream must not be empty");
-		}
-
-		// Continuously run
-		while (reuse != null) {
-			WindowingInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
-					.getKey(reuse.getObject()));
-			if (groupInvokable == null) {
-				groupInvokable = makeNewGroup(reuse);
-			}
-
-			// Run the precalls for central active triggers
-			for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
-				Object[] result = trigger.preNotifyTrigger(reuse.getObject());
-				for (Object in : result) {
-					for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
-						group.processFakeElement(in, trigger);
-					}
-				}
-			}
-
-			// Process non-active central triggers
-			for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) {
-				if (triggerPolicy.notifyTrigger(reuse.getObject())) {
-					currentTriggerPolicies.add(triggerPolicy);
-				}
-			}
-
-			if (currentTriggerPolicies.isEmpty()) {
-				// only add the element to its group
-				groupInvokable.processRealElement(reuse.getObject());
-			} else {
-				// call user function for all groups
-				for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
-					if (group == groupInvokable) {
-						// process real with initialized policies
-						group.processRealElement(reuse.getObject(), currentTriggerPolicies);
-					} else {
-						// process like a fake but also initialized with
-						// policies
-						group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
-					}
-				}
-			}
-
-			// clear current trigger list
-			currentTriggerPolicies.clear();
-
-			// Recreate the reuse-StremRecord object and load next StreamRecord
-			resetReuse();
-			reuse = recordIterator.next(reuse);
-		}
-
-		// Stop all remaining threads from policies
-		for (Thread t : activePolicyThreads) {
-			t.interrupt();
-		}
-
-		// finally trigger the buffer.
-		for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
-			group.emitFinalWindow(centralTriggerPolicies);
-		}
-
-	}
-
-	/**
-	 * This method creates a new group. The method gets called in case an
-	 * element arrives which has a key which was not seen before. The method
-	 * created a nested {@link WindowingInvokable} and therefore created clones
-	 * of all distributed trigger and eviction policies.
-	 * 
-	 * @param element
-	 *            The element which leads to the generation of a new group
-	 *            (previously unseen key)
-	 * @throws Exception
-	 *             In case the {@link KeySelector} throws an exception in
-	 *             {@link KeySelector#getKey(Object)}, the exception is not
-	 *             catched by this method.
-	 */
-	@SuppressWarnings("unchecked")
-	private WindowingInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) throws Exception {
-		// clone the policies
-		LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-		LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
-		for (CloneableTriggerPolicy<IN> trigger : this.distributedTriggerPolicies) {
-			clonedDistributedTriggerPolicies.add(trigger.clone());
-		}
-		for (CloneableEvictionPolicy<IN> eviction : this.distributedEvictionPolicies) {
-			clonedDistributedEvictionPolicies.add(eviction.clone());
-		}
-
-		WindowingInvokable<IN, OUT> groupInvokable;
-		if (userFunction instanceof ReduceFunction) {
-			groupInvokable = (WindowingInvokable<IN, OUT>) new WindowingReduceInvokable<IN>(
-					(ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
-					clonedDistributedEvictionPolicies);
-		} else {
-			groupInvokable = new WindowingGroupInvokable<IN, OUT>(
-					(GroupReduceFunction<IN, OUT>) userFunction, clonedDistributedTriggerPolicies,
-					clonedDistributedEvictionPolicies);
-		}
-
-		groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable);
-		groupInvokable.open(this.parameters);
-		windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
-
-		return groupInvokable;
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("There is currently no mutable implementation of this operator. Immutable version is used.");
-		}
-		immutableInvoke();
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		// This method gets never called directly. The user function calls are
-		// all delegated to the invokable instanced which handle/represent the
-		// groups.
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		this.parameters = parameters;
-		for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) {
-			Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
-			if (target != null) {
-				Thread thread = new Thread(target);
-				activePolicyThreads.add(thread);
-				thread.start();
-			}
-		}
-	};
-
-	/**
-	 * This callback class allows to handle the the callbacks done by threads
-	 * defined in active trigger policies
-	 * 
-	 * @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
-	 */
-	private class WindowingCallback implements ActiveTriggerCallback {
-		private ActiveTriggerPolicy<IN> policy;
-
-		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
-			this.policy = policy;
-		}
-
-		@Override
-		public void sendFakeElement(Object datapoint) {
-			for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
-				group.processFakeElement(datapoint, policy);
-			}
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
new file mode 100644
index 0000000..9d0b584
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/** {@link WindowInvokable} that applies a {@link GroupReduceFunction} to {@code buffer}. */
+public class WindowGroupReduceInvokable<IN, OUT> extends WindowInvokable<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+	GroupReduceFunction<IN, OUT> reducer;
+
+	public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> userFunction,
+			LinkedList<TriggerPolicy<IN>> triggerPolicies,
+			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+		super(userFunction, triggerPolicies, evictionPolicies);
+		this.reducer = userFunction; // same function, kept with its GroupReduceFunction type
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		reducer.reduce(buffer, collector); // buffer and collector are not declared in this class
+	}
+}