Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:49 UTC

[06/18] git commit: [streaming] window and batch operator added to DataStream + Documentation updated accordingly

[streaming] window and batch operator added to DataStream + Documentation updated accordingly


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5f601cf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5f601cf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5f601cf9

Branch: refs/heads/master
Commit: 5f601cf9b18fef0b54a92e42405c0179e639f5da
Parents: 47d02a0
Author: gyfora <gy...@gmail.com>
Authored: Mon Sep 8 02:05:20 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  50 ++--
 .../api/datastream/BatchedDataStream.java       | 238 +++++++++++++++++++
 .../streaming/api/datastream/DataStream.java    | 197 ++++++---------
 .../api/datastream/GroupedDataStream.java       |   8 +-
 .../api/datastream/WindowDataStream.java        |  87 +++++++
 .../GroupedWindowGroupReduceInvokable.java      |   4 +-
 .../operator/GroupedWindowReduceInvokable.java  |  19 +-
 .../operator/WindowGroupReduceInvokable.java    |  12 +-
 .../operator/WindowReduceInvokable.java         |  18 +-
 .../api/invokable/util/DefaultTimeStamp.java    |  39 +++
 .../api/invokable/util/DefaultTimestamp.java    |  34 ---
 .../streaming/api/invokable/util/TimeStamp.java |  46 ++++
 .../streaming/api/invokable/util/Timestamp.java |  38 ---
 .../WindowGroupReduceInvokableTest.java         |  19 +-
 .../operator/WindowReduceInvokableTest.java     |  18 +-
 .../ml/IncrementalLearningSkeleton.java         |   6 +-
 16 files changed, 561 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
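
For readers skimming the patch, a minimal sketch of the API surface this commit introduces is shown below. The `window`, `batch`, `groupBy`, `sum` and `max` calls are taken from the diff; the helper class, method name and the concrete stream type are illustrative assumptions only.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical helper class, only to give the calls below a compilable home.
public class WindowBatchSketch {

	public static void apply(DataStream<Tuple2<String, Integer>> in) {
		// Time-based sliding window: 60 s windows slid every 10 s, summing field 1.
		in.window(60000, 10000).sum(1);

		// Count-based sliding batch: 1000-element batches slid by 100 elements,
		// grouped by field 0 so the maximum of field 1 is computed per key.
		in.batch(1000, 100).groupBy(0).max(1);
	}
}
```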


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 04e2f2e..6ed53df 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -242,42 +242,42 @@ Merges two or more `DataStream` instances creating a new DataStream containing a
 dataStream.merge(otherStream1, otherStream2…)
 ```
 
+### Grouped operators
+
+Some transformations require that the `DataStream` is grouped on some key value. The user can create a `GroupedDataStream` by calling the `groupBy(keyPosition)` method of a non-grouped `DataStream`. The user can apply different reduce transformations on the obtained `GroupedDataStream`:
+
+#### Reduce on GroupedDataStream
+When the reduce operator is applied on a grouped data stream, the user-defined `ReduceFunction` will combine subsequent pairs of elements having the same key value. The combined results are sent to the output stream.
+
+### Aggregations
+
+The Flink streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
+
+Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`
+
+For every incoming tuple the selected field is replaced with the current aggregated value. If an aggregation is used without specifying a field position, position 0 is used as the default.
+
 ### Window/Batch operators
 
 Window and batch operators allow the user to execute functions on slices or windows of the DataStream in a sliding fashion. If the step size for the slide is not defined then the window/batch size is used as the step size by default.
 
-#### Window reduce
-The transformation calls a user-defined `GroupReduceFunction` on records received during the predefined time window. The window is shifted after each reduce call.
-A window reduce that sums the elements in the last minute with 10 seconds stepsize:
+When applied to grouped data streams, these operators are executed separately on each group of elements sharing the selected key position.
 
-```java
-dataStream.windowReduce(new GroupReduceFunction<Integer, Integer>() {
-			@Override
-			public void reduce(Iterable<Integer> values, Collector<Integer> out) throws Exception {
-				Integer sum = 0;
-				for(Integer val: values){
-					sum+=val;
-				}
-			}
-		}, 60000, 10000);
-```
+#### Reduce on windowed/batched data streams
+The transformation calls a user-defined `ReduceFunction` on the records received in the batch or during the predefined time window. The window is shifted after each reduce call. The streaming aggregations can also be applied here.
 
-#### Batch reduce
-The transformation calls a `GroupReduceFunction` for each data batch of the predefined size. The batch slides by the predefined number of elements after each call. Works similarly to window reduce.
+A window reduce that sums the elements in the last minute with a 10 second slide interval:
 
 ```java
-dataStream.batchReduce(reducer, batchSize, slideSize)
+dataStream.window(60000, 10000).sum();
 ```
 
-### Grouped operators
-
-Some transformations require that the `DataStream` is grouped on some key value. The user can create a `GroupedDataStream` by calling the `groupBy(keyPosition)` method of a non-grouped `DataStream`. The user can apply different reduce transformations on the obtained `GroupedDataStream`:
-
-#### Reduce on GroupedDataStream
-When the reduce operator is applied on a grouped data stream, the user-defined `ReduceFunction` will combine subsequent pairs of elements having the same key value. The combined results are sent to the output stream.
+#### ReduceGroup on windowed/batched data streams
+The transformation calls a `GroupReduceFunction` for each data batch or data window. The batch/window is slid by the predefined number of elements or time interval after each call.
 
-#### Window/Batchreduce on GroupedDataStream
-Similarly to the grouped reduce operator the window and batch reduce operators work the same way as in the non-grouped case except that in a data window/batch every `GroupReduceFunction` call will receive data elements for only the same keys.
+```java
+dataStream.batch(1000, 100).reduceGroup(reducer);
+```
 
 ### Co operators
 

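As a side note on the guide changes above: the `windowReduce` snippet removed from the guide can be expressed with the new API roughly as follows. This is a sketch assuming the `GroupReduceFunction` signature shown in the deleted example; note that the original snippet never emitted its sum, which the sketch fixes with `out.collect(sum)`.

```java
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class WindowSumSketch {

	// Sums the Integers received in the last minute, sliding every 10 seconds.
	public static void apply(DataStream<Integer> dataStream) {
		dataStream.window(60000, 10000).reduceGroup(
				new GroupReduceFunction<Integer, Integer>() {
					private static final long serialVersionUID = 1L;

					@Override
					public void reduce(Iterable<Integer> values, Collector<Integer> out)
							throws Exception {
						int sum = 0;
						for (Integer val : values) {
							sum += val;
						}
						// The snippet removed from the guide forgot this emit.
						out.collect(sum);
					}
				});
	}
}
```
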
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
new file mode 100755
index 0000000..0aa5de6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
+import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedBatchReduceInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.types.TypeInformation;
+
+/**
+ * A {@link BatchedDataStream} represents a data stream whose elements are
+ * batched together in a sliding batch. Operations like
+ * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
+ * are applied for each batch and the batch is slid afterwards.
+ *
+ * @param <OUT>
+ *            The output type of the {@link BatchedDataStream}
+ */
+public class BatchedDataStream<OUT> {
+
+	protected DataStream<OUT> dataStream;
+	protected boolean isGrouped;
+	protected int keyPosition;
+	protected long batchSize;
+	protected long slideSize;
+
+	protected BatchedDataStream(DataStream<OUT> dataStream, long batchSize, long slideSize) {
+		if (dataStream instanceof GroupedDataStream) {
+			this.isGrouped = true;
+			this.keyPosition = ((GroupedDataStream<OUT>) dataStream).keyPosition;
+		} else {
+			this.isGrouped = false;
+		}
+		this.dataStream = dataStream.copy();
+		this.batchSize = batchSize;
+		this.slideSize = slideSize;
+	}
+
+	protected BatchedDataStream(BatchedDataStream<OUT> batchedDataStream) {
+		this.dataStream = batchedDataStream.dataStream.copy();
+		this.isGrouped = batchedDataStream.isGrouped;
+		this.keyPosition = batchedDataStream.keyPosition;
+		this.batchSize = batchedDataStream.batchSize;
+		this.slideSize = batchedDataStream.slideSize;
+	}
+
+	/**
+	 * Groups the elements of the {@link BatchedDataStream} by the given key
+	 * position to be used with grouped operators.
+	 * 
+	 * @param keyPosition
+	 *            The position of the field on which the
+	 *            {@link BatchedDataStream} will be grouped.
+	 * @return The transformed {@link BatchedDataStream}
+	 */
+	public BatchedDataStream<OUT> groupBy(int keyPosition) {
+		return new BatchedDataStream<OUT>(dataStream.groupBy(keyPosition), batchSize, slideSize);
+	}
+
+	/**
+	 * Applies a reduce transformation on every sliding batch/window of the data
+	 * stream. If the data stream is grouped then the reducer is applied on
+	 * every group of elements sharing the same key. This type of reduce is much
+	 * faster than reduceGroup since the reduce function can be applied
+	 * incrementally. The user can also extend the {@link RichReduceFunction} to
+	 * gain access to other features provided by the {@link RichFunction}
+	 * interface.
+	 * 
+	 * @param reducer
+	 *            The {@link ReduceFunction} that will be called for every
+	 *            element of the input values in the batch/window.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
+		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
+				ReduceFunction.class, 0), getReduceInvokable(reducer));
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on preset batches/windows of the
+	 * DataStream. The transformation calls a {@link GroupReduceFunction} for
+	 * each batch/window. Each GroupReduceFunction call can return any number of
+	 * elements including none. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
+	 * 
+	 * @param reducer
+	 *            The {@link GroupReduceFunction} that will be called for every
+	 *            batch/window.
+	 * @return The transformed DataStream.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(GroupReduceFunction<OUT, R> reducer) {
+		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer, GroupReduceFunction.class,
+				1), getGroupReduceInvokable(reducer));
+	}
+
+	/**
+	 * Applies an aggregation that sums every sliding batch/window of the data
+	 * stream at the given position.
+	 * 
+	 * @param positionToSum
+	 *            The position in the data point to sum
+	 * @return The transformed DataStream.
+	 */
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
+		dataStream.checkFieldRange(positionToSum);
+		return aggregate((AggregationFunction<OUT>) SumAggregationFunction.getSumFunction(
+				positionToSum, dataStream.getClassAtPos(positionToSum)));
+	}
+
+	/**
+	 * Syntactic sugar for sum(0)
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum() {
+		return sum(0);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum of every sliding
+	 * batch/window of the data stream at the given position.
+	 * 
+	 * @param positionToMin
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
+		dataStream.checkFieldRange(positionToMin);
+		return aggregate(new MinAggregationFunction<OUT>(positionToMin));
+	}
+
+	/**
+	 * Syntactic sugar for min(0)
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min() {
+		return min(0);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum of every sliding
+	 * batch/window of the data stream at the given position.
+	 * 
+	 * @param positionToMax
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
+		dataStream.checkFieldRange(positionToMax);
+		return aggregate(new MaxAggregationFunction<OUT>(positionToMax));
+	}
+
+	/**
+	 * Syntactic sugar for max(0)
+	 * 
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max() {
+		return max(0);
+	}
+
+	/**
+	 * Gets the output type.
+	 * 
+	 * @return The output type.
+	 */
+	public TypeInformation<OUT> getOutputType() {
+		return dataStream.getOutputType();
+	}
+
+	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
+		BatchReduceInvokable<OUT> invokable = getReduceInvokable(aggregate);
+
+		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
+				aggregate, null, null, invokable);
+
+		dataStream.jobGraphBuilder.setTypeWrappersFrom(dataStream.getId(), returnStream.getId());
+		return returnStream;
+	}
+
+	protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
+		BatchReduceInvokable<OUT> invokable;
+		if (isGrouped) {
+			invokable = new GroupedBatchReduceInvokable<OUT>(reducer, batchSize, slideSize,
+					keyPosition);
+		} else {
+			invokable = new BatchReduceInvokable<OUT>(reducer, batchSize, slideSize);
+		}
+		return invokable;
+	}
+
+	protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
+			GroupReduceFunction<OUT, R> reducer) {
+		BatchGroupReduceInvokable<OUT, R> invokable;
+		if (isGrouped) {
+			invokable = new GroupedBatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
+					keyPosition);
+		} else {
+			invokable = new BatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize);
+		}
+		return invokable;
+	}
+
+	protected BatchedDataStream<OUT> copy() {
+		return new BatchedDataStream<OUT>(this);
+	}
+
+}
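
A minimal usage sketch for the class above, assuming a `Tuple2<String, Integer>` stream (the stream and the wrapper class are not part of the patch). Grouping first means the incremental `ReduceFunction` only ever combines elements that share the key in field 0.

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class BatchedReduceSketch {

	public static void apply(DataStream<Tuple2<String, Integer>> in) {
		// 100-element batches slid by 10 elements, reduced per key (field 0).
		in.batch(100, 10).groupBy(0)
				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
							Tuple2<String, Integer> b) {
						// Keep the shared key, sum the counts.
						return new Tuple2<String, Integer>(a.f0, a.f1 + b.f1);
					}
				});
	}
}
```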

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 70348d6..bebda91 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -26,12 +26,10 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -49,14 +47,12 @@ import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
 import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.partitioner.FieldsPartitioner;
@@ -385,160 +381,113 @@ public class DataStream<OUT> {
 				ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
 	}
 
-	public GroupedDataStream<OUT> groupBy(int keyPosition) {
-		return new GroupedDataStream<OUT>(this, keyPosition);
-	}
-
 	/**
-	 * Applies a reduce transformation on preset chunks of the DataStream. The
-	 * transformation calls a {@link GroupReduceFunction} for each tuple batch
-	 * of the predefined size. Each GroupReduceFunction call can return any
-	 * number of elements including none. The user can also extend
-	 * {@link RichGroupReduceFunction} to gain access to other features provided
-	 * by the {@link RichFuntion} interface.
+	 * Groups the elements of a {@link DataStream} by the given key position to
+	 * be used with grouped operators like
+	 * {@link GroupedDataStream#reduce(ReduceFunction)}
 	 * 
-	 * 
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each tuple batch.
-	 * @param batchSize
-	 *            The number of tuples grouped together in the batch.
-	 * @param <R>
-	 *            output type
-	 * @return The transformed {@link DataStream}.
+	 * @param keyPosition
+	 *            The position of the field on which the {@link DataStream} will
+	 *            be grouped.
+	 * @return The transformed {@link DataStream}
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
-			long batchSize) {
-		return batchReduce(reducer, batchSize, batchSize);
+	public GroupedDataStream<OUT> groupBy(int keyPosition) {
+		return new GroupedDataStream<OUT>(this, keyPosition);
 	}
 
 	/**
-	 * Applies a reduce transformation on preset sliding chunks of the
-	 * DataStream. The transformation calls a {@link GroupReduceFunction} for
-	 * each tuple batch of the predefined size. The tuple batch gets slid by the
-	 * given number of tuples. Each GroupReduceFunction call can return any
-	 * number of elements including none. The user can also extend
-	 * {@link RichGroupReduceFunction} to gain access to other features provided
-	 * by the {@link RichFuntion} interface.
-	 * 
+	 * Collects the data stream elements into sliding batches creating a new
+	 * {@link BatchedDataStream}. The user can apply transformations like
+	 * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
+	 * or aggregations on the {@link BatchedDataStream}.
 	 * 
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each tuple batch.
 	 * @param batchSize
-	 *            The number of tuples grouped together in the batch.
+	 *            The number of elements in each batch at each operator
 	 * @param slideSize
-	 *            The number of tuples the batch is slid by.
-	 * @param <R>
-	 *            output type
-	 * @return The transformed {@link DataStream}.
+	 *            The number of elements by which the batches are slid
+	 *            after each transformation.
+	 * @return The transformed {@link DataStream}
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
-			long batchSize, long slideSize) {
+	public BatchedDataStream<OUT> batch(long batchSize, long slideSize) {
 		if (batchSize < 1) {
 			throw new IllegalArgumentException("Batch size must be positive");
 		}
 		if (slideSize < 1) {
 			throw new IllegalArgumentException("Slide size must be positive");
 		}
-
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(reducer,
-				GroupReduceFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(reducer,
-				GroupReduceFunction.class, 1);
-
-		return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
-				new BatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize));
+		return new BatchedDataStream<OUT>(this, batchSize, slideSize);
 	}
 
 	/**
-	 * Applies a reduce transformation on preset "time" chunks of the
-	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
-	 * records received during the predefined time window. The window is shifted
-	 * after each reduce call. Each GroupReduceFunction call can return any
-	 * number of elements including none.The user can also extend
-	 * {@link RichGroupReduceFunction} to gain access to other features provided
-	 * by the {@link RichFuntion} interface.
-	 * 
+	 * Collects the data stream elements into sliding batches creating a new
+	 * {@link BatchedDataStream}. The user can apply transformations like
+	 * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
+	 * or aggregations on the {@link BatchedDataStream}.
 	 * 
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
-	 * @param windowSize
-	 *            SingleOutputStreamOperator The time window to run the reducer
-	 *            on, in milliseconds.
-	 * @param <R>
-	 *            output type
-	 * @return The transformed DataStream.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
-			long windowSize) {
-		return windowReduce(reducer, windowSize, windowSize);
-	}
-
-	/**
-	 * Applies a reduce transformation on preset "time" chunks of the
-	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
-	 * records received during the predefined time window. The window is shifted
-	 * after each reduce call. Each GroupReduceFunction call can return any
-	 * number of elements including none.The user can also extend
-	 * {@link RichGroupReduceFunction} to gain access to other features provided
-	 * by the {@link RichFuntion} interface.
-	 * 
-	 * 
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
-	 * @param windowSize
-	 *            SingleOutputStreamOperator The time window to run the reducer
-	 *            on, in milliseconds.
-	 * @param slideInterval
-	 *            The time interval, batch is slid by.
-	 * @param <R>
-	 *            output type
-	 * @return The transformed DataStream.
+	 * @param batchSize
+	 *            The number of elements in each batch at each operator
+	 * @return The transformed {@link DataStream}
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
-			long windowSize, long slideInterval) {
-		return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
+	public BatchedDataStream<OUT> batch(long batchSize) {
+		return batch(batchSize, batchSize);
 	}
 
 	/**
-	 * Applies a reduce transformation on preset "time" chunks of the
-	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
-	 * records received during the predefined time window. The window is shifted
-	 * after each reduce call. Each GroupReduceFunction call can return any
-	 * number of elements including none. The time is determined by a
-	 * user-defined timestamp. The user can also extend
-	 * {@link RichGroupReduceFunction} to gain access to other features provided
-	 * by the {@link RichFuntion} interface.
+	 * Collects the data stream elements into sliding windows creating a new
+	 * {@link WindowDataStream}. The user can apply transformations like
+	 * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
+	 * aggregations on the {@link WindowDataStream}.
 	 * 
-	 * 
-	 * @param reducer
-	 *            The GroupReduceFunction that is called for each time window.
 	 * @param windowSize
-	 *            SingleOutputStreamOperator The time window to run the reducer
-	 *            on, in milliseconds.
+	 *            The length of the window in milliseconds.
 	 * @param slideInterval
-	 *            The time interval, batch is slid by.
+	 *            The number of milliseconds by which the windows are slid
+	 *            after each transformation.
 	 * @param timestamp
-	 *            Timestamp function to retrieve a timestamp from an element.
-	 * @param <R>
-	 *            output type
-	 * @return The transformed DataStream.
+	 *            User-defined function for extracting timestamps from each
+	 *            element.
+	 * @return The transformed {@link DataStream}
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
-			long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
+	public WindowDataStream<OUT> window(long windowSize, long slideInterval,
+			TimeStamp<OUT> timestamp) {
 		if (windowSize < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
 		}
 		if (slideInterval < 1) {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
+		return new WindowDataStream<OUT>(this, windowSize, slideInterval, timestamp);
+	}
 
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(reducer,
-				GroupReduceFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(reducer,
-				GroupReduceFunction.class, 1);
+	/**
+	 * Collects the data stream elements into sliding windows creating a new
+	 * {@link WindowDataStream}. The user can apply transformations like
+	 * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
+	 * aggregations on the {@link WindowDataStream}.
+	 * 
+	 * @param windowSize
+	 *            The length of the window in milliseconds.
+	 * @param slideInterval
+	 *            The number of milliseconds by which the windows are slid
+	 *            after each transformation.
+	 * @return The transformed {@link DataStream}
+	 */
+	public WindowDataStream<OUT> window(long windowSize, long slideInterval) {
+		return window(windowSize, slideInterval, new DefaultTimeStamp<OUT>());
+	}
 
-		return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
-				new WindowGroupReduceInvokable<OUT, R>(reducer, windowSize, slideInterval, timestamp));
+	/**
+	 * Collects the data stream elements into sliding windows creating a new
+	 * {@link WindowDataStream}. The user can apply transformations like
+	 * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
+	 * aggregations on the {@link WindowDataStream}.
+	 * 
+	 * @param windowSize
+	 *            The length of the window in milliseconds.
+	 * @return The transformed {@link DataStream}
+	 */
+	public WindowDataStream<OUT> window(long windowSize) {
+		return window(windowSize, windowSize);
 	}
 
 	/**
@@ -1115,7 +1064,7 @@ public class DataStream<OUT> {
 	 * 
 	 * @return The copy
 	 */
-	protected DataStream<OUT> copy(){
+	protected DataStream<OUT> copy() {
 		return new DataStream<OUT>(this);
 	}
 

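The single-argument overloads added above simply reuse the size as the slide, giving non-overlapping (tumbling) batches and windows. A hedged sketch using only methods from this patch (the helper class and stream type are illustrative):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class OverloadSketch {

	public static void apply(DataStream<Tuple2<String, Integer>> in) {
		// Sliding: 5 s windows emitted every second, summing field 1.
		in.window(5000, 1000).sum(1);

		// Tumbling: window(5000) delegates to window(5000, 5000).
		in.window(5000).sum(1);

		// Count-based equivalent: batch(100) delegates to batch(100, 100).
		// Sizes below 1 are rejected with IllegalArgumentException.
		in.batch(100).min(1);
	}
}
```
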
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index e513f2d..138a6f8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
@@ -166,7 +166,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
 			long windowSize, long slideInterval) {
-		return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp<OUT>());
+		return windowReduce(reducer, windowSize, slideInterval, new DefaultTimeStamp<OUT>());
 	}
 
 	/**
@@ -191,7 +191,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
-			long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
+			long windowSize, long slideInterval, TimeStamp<OUT> timestamp) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
 				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
 				GroupReduceFunction.class, 1), new GroupedWindowGroupReduceInvokable<OUT, R>(reducer,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
new file mode 100755
index 0000000..4756050
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+/**
+ * A {@link WindowDataStream} represents a data stream whose elements are
+ * batched together in a sliding window. Operations like
+ * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
+ * are applied for each window and the window is slid afterwards.
+ *
+ * @param <OUT>
+ *            The output type of the {@link WindowDataStream}
+ */
+public class WindowDataStream<OUT> extends BatchedDataStream<OUT> {
+
+	TimeStamp<OUT> timeStamp;
+
+	protected WindowDataStream(DataStream<OUT> dataStream, long windowSize, long slideInterval,
+			TimeStamp<OUT> timeStamp) {
+		super(dataStream, windowSize, slideInterval);
+		this.timeStamp = timeStamp;
+	}
+
+	protected WindowDataStream(WindowDataStream<OUT> windowDataStream) {
+		super(windowDataStream);
+		this.timeStamp = windowDataStream.timeStamp;
+	}
+
+	public WindowDataStream<OUT> groupBy(int keyPosition) {
+		return new WindowDataStream<OUT>(dataStream.groupBy(keyPosition), batchSize, slideSize,
+				timeStamp);
+	}
+
+	protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
+		BatchReduceInvokable<OUT> invokable;
+		if (isGrouped) {
+			invokable = new GroupedWindowReduceInvokable<OUT>(reducer, batchSize, slideSize,
+					keyPosition, timeStamp);
+		} else {
+			invokable = new WindowReduceInvokable<OUT>(reducer, batchSize, slideSize, timeStamp);
+		}
+		return invokable;
+	}
+
+	protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
+			GroupReduceFunction<OUT, R> reducer) {
+		BatchGroupReduceInvokable<OUT, R> invokable;
+		if (isGrouped) {
+			invokable = new GroupedWindowGroupReduceInvokable<OUT, R>(reducer, batchSize,
+					slideSize, keyPosition, timeStamp);
+		} else {
+			invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
+					timeStamp);
+		}
+		return invokable;
+	}
+
+	public WindowDataStream<OUT> copy() {
+		return new WindowDataStream<OUT>(this);
+	}
+
+}
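
Because the `groupBy` override above re-wraps the grouped stream together with the `TimeStamp`, windowing and grouping compose directly. A small sketch under the same assumptions as the earlier ones:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class GroupedWindowSketch {

	public static void apply(DataStream<Tuple2<String, Integer>> in) {
		// Per-key minimum of field 1 over 10 s windows slid every 2 s; the grouped
		// path above selects GroupedWindowReduceInvokable for each key separately.
		in.window(10000, 2000).groupBy(0).min(1);
	}
}
```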

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
index 4027b78..865dced 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.MutableTableState;
 
@@ -33,7 +33,7 @@ public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduc
 	private MutableTableState<Object, List<IN>> values;
 
 	public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
-			long slideInterval, int keyPosition, Timestamp<IN> timestamp) {
+			long slideInterval, int keyPosition, TimeStamp<IN> timestamp) {
 		super(reduceFunction, windowSize, slideInterval, timestamp);
 		this.keyPosition = keyPosition;
 		this.reducer = reduceFunction;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
index e202e86..df94843 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java
@@ -18,33 +18,26 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.SlidingWindowState;
 
 public class GroupedWindowReduceInvokable<OUT> extends GroupedBatchReduceInvokable<OUT> {
 
 	private static final long serialVersionUID = 1L;
-	protected transient SlidingWindowState<Map<Object, OUT>> state;
 
-	private Timestamp<OUT> timestamp;
+	private TimeStamp<OUT> timestamp;
 	private long startTime;
 	private long nextRecordTime;
 
 	public GroupedWindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
-			long slideInterval, Timestamp<OUT> timestamp, int keyPosition) {
+			long slideInterval, int keyPosition, TimeStamp<OUT> timestamp) {
 		super(reduceFunction, windowSize, slideInterval, keyPosition);
 		this.timestamp = timestamp;
+		this.startTime = timestamp.getStartTime();
 	}
-	
-	@Override
-	protected void initializeAtFirstRecord() {
-		startTime = nextRecordTime - (nextRecordTime % granularity);
-	}
-	
+
 	@Override
 	protected StreamRecord<OUT> getNextRecord() throws IOException {
 		reuse = recordIterator.next(reuse);
@@ -53,7 +46,7 @@ public class GroupedWindowReduceInvokable<OUT> extends GroupedBatchReduceInvokab
 		}
 		return reuse;
 	}
-	
+
 	@Override
 	protected boolean batchNotFull() {
 		if (nextRecordTime < startTime + granularity) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 7b4317a..03c19d4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -20,24 +20,20 @@ package org.apache.flink.streaming.api.invokable.operator;
 import java.io.IOException;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class WindowGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private long startTime;
 	private long nextRecordTime;
-	private Timestamp<IN> timestamp;
+	private TimeStamp<IN> timestamp;
 
 	public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
-			long slideInterval, Timestamp<IN> timestamp) {
+			long slideInterval, TimeStamp<IN> timestamp) {
 		super(reduceFunction, windowSize, slideInterval);
 		this.timestamp = timestamp;
-	}
-
-	@Override
-	protected void initializeAtFirstRecord() {
-		startTime = nextRecordTime - (nextRecordTime % granularity);
+		this.startTime = timestamp.getStartTime();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 0f13397..bd51c65 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -21,25 +21,21 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
 	private static final long serialVersionUID = 1L;
 	private long startTime;
 	private long nextRecordTime;
-	private Timestamp<OUT> timestamp;
+	private TimeStamp<OUT> timestamp;
 	private String nullElement = "nullElement";
 
 	public WindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
-			long slideInterval, Timestamp<OUT> timestamp) {
+			long slideInterval, TimeStamp<OUT> timestamp) {
 		super(reduceFunction, windowSize, slideInterval);
 		this.timestamp = timestamp;
-	}
-
-	@Override
-	protected void initializeAtFirstRecord() {
-		startTime = nextRecordTime - (nextRecordTime % granularity);
+		this.startTime = timestamp.getStartTime();
 	}
 
 	protected StreamRecord<OUT> getNextRecord() throws IOException {
@@ -59,7 +55,7 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
 			return false;
 		}
 	}
-	
+
 	@Override
 	protected void collectOneUnit() throws Exception {
 		OUT reduced = null;
@@ -71,9 +67,9 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
 				resetReuse();
 			}
 		}
-		if(reduced!=null){
+		if (reduced != null) {
 			state.pushBack(reduced);
-		}else{
+		} else {
 			state.pushBack(nullElement);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
new file mode 100644
index 0000000..b6186e1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.util;
+
+/**
+ * Default timestamp function that uses the Java System.currentTimeMillis()
+ * method to retrieve a timestamp.
+ *
+ * @param <T>
+ *            Type of the inputs of the reducing function.
+ */
+public class DefaultTimeStamp<T> implements TimeStamp<T> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public long getTimestamp(T value) {
+		return System.currentTimeMillis();
+	}
+
+	@Override
+	public long getStartTime() {
+		return System.currentTimeMillis();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
deleted file mode 100644
index 8276a01..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.util;
-
-/**
- * Default timestamp function that uses the Java System.currentTimeMillis()
- * method to retrieve a timestamp.
- *
- * @param <T>
- *            Type of the inputs of the reducing function.
- */
-public class DefaultTimestamp<T> implements Timestamp<T> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public long getTimestamp(T value) {
-		return System.currentTimeMillis();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
new file mode 100644
index 0000000..27447d7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.util;
+
+import java.io.Serializable;
+
+/**
+ * Interface for getting a timestamp from a custom value. Used in window
+ * reduces. In order to work properly, the timestamps must be non-decreasing.
+ *
+ * @param <T>
+ *            Type of the value to create the timestamp from.
+ */
+public interface TimeStamp<T> extends Serializable {
+
+	/**
+	 * Extracts a timestamp from the given value.
+	 * 
+	 * @param value
+	 *            The value to create the timestamp from
+	 * @return The timestamp
+	 */
+	public long getTimestamp(T value);
+
+	/**
+	 * Returns the starting time used as the reference point for the windows.
+	 * 
+	 * @return The starting timestamp
+	 */
+	public long getStartTime();
+}
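
A sketch of a user-defined implementation of the interface above, assuming events of type `Tuple2<Long, Integer>` whose field 0 carries a non-decreasing millisecond timestamp (the class and field layout are illustrative, not part of the patch):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;

public class EventTimeSketch {

	// Reads the timestamp from field 0; the TimeStamp contract requires these
	// values to be non-decreasing.
	public static class FieldTimeStamp implements TimeStamp<Tuple2<Long, Integer>> {
		private static final long serialVersionUID = 1L;

		@Override
		public long getTimestamp(Tuple2<Long, Integer> value) {
			return value.f0;
		}

		@Override
		public long getStartTime() {
			return 0; // assumption: windows are measured from timestamp 0
		}
	}

	public static void apply(DataStream<Tuple2<Long, Integer>> in) {
		// One-minute windows slid every 10 s, driven by the element timestamps
		// instead of the wall clock used by DefaultTimeStamp.
		in.window(60000, 10000, new FieldTimeStamp()).sum(1);
	}
}
```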

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
deleted file mode 100644
index 91758e8..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.util;
-
-import java.io.Serializable;
-
-/**
- * Interface for getting a timestamp from a custom value. Used in window
- * reduces. In order to work properly, the timestamps must be non-decreasing.
- *
- * @param <T>
- *            Type of the value to create the timestamp from.
- */
-public interface Timestamp<T> extends Serializable {
-
-	/**
-	 * Values
-	 * @param value
-	 * The value to create the timestamp from
-	 * @return The timestamp
-	 */
-	public long getTimestamp(T value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
index 7437bec..097e391 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.util.MockInvokable;
 import org.apache.flink.util.Collector;
 import org.junit.Before;
@@ -45,13 +45,15 @@ public class WindowGroupReduceInvokableTest {
 		}
 	}
 
-	public static final class MyTimestamp implements Timestamp<Integer> {
+	public static final class MyTimestamp implements TimeStamp<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		private Iterator<Long> timestamps;
+		private long start;
 
 		public MyTimestamp(List<Long> timestamps) {
 			this.timestamps = timestamps.iterator();
+			this.start = timestamps.get(0);
 		}
 
 		@Override
@@ -59,6 +61,11 @@ public class WindowGroupReduceInvokableTest {
 			long ts = timestamps.next();
 			return ts;
 		}
+
+		@Override
+		public long getStartTime() {
+			return start;
+		}
 	}
 
 	private final static String EOW = "|";
@@ -81,16 +88,16 @@ public class WindowGroupReduceInvokableTest {
 		slideSize = 5;
 		timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
 		expectedResults.add(Arrays.asList("1", "2", EOW, EOW, EOW, "3", "4", "5", "6", EOW, "3",
-				"4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", "8",
-				EOW, "7", "8", "9", EOW, "9", "10", EOW));
+				"4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", EOW, "7",
+				"8", "9", EOW, "8", "9", "10", EOW));
 		invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
 				windowSize, slideSize, new MyTimestamp(timestamps)));
 
 		windowSize = 10;
 		slideSize = 4;
 		timestamps = Arrays.asList(101L, 103L, 110L, 112L, 113L, 114L, 120L, 121L, 125L, 130L);
-		expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", EOW, "3", "4", "5", "6",
-				EOW, "4", "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "7", "8", "9", EOW, "9",
+		expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", "6", EOW, "3", "4", "5", "6",
+				EOW, "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "8", "9",
 				"10", EOW));
 		invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
 				windowSize, slideSize, new MyTimestamp(timestamps)));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
index 1aed25f..ff0951d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.util.MockInvokable;
 import org.junit.Test;
 
@@ -52,13 +52,18 @@ public class WindowReduceInvokableTest {
 					public Integer reduce(Integer value1, Integer value2) throws Exception {
 						return value1 + value2;
 					}
-				}, 4, 2, new Timestamp<Integer>() {
+				}, 4, 2, new TimeStamp<Integer>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
 					public long getTimestamp(Integer value) {
 						return value;
 					}
+
+					@Override
+					public long getStartTime() {
+						return 0;
+					}
 				});
 
 		List<Integer> expected = new ArrayList<Integer>();
@@ -86,14 +91,19 @@ public class WindowReduceInvokableTest {
 							Tuple2<String, Integer> value2) throws Exception {
 						return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
 					}
-				}, 3, 2, new Timestamp<Tuple2<String, Integer>>() {
+				}, 3, 2, 0, new TimeStamp<Tuple2<String, Integer>>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
 					public long getTimestamp(Tuple2<String, Integer> value) {
 						return value.f1;
 					}
-				}, 0);
+
+					@Override
+					public long getStartTime() {
+						return 1;
+					}
+				});
 
 		List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
 		expected2.add(new Tuple2<String, Integer>("a", 6));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index d80b937..a433fd0 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -119,12 +119,12 @@ public class IncrementalLearningSkeleton {
 
 	public static void main(String[] args) {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM).setBufferTimeout(1000);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
+				PARALLELISM).setBufferTimeout(1000);
 
 		// Build new model on every second of new data
 		DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
-				.windowReduce(new PartialModelBuilder(), 5000);
+				.window(5000).reduceGroup(new PartialModelBuilder());
 
 		// Use partial model for prediction
 		DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)