You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:20 UTC

[15/34] incubator-flink git commit: [streaming] New windowing API merge and cleanup + several minor fixes

[streaming] New windowing API merge and cleanup + several minor fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f96ba06e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f96ba06e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f96ba06e

Branch: refs/heads/master
Commit: f96ba06e0510690af8c44ae5ef09dda15d2e1ff4
Parents: 751f101
Author: Gyula Fora <gy...@apache.org>
Authored: Sun Nov 23 18:54:44 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/BatchedDataStream.java       | 414 -------------------
 .../streaming/api/datastream/DataStream.java    |  96 -----
 .../api/datastream/WindowDataStream.java        |  92 -----
 .../api/datastream/WindowedDataStream.java      | 403 ++++++++++++++++--
 .../operator/BatchGroupReduceInvokable.java     | 163 --------
 .../operator/BatchReduceInvokable.java          | 224 ----------
 .../GroupedBatchGroupReduceInvokable.java       |  61 ---
 .../operator/GroupedBatchReduceInvokable.java   |  58 ---
 .../GroupedWindowGroupReduceInvokable.java      | 128 ------
 .../operator/GroupedWindowReduceInvokable.java  | 106 -----
 .../operator/GroupedWindowingInvokable.java     |  41 +-
 .../operator/WindowGroupReduceInvokable.java    | 108 -----
 .../operator/WindowReduceInvokable.java         | 124 ------
 .../operator/WindowingGroupInvokable.java       |  43 ++
 .../invokable/operator/WindowingInvokable.java  |  44 +-
 .../operator/WindowingReduceInvokable.java      |  59 +++
 .../streaming/api/windowing/helper/Count.java   |  25 +-
 .../streaming/api/windowing/helper/Time.java    |  79 ++--
 .../windowing/policy/CountTriggerPolicy.java    |   2 +-
 .../windowing/policy/TimeEvictionPolicy.java    |   4 +-
 .../api/windowing/policy/TimeTriggerPolicy.java |  55 +--
 .../apache/flink/streaming/api/PrintTest.java   |  60 ++-
 .../operator/BatchGroupReduceTest.java          |  98 -----
 .../api/invokable/operator/BatchReduceTest.java |  85 ----
 .../operator/GroupedBatchGroupReduceTest.java   | 101 -----
 .../operator/GroupedBatchReduceTest.java        | 107 -----
 .../GroupedWindowGroupReduceInvokableTest.java  |  99 -----
 .../operator/GroupedWindowingInvokableTest.java |  44 +-
 .../WindowGroupReduceInvokableTest.java         | 124 ------
 .../operator/WindowReduceInvokableTest.java     | 130 ------
 .../operator/WindowingInvokableTest.java        |  42 +-
 .../windowing/policy/TimeTriggerPolicyTest.java |  39 +-
 .../ml/IncrementalLearningSkeleton.java         |   6 +-
 .../examples/windowing/BasicExample.java        |  76 ----
 .../windowing/MultiplePoliciesExample.java      |  38 +-
 .../examples/windowing/SlidingExample.java      |  27 +-
 .../windowing/TimeWindowingExample.java         |  57 +--
 37 files changed, 771 insertions(+), 2691 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
deleted file mode 100755
index c8a49c6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedBatchReduceInvokable;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-
-/**
- * A {@link BatchedDataStream} represents a data stream whose elements are
- * batched together in a sliding batch. operations like
- * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
- * are applied for each batch and the batch is slid afterwards.
- *
- * @param <OUT>
- *            The output type of the {@link BatchedDataStream}
- */
-public class BatchedDataStream<OUT> {
-
-	protected DataStream<OUT> dataStream;
-	protected boolean isGrouped;
-	protected KeySelector<OUT, ?> keySelector;
-	protected long batchSize;
-	protected long slideSize;
-
-	protected BatchedDataStream(DataStream<OUT> dataStream, long batchSize, long slideSize) {
-		if (dataStream instanceof GroupedDataStream) {
-			this.isGrouped = true;
-			this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
-		} else {
-			this.isGrouped = false;
-		}
-		this.dataStream = dataStream.copy();
-		this.batchSize = batchSize;
-		this.slideSize = slideSize;
-	}
-
-	protected BatchedDataStream(BatchedDataStream<OUT> batchedDataStream) {
-		this.dataStream = batchedDataStream.dataStream.copy();
-		this.isGrouped = batchedDataStream.isGrouped;
-		this.keySelector = batchedDataStream.keySelector;
-		this.batchSize = batchedDataStream.batchSize;
-		this.slideSize = batchedDataStream.slideSize;
-	}
-
-	/**
-	 * Groups the elements of the {@link BatchedDataStream} by the given key
-	 * positions to be used with grouped operators.
-	 * 
-	 * @param fields
-	 *            The position of the fields on which the
-	 *            {@link BatchedDataStream} will be grouped.
-	 * @return The transformed {@link BatchedDataStream}
-	 */
-	public BatchedDataStream<OUT> groupBy(int... fields) {
-		return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
-	}
-
-	/**
-	 * Groups a {@link BatchedDataStream} using field expressions. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link BatchedDataStream}S underlying type. A dot can
-	 * be used to drill down into objects, as in
-	 * {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param fields
-	 *            One or more field expressions on which the DataStream will be
-	 *            grouped.
-	 * @return The grouped {@link BatchedDataStream}
-	 **/
-	public BatchedDataStream<OUT> groupBy(String... fields) {
-
-		return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
-
-	}
-
-	/**
-	 * Applies a reduce transformation on every sliding batch/window of the data
-	 * stream. If the data stream is grouped then the reducer is applied on
-	 * every group of elements sharing the same key. This type of reduce is much
-	 * faster than reduceGroup since the reduce function can be applied
-	 * incrementally. The user can also extend the {@link RichReduceFunction} to
-	 * gain access to other features provided by the {@link RichFuntion}
-	 * interface.
-	 * 
-	 * @param reducer
-	 *            The {@link ReduceFunction} that will be called for every
-	 *            element of the input values in the batch/window.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), getReduceInvokable(reducer));
-	}
-
-	/**
-	 * Applies a reduceGroup transformation on preset batches/windows of the
-	 * DataStream. The transformation calls a {@link GroupReduceFunction} for
-	 * each batch/window. Each GroupReduceFunction call can return any number of
-	 * elements including none. The user can also extend
-	 * {@link RichGroupReduceFunction} to gain access to other features provided
-	 * by the {@link RichFuntion} interface.
-	 * 
-	 * @param reducer
-	 *            The {@link GroupReduceFunction} that will be called for every
-	 *            batch/window.
-	 * @return The transformed DataStream.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(GroupReduceFunction<OUT, R> reducer) {
-		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
-				GroupReduceFunction.class, 1), getGroupReduceInvokable(reducer));
-	}
-
-	/**
-	 * Applies an aggregation that sums every sliding batch/window of the data
-	 * stream at the given position.
-	 * 
-	 * @param positionToSum
-	 *            The position in the data point to sum
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
-		dataStream.checkFieldRange(positionToSum);
-		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
-				dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
-	}
-
-	/**
-	 * Syntactic sugar for sum(0)
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> sum() {
-		return sum(0);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the sum of the pojo data stream at
-	 * the given field expression. A field expression is either the name of a
-	 * public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
-				getOutputType()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the minimum of every sliding
-	 * batch/window of the data stream at the given position.
-	 * 
-	 * @param positionToMin
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
-		dataStream.checkFieldRange(positionToMin);
-		return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
-				AggregationType.MIN));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every sliding
-	 * batch/window of the data stream by the given position. If more elements
-	 * have the same minimum value the operator returns the first element by
-	 * default.
-	 * 
-	 * @param positionToMinBy
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every sliding
-	 * batch/window of the data stream by the given position. If more elements
-	 * have the same minimum value the operator returns either the first or last
-	 * one depending on the parameter setting.
-	 * 
-	 * @param positionToMinBy
-	 *            The position in the data point to minimize
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            minimum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
-		dataStream.checkFieldRange(positionToMinBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
-				AggregationType.MINBY, first));
-	}
-
-	/**
-	 * Syntactic sugar for min(0)
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> min() {
-		return min(0);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum of every sliding
-	 * batch/window of the data stream at the given position.
-	 * 
-	 * @param positionToMax
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
-		dataStream.checkFieldRange(positionToMax);
-		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
-				AggregationType.MAX));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every sliding
-	 * batch/window of the data stream by the given position. If more elements
-	 * have the same maximum value the operator returns the first by default.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every sliding
-	 * batch/window of the data stream by the given position. If more elements
-	 * have the same maximum value the operator returns either the first or last
-	 * one depending on the parameter setting.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            maximum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
-		dataStream.checkFieldRange(positionToMaxBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
-				AggregationType.MAXBY, first));
-	}
-
-	/**
-	 * Syntactic sugar for max(0)
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> max() {
-		return max(0);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the minimum of the pojo data
-	 * stream at the given field expression. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> min(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MIN, false));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the maximum of the pojo data
-	 * stream at the given field expression. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> max(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MAX, false));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the minimum element of the pojo
-	 * data stream by the given field expression. A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MINBY, first));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the maximum element of the pojo
-	 * data stream by the given field expression. A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MAXBY, first));
-	}
-
-	/**
-	 * Gets the output type.
-	 * 
-	 * @return The output type.
-	 */
-	public TypeInformation<OUT> getOutputType() {
-		return dataStream.getOutputType();
-	}
-
-	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
-		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
-
-		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
-				aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
-
-		return returnStream;
-	}
-
-	protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
-		BatchReduceInvokable<OUT> invokable;
-		if (isGrouped) {
-			invokable = new GroupedBatchReduceInvokable<OUT>(reducer, batchSize, slideSize,
-					keySelector);
-		} else {
-			invokable = new BatchReduceInvokable<OUT>(reducer, batchSize, slideSize);
-		}
-		return invokable;
-	}
-
-	protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
-			GroupReduceFunction<OUT, R> reducer) {
-		BatchGroupReduceInvokable<OUT, R> invokable;
-		if (isGrouped) {
-			invokable = new GroupedBatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
-					keySelector);
-		} else {
-			invokable = new BatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize);
-		}
-		return invokable;
-	}
-
-	protected BatchedDataStream<OUT> copy() {
-		return new BatchedDataStream<OUT>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c38a638..2ce28d9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -663,7 +663,6 @@ public class DataStream<OUT> {
 		return new GroupedDataStream<OUT>(this, keySelector);
 	}
 
-
 	/**
 	 * This allows you to set up windowing through a nice API using
 	 * {@link WindowingHelper} such as {@link Time}, {@link Count} and
@@ -678,101 +677,6 @@ public class DataStream<OUT> {
 	public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) {
 		return new WindowedDataStream<OUT>(this, policyHelpers);
 	}
-	
-	/**
-	 * Collects the data stream elements into sliding batches creating a new
-	 * {@link BatchedDataStream}. The user can apply transformations like
-	 * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
-	 * or aggregations on the {@link BatchedDataStream}.
-	 * 
-	 * @param batchSize
-	 *            The number of elements in each batch at each operator
-	 * @param slideSize
-	 *            The number of elements with which the batches are slid by
-	 *            after each transformation.
-	 * @return The transformed {@link DataStream}
-	 */
-	public BatchedDataStream<OUT> batch(long batchSize, long slideSize) {
-		if (batchSize < 1) {
-			throw new IllegalArgumentException("Batch size must be positive");
-		}
-		if (slideSize < 1) {
-			throw new IllegalArgumentException("Slide size must be positive");
-		}
-		return new BatchedDataStream<OUT>(this, batchSize, slideSize);
-	}
-
-	/**
-	 * Collects the data stream elements into sliding batches creating a new
-	 * {@link BatchedDataStream}. The user can apply transformations like
-	 * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
-	 * or aggregations on the {@link BatchedDataStream}.
-	 * 
-	 * @param batchSize
-	 *            The number of elements in each batch at each operator
-	 * @return The transformed {@link DataStream}
-	 */
-	public BatchedDataStream<OUT> batch(long batchSize) {
-		return batch(batchSize, batchSize);
-	}
-
-	/**
-	 * Collects the data stream elements into sliding windows creating a new
-	 * {@link WindowDataStream}. The user can apply transformations like
-	 * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
-	 * aggregations on the {@link WindowDataStream}.
-	 * 
-	 * @param windowSize
-	 *            The length of the window in milliseconds.
-	 * @param slideInterval
-	 *            The number of milliseconds with which the windows are slid by
-	 *            after each transformation.
-	 * @param timestamp
-	 *            User defined function for extracting time-stamps from each
-	 *            element
-	 * @return The transformed {@link DataStream}
-	 */
-	public WindowDataStream<OUT> window(long windowSize, long slideInterval,
-			TimeStamp<OUT> timestamp) {
-		if (windowSize < 1) {
-			throw new IllegalArgumentException("Window size must be positive");
-		}
-		if (slideInterval < 1) {
-			throw new IllegalArgumentException("Slide interval must be positive");
-		}
-		return new WindowDataStream<OUT>(this, windowSize, slideInterval, timestamp);
-	}
-
-	/**
-	 * Collects the data stream elements into sliding windows creating a new
-	 * {@link WindowDataStream}. The user can apply transformations like
-	 * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
-	 * aggregations on the {@link WindowDataStream}.
-	 * 
-	 * @param windowSize
-	 *            The length of the window in milliseconds.
-	 * @param slideInterval
-	 *            The number of milliseconds with which the windows are slid by
-	 *            after each transformation.
-	 * @return The transformed {@link DataStream}
-	 */
-	public WindowDataStream<OUT> window(long windowSize, long slideInterval) {
-		return window(windowSize, slideInterval, new DefaultTimeStamp<OUT>());
-	}
-
-	/**
-	 * Collects the data stream elements into sliding windows creating a new
-	 * {@link WindowDataStream}. The user can apply transformations like
-	 * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
-	 * aggregations on the {@link WindowDataStream}.
-	 * 
-	 * @param windowSize
-	 *            The length of the window in milliseconds.
-	 * @return The transformed {@link DataStream}
-	 */
-	public WindowDataStream<OUT> window(long windowSize) {
-		return window(windowSize, windowSize);
-	}
 
 	/**
 	 * Applies an aggregation that sums the data stream at the given position.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
deleted file mode 100755
index be0e37f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-
-/**
- * A {@link WindowDataStream} represents a data stream whose elements are
- * batched together in a sliding window. operations like
- * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
- * are applied for each window and the window is slid afterwards.
- *
- * @param <OUT>
- *            The output type of the {@link WindowDataStream}
- */
-public class WindowDataStream<OUT> extends BatchedDataStream<OUT> {
-
-	TimeStamp<OUT> timeStamp;
-
-	protected WindowDataStream(DataStream<OUT> dataStream, long windowSize, long slideInterval,
-			TimeStamp<OUT> timeStamp) {
-		super(dataStream, windowSize, slideInterval);
-		this.timeStamp = timeStamp;
-	}
-
-	protected WindowDataStream(WindowDataStream<OUT> windowDataStream) {
-		super(windowDataStream);
-		this.timeStamp = windowDataStream.timeStamp;
-	}
-
-	public WindowDataStream<OUT> groupBy(int... keyPositions) {
-		return new WindowDataStream<OUT>(dataStream.groupBy(keyPositions), batchSize, slideSize,
-				timeStamp);
-	}
-
-	public WindowDataStream<OUT> groupBy(String... fields) {
-		return new WindowDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize,
-				timeStamp);
-	}
-
-	protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
-		WindowReduceInvokable<OUT> invokable;
-		if (isGrouped) {
-			invokable = new GroupedWindowReduceInvokable<OUT>(reducer, batchSize, slideSize,
-					keySelector, timeStamp);
-		} else {
-			invokable = new WindowReduceInvokable<OUT>(reducer, batchSize, slideSize, timeStamp);
-		}
-		return invokable;
-	}
-
-	protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
-			GroupReduceFunction<OUT, R> reducer) {
-		BatchGroupReduceInvokable<OUT, R> invokable;
-		if (isGrouped) {
-			invokable = new GroupedWindowGroupReduceInvokable<OUT, R>(reducer, batchSize,
-					slideSize, keySelector, timeStamp);
-		} else {
-			invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
-					timeStamp);
-		}
-		return invokable;
-	}
-
-	public WindowDataStream<OUT> copy() {
-		return new WindowDataStream<OUT>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 2e6b885..dd26e64 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -17,25 +17,34 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 
-import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.operator.WindowingInvokable;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowingInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowingGroupInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowingReduceInvokable;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
 
 /**
- * A {@link WindowedDataStream} represents a data stream whose elements
- * are batched together in a sliding batch. operations like
+ * A {@link WindowedDataStream} represents a data stream whose elements are
+ * batched together in a sliding batch. operations like
  * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
  * are applied for each batch and the batch is slid afterwards.
  *
@@ -48,19 +57,31 @@ public class WindowedDataStream<OUT> {
 	protected boolean isGrouped;
 	protected KeySelector<OUT, ?> keySelector;
 
-	protected WindowingHelper<OUT>[] triggerPolicies;
-	protected WindowingHelper<OUT>[] evictionPolicies;
+	protected List<WindowingHelper<OUT>> triggerPolicies;
+	protected List<WindowingHelper<OUT>> evictionPolicies;
+
+	protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT>... policyHelpers) {
+		this.dataStream = dataStream.copy();
+		this.triggerPolicies = new ArrayList<WindowingHelper<OUT>>();
+		for (WindowingHelper<OUT> helper : policyHelpers) {
+			this.triggerPolicies.add(helper);
+		}
 
-	protected WindowedDataStream(DataStream<OUT> dataStream,
-			WindowingHelper<OUT>... policyHelpers) {
 		if (dataStream instanceof GroupedDataStream) {
 			this.isGrouped = true;
 			this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
+
 		} else {
 			this.isGrouped = false;
 		}
-		this.dataStream = dataStream.copy();
-		this.triggerPolicies = policyHelpers;
+	}
+
+	protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
+		this.dataStream = windowedDataStream.dataStream.copy();
+		this.isGrouped = windowedDataStream.isGrouped;
+		this.keySelector = windowedDataStream.keySelector;
+		this.triggerPolicies = windowedDataStream.triggerPolicies;
+		this.evictionPolicies = windowedDataStream.evictionPolicies;
 	}
 
 	protected LinkedList<TriggerPolicy<OUT>> getTriggers() {
@@ -87,18 +108,43 @@ public class WindowedDataStream<OUT> {
 		return evictionPolicyList;
 	}
 
-	protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
-		this.dataStream = windowedDataStream.dataStream.copy();
-		this.isGrouped = windowedDataStream.isGrouped;
-		this.keySelector = windowedDataStream.keySelector;
-		this.triggerPolicies = windowedDataStream.triggerPolicies;
-		this.evictionPolicies = windowedDataStream.evictionPolicies;
+	protected LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
+		LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
+		// Add Time triggers to central triggers
+		for (TriggerPolicy<OUT> trigger : getTriggers()) {
+			if (trigger instanceof TimeTriggerPolicy) {
+				cTriggers.add(trigger);
+			}
+		}
+		return cTriggers;
+	}
+
+	protected LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
+		LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
+
+		// Everything except Time triggers are distributed
+		for (TriggerPolicy<OUT> trigger : getTriggers()) {
+			if (!(trigger instanceof TimeTriggerPolicy)) {
+				dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
+			}
+		}
+
+		return dTriggers;
+	}
+
+	protected LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
+		LinkedList<CloneableEvictionPolicy<OUT>> evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
 
+		for (EvictionPolicy<OUT> evicter : getEvicters()) {
+			evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+		}
+
+		return evicters;
 	}
 
 	/**
 	 * Groups the elements of the {@link WindowedDataStream} by the given
-	 * key position to be used with grouped operators.
+	 * {@link KeySelector} to be used with grouped operators.
 	 * 
 	 * @param keySelector
 	 *            The specification of the key on which the
@@ -109,7 +155,39 @@ public class WindowedDataStream<OUT> {
 		WindowedDataStream<OUT> ret = this.copy();
 		ret.dataStream = ret.dataStream.groupBy(keySelector);
 		ret.isGrouped = true;
-		ret.keySelector = keySelector;
+		ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
+		return ret;
+	}
+
+	/**
+	 * Groups the elements of the {@link WindowedDataStream} by the given key
+	 * positions to be used with grouped operators.
+	 * 
+	 * @param fields
+	 *            The position of the fields to group by.
+	 * @return The transformed {@link WindowedDataStream}
+	 */
+	public WindowedDataStream<OUT> groupBy(int... fields) {
+		WindowedDataStream<OUT> ret = this.copy();
+		ret.dataStream = ret.dataStream.groupBy(fields);
+		ret.isGrouped = true;
+		ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
+		return ret;
+	}
+
+	/**
+	 * Groups the elements of the {@link WindowedDataStream} by the given field
+	 * expressions to be used with grouped operators.
+	 * 
+	 * @param fields
+	 *            The position of the fields to group by.
+	 * @return The transformed {@link WindowedDataStream}
+	 */
+	public WindowedDataStream<OUT> groupBy(String... fields) {
+		WindowedDataStream<OUT> ret = this.copy();
+		ret.dataStream = ret.dataStream.groupBy(fields);
+		ret.isGrouped = true;
+		ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
 		return ret;
 	}
 
@@ -126,13 +204,242 @@ public class WindowedDataStream<OUT> {
 	 *            information
 	 * @return The single output operator
 	 */
-	public SingleOutputStreamOperator<Tuple2<OUT, String[]>, ?> reduce(
-			ReduceFunction<OUT> reduceFunction) {
+	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
 		return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
-				new FunctionTypeWrapper<OUT>(reduceFunction, ReduceFunction.class, 0),
-				new CombineTypeWrapper<OUT, String[]>(dataStream.outTypeWrapper,
-						new ObjectTypeWrapper<String[]>(new String[] { "" })),
-				new WindowingInvokable<OUT>(reduceFunction, getTriggers(), getEvicters()));
+				dataStream.outTypeWrapper, dataStream.outTypeWrapper,
+				getReduceInvokable(reduceFunction));
+	}
+
+	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
+			GroupReduceFunction<OUT, R> reduceFunction) {
+		return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
+				dataStream.outTypeWrapper, new FunctionTypeWrapper<R>(reduceFunction,
+						GroupReduceFunction.class, 1), getReduceGroupInvokable(reduceFunction));
+	}
+
+	/**
+	 * Applies an aggregation that sums every sliding batch/window of the data
+	 * stream at the given position.
+	 * 
+	 * @param positionToSum
+	 *            The position in the data point to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
+		dataStream.checkFieldRange(positionToSum);
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
+				dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
+	}
+
+	/**
+	 * Syntactic sugar for sum(0)
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum() {
+		return sum(0);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the sum of the pojo data stream at
+	 * the given field expression. A field expression is either the name of a
+	 * public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
+				getOutputType()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum of every sliding
+	 * batch/window of the data stream at the given position.
+	 * 
+	 * @param positionToMin
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
+		dataStream.checkFieldRange(positionToMin);
+		return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+				AggregationType.MIN));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every sliding
+	 * batch/window of the data stream by the given position. If more elements
+	 * have the same minimum value the operator returns the first element by
+	 * default.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every sliding
+	 * batch/window of the data stream by the given position. If more elements
+	 * have the same minimum value the operator returns either the first or last
+	 * one depending on the parameter setting.
+	 * 
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, then the operator return the first element with the
+	 *            minimum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
+		dataStream.checkFieldRange(positionToMinBy);
+		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+				AggregationType.MINBY, first));
+	}
+
+	/**
+	 * Syntactic sugar for min(0)
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min() {
+		return min(0);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum of every sliding
+	 * batch/window of the data stream at the given position.
+	 * 
+	 * @param positionToMax
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
+		dataStream.checkFieldRange(positionToMax);
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+				AggregationType.MAX));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every sliding
+	 * batch/window of the data stream by the given position. If more elements
+	 * have the same maximum value the operator returns the first by default.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every sliding
+	 * batch/window of the data stream by the given position. If more elements
+	 * have the same maximum value the operator returns either the first or last
+	 * one depending on the parameter setting.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @param first
+	 *            If true, then the operator return the first element with the
+	 *            maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
+		dataStream.checkFieldRange(positionToMaxBy);
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+				AggregationType.MAXBY, first));
+	}
+
+	/**
+	 * Syntactic sugar for max(0)
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max() {
+		return max(0);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum of the pojo data
+	 * stream at the given field expression. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MIN, false));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum of the pojo data
+	 * stream at the given field expression. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MAX, false));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the minimum element of the pojo
+	 * data stream by the given field expression. A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MINBY, first));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the maximum element of the pojo
+	 * data stream by the given field expression. A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MAXBY, first));
 	}
 
 	/**
@@ -153,11 +460,45 @@ public class WindowedDataStream<OUT> {
 		WindowedDataStream<OUT> ret = this.copy();
 		if (ret.evictionPolicies == null) {
 			ret.evictionPolicies = ret.triggerPolicies;
-			ret.triggerPolicies = policyHelpers;
-		} else {
-			ret.triggerPolicies = (WindowingHelper<OUT>[]) ArrayUtils.addAll(triggerPolicies,
-					policyHelpers);
+			ret.triggerPolicies = new ArrayList<WindowingHelper<OUT>>();
+		}
+		for (WindowingHelper<OUT> helper : policyHelpers) {
+			ret.triggerPolicies.add(helper);
 		}
 		return ret;
 	}
+
+	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) {
+		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
+
+		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
+				aggregator, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
+
+		return returnStream;
+	}
+
+	protected <R> StreamInvokable<OUT, R> getReduceGroupInvokable(
+			GroupReduceFunction<OUT, R> reducer) {
+		StreamInvokable<OUT, R> invokable;
+		if (isGrouped) {
+			invokable = new GroupedWindowingInvokable<OUT, R>(reducer, keySelector,
+					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
+
+		} else {
+			invokable = new WindowingGroupInvokable<OUT, R>(reducer, getTriggers(), getEvicters());
+		}
+		return invokable;
+	}
+
+	protected StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
+		StreamInvokable<OUT, OUT> invokable;
+		if (isGrouped) {
+			invokable = new GroupedWindowingInvokable<OUT, OUT>(reducer, keySelector,
+					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
+
+		} else {
+			invokable = new WindowingReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());
+		}
+		return invokable;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
deleted file mode 100755
index 134c33d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.io.Serializable;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.CircularFifoList;
-
-public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-	protected GroupReduceFunction<IN, OUT> reducer;
-
-	protected long slideSize;
-
-	protected long batchSize;
-	protected int granularity;
-	protected int batchPerSlide;
-	protected StreamBatch batch;
-	protected StreamBatch currentBatch;
-	protected long numberOfBatches;
-
-	public BatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
-			long slideSize) {
-		super(reduceFunction);
-		this.reducer = reduceFunction;
-		this.batchSize = batchSize;
-		this.slideSize = slideSize;
-		this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
-		this.batchPerSlide = (int) (slideSize / granularity);
-		this.numberOfBatches = batchSize / granularity;
-		this.batch = new StreamBatch();
-	}
-
-	@Override
-	protected void immutableInvoke() throws Exception {
-		if ((reuse = recordIterator.next(reuse)) == null) {
-			throw new RuntimeException("DataStream must not be empty");
-		}
-
-		while (reuse != null) {
-			StreamBatch batch = getBatch(reuse);
-			batch.addToBuffer(reuse.getObject());
-
-			resetReuse();
-			reuse = recordIterator.next(reuse);
-		}
-
-		reduceLastBatch();
-	}
-
-	@Override
-	// TODO: implement mutableInvoke for reduce
-	protected void mutableInvoke() throws Exception {
-		System.out.println("Immutable setting is used");
-		immutableInvoke();
-	}
-
-	protected StreamBatch getBatch(StreamRecord<IN> next) {
-		return batch;
-	}
-
-	protected void reduce(StreamBatch batch) {
-		this.currentBatch = batch;
-		callUserFunctionAndLogException();
-	}
-
-	protected void reduceLastBatch() {
-		batch.reduceLastBatch();
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		if(!currentBatch.circularList.isEmpty()){
-			reducer.reduce(currentBatch.circularList.getIterable(), collector);
-		}
-	}
-
-	protected class StreamBatch implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-		private long counter;
-		protected long minibatchCounter;
-
-		protected CircularFifoList<IN> circularList;
-
-		public StreamBatch() {
-			this.circularList = new CircularFifoList<IN>();
-			this.counter = 0;
-			this.minibatchCounter = 0;
-		}
-
-		public void addToBuffer(IN nextValue) throws Exception {
-			circularList.add(nextValue);
-
-			counter++;
-
-			if (miniBatchEnd()) {
-				circularList.newSlide();
-				minibatchCounter++;
-				if (batchEnd()) {
-					reduceBatch();
-					circularList.shiftWindow(batchPerSlide);
-				}
-			}
-
-		}
-
-		protected boolean miniBatchEnd() {
-			if( (counter % granularity) == 0){
-				counter = 0;
-				return true;
-			}else{
-				return false;
-			}
-		}
-		
-		
-		public boolean batchEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				minibatchCounter -= batchPerSlide;
-				return true;
-			}
-			return false;
-		}
-
-		public void reduceBatch() {
-			reduce(this);
-		}
-
-		public void reduceLastBatch() {
-			if (!miniBatchEnd()) {
-				reduceBatch();
-			}
-		}
-		
-		@Override
-		public String toString(){
-			return circularList.toString();
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
deleted file mode 100755
index 6b22555..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.NullableCircularBuffer;
-
-public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
-
-	private static final long serialVersionUID = 1L;
-	protected ReduceFunction<OUT> reducer;
-
-	protected long slideSize;
-
-	protected long batchSize;
-	protected int granularity;
-	protected long batchPerSlide;
-	protected long numberOfBatches;
-	protected StreamBatch batch;
-	protected StreamBatch currentBatch;
-
-	protected TypeSerializer<OUT> serializer;
-
-	public BatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize, long slideSize) {
-		super(reduceFunction);
-		this.reducer = reduceFunction;
-		this.batchSize = batchSize;
-		this.slideSize = slideSize;
-		this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
-		this.batchPerSlide = slideSize / granularity;
-		this.numberOfBatches = batchSize / granularity;
-	}
-
-	@Override
-	protected void immutableInvoke() throws Exception {
-		if ((reuse = recordIterator.next(reuse)) == null) {
-			throw new RuntimeException("DataStream must not be empty");
-		}
-
-		while (reuse != null) {
-			StreamBatch batch = getBatch(reuse);
-
-			batch.reduceToBuffer(reuse.getObject());
-
-			resetReuse();
-			reuse = recordIterator.next(reuse);
-		}
-
-		reduceLastBatch();
-
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		System.out.println("Immutable setting is used");
-		immutableInvoke();
-	}
-
-	protected StreamBatch getBatch(StreamRecord<OUT> next) {
-		return batch;
-	}
-
-	protected void reduce(StreamBatch batch) {
-		this.currentBatch = batch;
-		callUserFunctionAndLogException();
-	}
-
-	protected void reduceLastBatch() throws Exception {
-		batch.reduceLastBatch();
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		Iterator<OUT> reducedIterator = currentBatch.getIterator();
-		OUT reduced = null;
-
-		while (reducedIterator.hasNext() && reduced == null) {
-			reduced = reducedIterator.next();
-		}
-
-		while (reducedIterator.hasNext()) {
-			OUT next = reducedIterator.next();
-			if (next != null) {
-				reduced = reducer.reduce(serializer.copy(reduced), serializer.copy(next));
-			}
-		}
-		if (reduced != null) {
-			collector.collect(reduced);
-		}
-	}
-
-	protected class StreamBatch implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-		protected long counter;
-		protected long minibatchCounter;
-		protected OUT currentValue;
-		boolean changed;
-
-		protected NullableCircularBuffer circularBuffer;
-
-		public StreamBatch() {
-
-			this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity));
-			this.counter = 0;
-			this.minibatchCounter = 0;
-			this.changed = false;
-		}
-
-		public void reduceToBuffer(OUT nextValue) throws Exception {
-
-			if (currentValue != null) {
-				currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
-			} else {
-				currentValue = nextValue;
-			}
-
-			counter++;
-
-			if (miniBatchEnd()) {
-				addToBuffer();
-				if (batchEnd()) {
-					reduceBatch();
-				}
-			}
-
-		}
-
-		protected void addToBuffer() {
-			circularBuffer.add(currentValue);
-			changed = true;
-			minibatchCounter++;
-			currentValue = null;
-		}
-
-		protected boolean miniBatchEnd() {
-			if ((counter % granularity) == 0) {
-				counter = 0;
-				return true;
-			} else {
-				return false;
-			}
-		}
-
-		public boolean batchEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				minibatchCounter -= batchPerSlide;
-				return true;
-			}
-			return false;
-		}
-
-		public void reduceLastBatch() throws Exception {
-
-			if (miniBatchInProgress()) {
-				addToBuffer();
-			}
-			if (changed == true && minibatchCounter >= 0) {
-				if (circularBuffer.isFull()) {
-					for (long i = 0; i < (numberOfBatches - minibatchCounter); i++) {
-						if (!circularBuffer.isEmpty()) {
-							circularBuffer.remove();
-						}
-					}
-				}
-				if (!circularBuffer.isEmpty()) {
-					reduce(this);
-				}
-			}
-		}
-
-		public boolean miniBatchInProgress() {
-			return currentValue != null;
-		}
-
-		public void reduceBatch() {
-			reduce(this);
-			changed = false;
-		}
-
-		@SuppressWarnings("unchecked")
-		public Iterator<OUT> getIterator() {
-			return circularBuffer.iterator();
-		}
-
-		@Override
-		public String toString() {
-			return circularBuffer.toString();
-		}
-
-	}
-
-
-	@Override
-	public void open(Configuration config) throws Exception{
-		super.open(config);
-		serializer = inSerializer.getObjectSerializer();
-		this.batch = new StreamBatch();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceInvokable.java
deleted file mode 100755
index 3564829..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceInvokable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class GroupedBatchGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	Map<Object, StreamBatch> streamBatches;
-	KeySelector<IN, ?> keySelector;
-	
-
-	public GroupedBatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
-			long slideSize, KeySelector<IN, ?> keySelector) {
-		super(reduceFunction, batchSize, slideSize);
-		this.keySelector = keySelector;
-		this.streamBatches = new HashMap<Object, StreamBatch>();
-	}
-	
-	@Override
-	protected StreamBatch getBatch(StreamRecord<IN> next) {
-		Object key = next.getKey(keySelector);
-		StreamBatch batch = streamBatches.get(key);
-		if(batch == null){
-			batch=new StreamBatch();
-			streamBatches.put(key, batch);
-		}
-		return batch;
-	}
-
-	@Override
-	protected void reduceLastBatch() {
-		for(StreamBatch batch: streamBatches.values()){
-			batch.reduceLastBatch();
-		}		
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
deleted file mode 100755
index cfb128c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceInvokable.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-	KeySelector<OUT, ?> keySelector;
-	Map<Object, StreamBatch> streamBatches;
-
-	public GroupedBatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize,
-			long slideSize, KeySelector<OUT, ?> keySelector) {
-		super(reduceFunction, batchSize, slideSize);
-		this.keySelector = keySelector;
-		this.streamBatches = new HashMap<Object, StreamBatch>();
-	}
-
-	@Override
-	protected StreamBatch getBatch(StreamRecord<OUT> next) {
-		Object key = next.getKey(keySelector);
-		StreamBatch batch = streamBatches.get(key);
-		if (batch == null) {
-			batch = new StreamBatch();
-			streamBatches.put(key, batch);
-		}
-		return batch;
-	}
-
-	@Override
-	protected void reduceLastBatch() throws Exception {
-		for (StreamBatch batch : streamBatches.values()) {
-			batch.reduceLastBatch();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
deleted file mode 100755
index 0641ae3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduceInvokable<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	KeySelector<IN, ?> keySelector;
-	Map<Object, StreamWindow> streamWindows;
-	List<Object> cleanList;
-	long currentMiniBatchCount = 0;
-
-	public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction,
-			long windowSize, long slideInterval, KeySelector<IN, ?> keySelector,
-			TimeStamp<IN> timestamp) {
-		super(reduceFunction, windowSize, slideInterval, timestamp);
-		this.keySelector = keySelector;
-		this.reducer = reduceFunction;
-		this.streamWindows = new HashMap<Object, StreamWindow>();
-	}
-
-	@Override
-	protected StreamBatch getBatch(StreamRecord<IN> next) {
-		Object key = next.getKey(keySelector);
-		StreamWindow window = streamWindows.get(key);
-		if (window == null) {
-			window = new GroupedStreamWindow();
-			for (int i = 0; i < currentMiniBatchCount; i++) {
-				window.circularList.newSlide();
-			}
-			streamWindows.put(key, window);
-		}
-		this.window = window;
-		return window;
-	}
-
-	@Override
-	protected void reduceLastBatch() {
-		for (StreamBatch window : streamWindows.values()) {
-			window.reduceLastBatch();
-		}
-	}
-
-	private void shiftGranularityAllWindows() {
-		for (StreamBatch window : streamWindows.values()) {
-			window.circularList.newSlide();
-		}
-	}
-
-	private void slideAllWindows() {
-		currentMiniBatchCount -= batchPerSlide;
-		for (StreamBatch window : streamWindows.values()) {
-			window.circularList.shiftWindow(batchPerSlide);
-		}
-	}
-
-	private void reduceAllWindows() {
-		for (StreamBatch window : streamWindows.values()) {
-			window.reduceBatch();
-		}
-	}
-
-	protected class GroupedStreamWindow extends StreamWindow {
-
-		private static final long serialVersionUID = 1L;
-
-		public GroupedStreamWindow() {
-			super();
-		}
-
-		@Override
-		public void addToBuffer(IN nextValue) throws Exception {
-			checkWindowEnd(timestamp.getTimestamp(nextValue));
-			if (currentMiniBatchCount >= 0) {
-				circularList.add(nextValue);
-			}
-		}
-
-		@Override
-		protected synchronized void checkWindowEnd(long timeStamp) {
-			nextRecordTime = timeStamp;
-
-			while (miniBatchEnd()) {
-				shiftGranularityAllWindows();
-				currentMiniBatchCount += 1;
-				if (batchEnd()) {
-					reduceAllWindows();
-					slideAllWindows();
-				}
-			}
-		}
-
-		@Override
-		public boolean batchEnd() {
-			if (currentMiniBatchCount == numberOfBatches) {
-				return true;
-			}
-			return false;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
deleted file mode 100644
index 2248096..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-	private KeySelector<OUT, ?> keySelector;
-	private Map<Object, StreamWindow> streamWindows;
-	private long currentMiniBatchCount = 0;
-
-	public GroupedWindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
-			long slideInterval, KeySelector<OUT, ?> keySelector, TimeStamp<OUT> timestamp) {
-		super(reduceFunction, windowSize, slideInterval, timestamp);
-		this.keySelector = keySelector;
-		this.streamWindows = new HashMap<Object, StreamWindow>();
-	}
-
-	@Override
-	protected StreamBatch getBatch(StreamRecord<OUT> next) {
-		Object key = next.getKey(keySelector);
-		StreamWindow window = streamWindows.get(key);
-		if (window == null) {
-			window = new GroupedStreamWindow();
-			window.minibatchCounter = currentMiniBatchCount;
-			streamWindows.put(key, window);
-		}
-		this.window = window;
-		return window;
-	}
-
-	private void addToAllBuffers() {
-		for (StreamBatch window : streamWindows.values()) {
-			window.addToBuffer();
-		}
-	}
-
-	private void reduceAllWindows() {
-		for (StreamBatch window : streamWindows.values()) {
-			window.minibatchCounter -= batchPerSlide;
-			window.reduceBatch();
-		}
-	}
-
-	@Override
-	protected void reduceLastBatch() throws Exception {
-		for (StreamBatch window : streamWindows.values()) {
-			window.reduceLastBatch();
-		}
-	}
-
-	protected class GroupedStreamWindow extends StreamWindow {
-
-		private static final long serialVersionUID = 1L;
-
-		public GroupedStreamWindow() {
-			super();
-		}
-
-		@Override
-		protected synchronized void checkWindowEnd(long timeStamp) {
-			nextRecordTime = timeStamp;
-
-			while (miniBatchEnd()) {
-				addToAllBuffers();
-				if (batchEnd()) {
-					reduceAllWindows();
-				}
-			}
-			currentMiniBatchCount = this.minibatchCounter;
-		}
-
-		@Override
-		public boolean batchEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				return true;
-			}
-			return false;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
index 7fc35ca..e957b8c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
@@ -21,9 +21,10 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -69,7 +70,7 @@ import org.slf4j.LoggerFactory;
  * @param <IN>
  *            The type of input elements handled by this operator invokable.
  */
-public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, String[]>> {
+public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 	/**
 	 * Auto-generated serial version UID
@@ -84,7 +85,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 	private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
 	private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
 	private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
-	private Map<Object, WindowingInvokable<IN>> windowingGroups = new HashMap<Object, WindowingInvokable<IN>>();
+	private Map<Object, WindowingInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowingInvokable<IN, OUT>>();
 	private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
 	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
 
@@ -117,7 +118,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 	 * that it forwards/distributed calls all groups.
 	 * 
 	 * @param userFunction
-	 *            The user defined {@link ReduceFunction}.
+	 *            The user defined function.
 	 * @param keySelector
 	 *            A key selector to extract the key for the groups from the
 	 *            input data.
@@ -137,7 +138,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 	 *            If only one element is contained a group, this element itself
 	 *            is returned as aggregated result.)
 	 */
-	public GroupedWindowingInvokable(ReduceFunction<IN> userFunction,
+	public GroupedWindowingInvokable(Function userFunction,
 			KeySelector<IN, ?> keySelector,
 			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
 			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
@@ -165,8 +166,8 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 
 		// Continuously run
 		while (reuse != null) {
-			WindowingInvokable<IN> groupInvokable = windowingGroups.get(keySelector.getKey(reuse
-					.getObject()));
+			WindowingInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
+					.getKey(reuse.getObject()));
 			if (groupInvokable == null) {
 				groupInvokable = makeNewGroup(reuse);
 			}
@@ -175,7 +176,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 			for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
 				IN[] result = trigger.preNotifyTrigger(reuse.getObject());
 				for (IN in : result) {
-					for (WindowingInvokable<IN> group : windowingGroups.values()) {
+					for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
 						group.processFakeElement(in, trigger);
 					}
 				}
@@ -193,7 +194,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 				groupInvokable.processRealElement(reuse.getObject());
 			} else {
 				// call user function for all groups
-				for (WindowingInvokable<IN> group : windowingGroups.values()) {
+				for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
 					if (group == groupInvokable) {
 						// process real with initialized policies
 						group.processRealElement(reuse.getObject(), currentTriggerPolicies);
@@ -219,7 +220,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 		}
 
 		// finally trigger the buffer.
-		for (WindowingInvokable<IN> group : windowingGroups.values()) {
+		for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
 			group.emitFinalWindow(centralTriggerPolicies);
 		}
 
@@ -239,7 +240,8 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 	 *             {@link KeySelector#getKey(Object)}, the exception is not
 	 *             catched by this method.
 	 */
-	private WindowingInvokable<IN> makeNewGroup(StreamRecord<IN> element) throws Exception {
+	@SuppressWarnings("unchecked")
+	private WindowingInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) throws Exception {
 		// clone the policies
 		LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
 		LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
@@ -250,10 +252,17 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 			clonedDistributedEvictionPolicies.add(eviction.clone());
 		}
 
-		@SuppressWarnings("unchecked")
-		WindowingInvokable<IN> groupInvokable = new WindowingInvokable<IN>(
-				(ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
-				clonedDistributedEvictionPolicies);
+		WindowingInvokable<IN, OUT> groupInvokable;
+		if (userFunction instanceof ReduceFunction) {
+			groupInvokable = (WindowingInvokable<IN, OUT>) new WindowingReduceInvokable<IN>(
+					(ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
+					clonedDistributedEvictionPolicies);
+		} else {
+			groupInvokable = new WindowingGroupInvokable<IN, OUT>(
+					(GroupReduceFunction<IN, OUT>) userFunction, clonedDistributedTriggerPolicies,
+					clonedDistributedEvictionPolicies);
+		}
+
 		groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable);
 		groupInvokable.open(this.parameters);
 		windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
@@ -305,7 +314,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
 
 		@Override
 		public void sendFakeElement(IN datapoint) {
-			for (WindowingInvokable<IN> group : windowingGroups.values()) {
+			for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
 				group.processFakeElement(datapoint, policy);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
deleted file mode 100755
index c69816d4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-
-public class WindowGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-	private long startTime;
-	protected long nextRecordTime;
-	protected TimeStamp<IN> timestamp;
-	protected StreamWindow window;
-
-	public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
-			long slideInterval, TimeStamp<IN> timestamp) {
-		super(reduceFunction, windowSize, slideInterval);
-		this.timestamp = timestamp;
-		this.startTime = timestamp.getStartTime();
-		this.window = new StreamWindow();
-		this.batch = this.window;
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-		if (timestamp instanceof DefaultTimeStamp) {
-			(new TimeCheck()).start();
-		}
-	}
-
-	protected class StreamWindow extends StreamBatch {
-
-		private static final long serialVersionUID = 1L;
-
-		public StreamWindow() {
-			super();
-		}
-
-		@Override
-		public void addToBuffer(IN nextValue) throws Exception {
-			checkWindowEnd(timestamp.getTimestamp(nextValue));
-			if (minibatchCounter >= 0) {
-				circularList.add(nextValue);
-			}
-		}
-
-		protected synchronized void checkWindowEnd(long timeStamp) {
-			nextRecordTime = timeStamp;
-
-			while (miniBatchEnd()) {
-				circularList.newSlide();
-				minibatchCounter++;
-				if (batchEnd()) {
-					reduceBatch();
-					circularList.shiftWindow(batchPerSlide);
-				}
-			}
-		}
-
-		@Override
-		protected boolean miniBatchEnd() {
-			if (nextRecordTime < startTime + granularity) {
-				return false;
-			} else {
-				startTime += granularity;
-				return true;
-			}
-		}
-
-	}
-
-	private class TimeCheck extends Thread {
-		@Override
-		public void run() {
-			while (true) {
-				try {
-					Thread.sleep(slideSize);
-				} catch (InterruptedException e) {
-				}
-				if (isRunning) {
-					window.checkWindowEnd(System.currentTimeMillis());
-				} else {
-					break;
-				}
-			}
-		}
-	}
-
-}