Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:04 UTC

[08/12] flink git commit: [FLINK-2753] [streaming] [api breaking] Add first parts of new window API for key grouped windows

[FLINK-2753] [streaming] [api breaking] Add first parts of new window API for key grouped windows

This follows the API design outlined in https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

This is API breaking because it adds new generic type parameters to Java and Scala classes, breaking binary compatibility.
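
For orientation, here is a minimal usage sketch of the new API added by this commit. It is not part of the commit itself; the input data, class name and stream names are illustrative, and it only uses signatures introduced below (DataStream#keyBy, KeyedDataStream#window, Time#of, KeyedWindowDataStream#reduceWindow):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windowpolicy.Time;

public class WindowApiSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// illustrative (word, count) input
		DataStream<Tuple2<String, Long>> input = env.fromElements(
				new Tuple2<>("a", 1L), new Tuple2<>("b", 2L), new Tuple2<>("a", 3L));

		input
			.keyBy(0)                               // key by the first tuple field
			.window(Time.of(5, TimeUnit.SECONDS))   // tumbling 5-second windows
			.reduceWindow(new ReduceFunction<Tuple2<String, Long>>() {
				@Override
				public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
					// sum counts per key and window
					return new Tuple2<>(a.f0, a.f1 + b.f1);
				}
			})
			.print();

		env.execute("Window API sketch");
	}
}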


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e20299c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e20299c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e20299c

Branch: refs/heads/master
Commit: 7e20299c4e2d9cc78c36f90bdf0acdbaf72062b0
Parents: 501a9b0
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 23 12:05:54 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 28 17:04:16 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/TimeCharacteristic.java |  81 +++++++++++
 .../api/datastream/ConnectedDataStream.java     |   4 +-
 .../streaming/api/datastream/DataStream.java    |  26 ++--
 .../api/datastream/GroupedDataStream.java       |   8 +-
 .../api/datastream/KeyedDataStream.java         |  51 ++++++-
 .../api/datastream/KeyedWindowDataStream.java   | 135 +++++++++++++++++++
 .../api/datastream/WindowedDataStream.java      |   4 +-
 .../environment/StreamExecutionEnvironment.java |  47 ++++++-
 .../functions/windows/KeyedWindowFunction.java  |   6 +-
 .../windowpolicy/AbstractTimePolicy.java        | 109 +++++++++++++++
 .../api/windowing/windowpolicy/EventTime.java   |  64 +++++++++
 .../windowing/windowpolicy/ProcessingTime.java  |  65 +++++++++
 .../api/windowing/windowpolicy/Time.java        |  68 ++++++++++
 .../windowing/windowpolicy/WindowPolicy.java    |  57 ++++++++
 .../windows/AccumulatingKeyedTimePanes.java     |   8 +-
 ...ccumulatingProcessingTimeWindowOperator.java |   4 +-
 .../operators/windows/PolicyToOperator.java     |  82 +++++++++++
 .../streaming/util/keys/KeySelectorUtil.java    |  17 ++-
 .../api/state/StatefulOperatorTest.java         |   8 +-
 .../GroupedProcessingTimeWindowExample.java     |  79 +++--------
 .../flink/streaming/api/scala/DataStream.scala  |  17 +--
 .../streaming/api/scala/GroupedDataStream.scala |   3 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  25 +++-
 .../flink/streaming/api/scala/package.scala     |   4 +-
 24 files changed, 848 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
new file mode 100644
index 0000000..1ad3c99
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+/**
+ * The time characteristic defines how the system determines time for time-dependent
+ * order and operations that depend on time (such as time windows).
+ */
+public enum TimeCharacteristic {
+
+	/**
+	 * Processing time for operators means that the operator uses the system clock of the machine
+	 * to determine the current time of the data stream. Processing-time windows trigger based
+	 * on wall-clock time and include whatever elements happen to have arrived at the operator at
+	 * that point in time.
+	 * <p>
+	 * Using processing time for window operations generally results in quite non-deterministic results,
+	 * because the contents of the windows depend on the speed at which elements arrive. It is, however,
+	 * the cheapest method of forming windows and the method that introduces the least latency.
+	 */
+	ProcessingTime,
+
+	/**
+	 * Ingestion time means that the time of each individual element in the stream is determined
+	 * when the element enters the Flink streaming data flow. Operations like windows group the
+	 * elements based on that time, meaning that processing speed within the streaming dataflow
+	 * does not affect windowing, but only the speed at which sources receive elements.
+	 * <p>
+	 * Ingestion time is often a good compromise between processing time and event time.
+	 * It does not need any special manual form of watermark generation, and events are typically
+	 * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
+	 * only be introduced by streaming shuffles or split/join/union operations. The fact that elements
+	 * are not very much out-of-order means that the latency increase is moderate, compared to event
+	 * time.
+	 */
+	IngestionTime,
+
+	/**
+	 * Event time means that the time of each individual element in the stream (also called event)
+	 * is determined by the event's individual custom timestamp. These timestamps either exist in the
+	 * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
+	 * The big implication of this is that elements arrive in the sources and in all operators generally
+	 * out of order, meaning that elements with earlier timestamps may arrive after elements with
+	 * later timestamps.
+	 * <p>
+	 * Operators that window or order data with respect to event time must buffer data until they can
+	 * be sure that all timestamps for a certain time interval have been received. This is handled by
+	 * the so-called "time watermarks".
+	 * <p>
+	 * Operations based on event time are very predictable - the result of windowing operations
+	 * is typically identical no matter when the window is executed and how fast the streams operate.
+	 * At the same time, the buffering and tracking of event time is also costlier than operating
+	 * with processing time, and typically also introduces more latency. The amount of extra
+	 * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
+	 * between the arrival of early and late elements is. With respect to the "time watermarks", this
+	 * means that the cost typically depends on how early or late the watermarks can be generated
+	 * for their timestamps.
+	 * <p>
+	 * In relation to {@link #IngestionTime}, the event time is similar, but refers to the event's
+	 * original time, rather than the time assigned at the data source. Practically, that means that
+	 * event time has generally more meaning, but also that it takes longer to determine that all
+	 * elements for a certain time have arrived.
+	 */
+	EventTime
+}
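
A hedged sketch of selecting a time characteristic for a job, using the setStreamTimeCharacteristic/getStreamTimeCharacteristic methods added to StreamExecutionEnvironment later in this commit (if nothing is set, the default is ProcessingTime):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// evaluate time windows against event timestamps rather than the wall clock
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

assert env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime;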

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 8609a30..0406e35 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -67,8 +67,8 @@ public class ConnectedDataStream<IN1, IN2> {
 
 		if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
 			this.isGrouped = true;
-			this.keySelector1 = ((GroupedDataStream<IN1>) input1).keySelector;
-			this.keySelector2 = ((GroupedDataStream<IN2>) input2).keySelector;
+			this.keySelector1 = ((GroupedDataStream<IN1, ?>) input1).keySelector;
+			this.keySelector2 = ((GroupedDataStream<IN2, ?>) input2).keySelector;
 		} else {
 			this.isGrouped = false;
 			this.keySelector1 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d92498c..5dfb1e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -229,8 +229,8 @@ public class DataStream<T> {
 	 *            The KeySelector to be used for extracting the key for partitioning
 	 * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
 	 */
-	public KeyedDataStream<T> keyBy(KeySelector<T,?> key){
-		return new KeyedDataStream<T>(this, clean(key));
+	public <K> KeyedDataStream<T, K> keyBy(KeySelector<T, K> key){
+		return new KeyedDataStream<T, K>(this, clean(key));
 	}
 
 	/**
@@ -241,7 +241,7 @@ public class DataStream<T> {
 	 *            will be grouped.
 	 * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
 	 */
-	public KeyedDataStream<T> keyBy(int... fields) {
+	public KeyedDataStream<T, Tuple> keyBy(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
 			return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
 		} else {
@@ -260,12 +260,12 @@ public class DataStream<T> {
 	 *            partitioned.
 	 * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
 	 **/
-	public KeyedDataStream<T> keyBy(String... fields) {
+	public KeyedDataStream<T, Tuple> keyBy(String... fields) {
 		return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
 	}
 
-	private KeyedDataStream<T> keyBy(Keys<T> keys) {
-		return new KeyedDataStream<T>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
+	private KeyedDataStream<T, Tuple> keyBy(Keys<T> keys) {
+		return new KeyedDataStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
 				getType(), getExecutionConfig())));
 	}
 	
@@ -279,7 +279,7 @@ public class DataStream<T> {
 	 *            will be partitioned.
 	 * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
 	 */
-	public GroupedDataStream<T> groupBy(int... fields) {
+	public GroupedDataStream<T, Tuple> groupBy(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
 			return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
 		} else {
@@ -304,7 +304,7 @@ public class DataStream<T> {
 	 *            grouped.
 	 * @return The grouped {@link DataStream}
 	 **/
-	public GroupedDataStream<T> groupBy(String... fields) {
+	public GroupedDataStream<T, Tuple> groupBy(String... fields) {
 		return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
 	}
 
@@ -322,13 +322,13 @@ public class DataStream<T> {
 	 *            the values
 	 * @return The grouped {@link DataStream}
 	 */
-	public GroupedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
-		return new GroupedDataStream<T>(this, clean(keySelector));
+	public <K> GroupedDataStream<T, K> groupBy(KeySelector<T, K> keySelector) {
+		return new GroupedDataStream<T, K>(this, clean(keySelector));
 	}
 
-	private GroupedDataStream<T> groupBy(Keys<T> keys) {
-		return new GroupedDataStream<T>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
-				getType(), getExecutionConfig())));
+	private GroupedDataStream<T, Tuple> groupBy(Keys<T> keys) {
+		return new GroupedDataStream<T, Tuple>(this, 
+				clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index a1106bc..50bf341 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
  * @param <OUT>
  *            The output type of the {@link GroupedDataStream}.
  */
-public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
+public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 
 	/**
 	 * Creates a new {@link GroupedDataStream}, group inclusion is determined using
@@ -48,7 +48,7 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
 	 * @param dataStream Base stream of data
 	 * @param keySelector Function for determining group inclusion
 	 */
-	public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, ?> keySelector) {
+	public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySelector) {
 		super(dataStream, keySelector);
 	}
 
@@ -324,8 +324,6 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
 
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 		StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate), keySelector);
-		SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation",
-				getType(), operator);
-		return returnStream;
+		return transform("Grouped Aggregation", getType(), operator);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index 100e5de..a32cf53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
@@ -32,11 +33,12 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * are also possible on a KeyedDataStream, with the exception of partitioning methods such as shuffle, forward and groupBy.
  * 
  * 
- * @param <T> The type of the elements in the Keyed Stream
+ * @param <T> The type of the elements in the Keyed Stream.
+ * @param <K> The type of the key in the Keyed Stream.
  */
-public class KeyedDataStream<T> extends DataStream<T> {
+public class KeyedDataStream<T, K> extends DataStream<T> {
 	
-	protected final KeySelector<T, ?> keySelector;
+	protected final KeySelector<T, K> keySelector;
 
 	/**
 	 * Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
@@ -47,35 +49,70 @@ public class KeyedDataStream<T> extends DataStream<T> {
 	 * @param keySelector
 	 *            Function for determining state partitions
 	 */
-	public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, ?> keySelector) {
+	public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) {
 		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
 		this.keySelector = keySelector;
 	}
 
-	public KeySelector<T, ?> getKeySelector() {
+	
+	public KeySelector<T, K> getKeySelector() {
 		return this.keySelector;
 	}
 
+	
 	@Override
 	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
 		throw new UnsupportedOperationException("Cannot override partitioning for KeyedDataStream.");
 	}
 
+	
 	@Override
 	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
 			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
 
 		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
 
-		((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(
-				keySelector);
+		((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
 		return returnStream;
 	}
 
+	
+	
 	@Override
 	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
 		DataStreamSink<T> result = super.addSink(sinkFunction);
 		result.getTransformation().setStateKeySelector(keySelector);
 		return result;
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Windowing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
+	 * grouped stream. The window is defined by a single policy.
+	 * <p>
+	 * For time windows, this single-policy variant results in tumbling time windows.
+	 *     
+	 * @param policy The policy that defines the window.
+	 * @return The windowed data stream.
+	 */
+	public KeyedWindowDataStream<T, K> window(WindowPolicy policy) {
+		return new KeyedWindowDataStream<T, K>(this, policy);
+	}
+
+	/**
+	 * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
+	 * grouped stream. The window is defined by a window policy, plus a slide policy.
+	 * <p>
+	 * For time windows, the additional slide policy results in sliding time windows.
+	 * 
+	 * @param window The policy that defines the window.
+	 * @param slide The additional policy defining the slide of the window. 
+	 * @return The windowed data stream.
+	 */
+	public KeyedWindowDataStream<T, K> window(WindowPolicy window, WindowPolicy slide) {
+		return new KeyedWindowDataStream<T, K>(this, window, slide);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
new file mode 100644
index 0000000..2ec175a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator;
+
+/**
+ * A KeyedWindowDataStream represents a data stream where elements are grouped by key, and 
+ * for each key, the stream of elements is split into windows. The windows are conceptually
+ * evaluated for each key individually, meaning windows and trigger at different points
+ * for each key.
+ * <p>
+ * In many cases, however, the windows are "aligned", meaning they trigger at the
+ * same time for all keys. The most common example for that are the regular time windows.
+ * <p>
+ * Note that the KeyedWindowDataStream is purely an API construct; during runtime the
+ * KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation
+ * over the window into one single operation.
+ * 
+ * @param <Type> The type of elements in the stream.
+ * @param <Key> The type of the key by which elements are grouped.
+ */
+public class KeyedWindowDataStream<Type, Key> {
+
+	/** The keyed data stream that is windowed by this stream */
+	private final KeyedDataStream<Type, Key> input;
+
+	/** The core window policy */
+	private final WindowPolicy windowPolicy;
+
+	/** The optional additional slide policy */
+	private final WindowPolicy slidePolicy;
+	
+	
+	public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, WindowPolicy windowPolicy) {
+		this(input, windowPolicy, null);
+	}
+
+	public KeyedWindowDataStream(KeyedDataStream<Type, Key> input,
+								WindowPolicy windowPolicy, WindowPolicy slidePolicy) 
+	{
+		TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+		this.input = input;
+		this.windowPolicy = windowPolicy.makeSpecificBasedOnTimeCharacteristic(time);
+		this.slidePolicy = slidePolicy == null ? null : slidePolicy.makeSpecificBasedOnTimeCharacteristic(time);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Operations on the keyed windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the reduce function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * This window will try to pre-aggregate data as much as the window policies permit. For example,
+	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+	 * so a few elements are stored per key (one per slide interval).
+	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+	 * aggregation tree.
+	 * 
+	 * @param function The reduce function.
+	 * @return The data stream that is the result of applying the reduce function to the window. 
+	 */
+	public DataStream<Type> reduceWindow(ReduceFunction<Type> function) {
+		String callLocation = Utils.getCallLocationName();
+		return createWindowOperator(function, input.getType(), "Reduce at " + callLocation);
+	}
+
+	/**
+	 * Applies a window function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 * 
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, Result, Key> function) {
+		String callLocation = Utils.getCallLocationName();
+
+		TypeInformation<Type> inType = input.getType();
+		TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, KeyedWindowFunction.class, true, true, inType, null, false);
+
+		return createWindowOperator(function, resultType, "KeyedWindowFunction at " + callLocation);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	private <Result> DataStream<Result> createWindowOperator(
+			Function function, TypeInformation<Result> resultType, String functionName) {
+
+		String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
+		KeySelector<Type, Key> keySel = input.getKeySelector();
+		
+		OneInputStreamOperator<Type, Result> operator =
+				PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
+		
+		return input.transform(opName, resultType, operator);
+	}
+}
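
To illustrate the pre-aggregation behavior described in the reduceWindow Javadoc, a hedged fragment (the 'keyed' stream of Long values is illustrative; a mapWindow example is omitted here since the KeyedWindowFunction interface body is not part of this excerpt):

ReduceFunction<Long> sum = new ReduceFunction<Long>() {
	@Override
	public Long reduce(Long a, Long b) {
		return a + b;
	}
};

// tumbling windows: fully pre-aggregated, one running value per key
DataStream<Long> tumblingSums =
		keyed.window(Time.of(5, TimeUnit.SECONDS)).reduceWindow(sum);

// sliding windows: pre-aggregated per slide interval, a few values per key
DataStream<Long> slidingSums =
		keyed.window(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)).reduceWindow(sum);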

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index bf3a11a..1226adf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -103,7 +103,7 @@ public class WindowedDataStream<OUT> {
 		this.triggerHelper = policyHelper;
 
 		if (dataStream instanceof GroupedDataStream) {
-			this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
+			this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
 		}
 	}
 
@@ -115,7 +115,7 @@ public class WindowedDataStream<OUT> {
 		this.userEvicter = evicter;
 
 		if (dataStream instanceof GroupedDataStream) {
-			this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
+			this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index d91afc9..a22a519 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.environment;
 
 import com.esotericsoftware.kryo.Serializer;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -49,6 +49,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
@@ -72,10 +73,12 @@ import org.apache.flink.util.SplittableIterator;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -83,25 +86,33 @@ import java.util.List;
  */
 public abstract class StreamExecutionEnvironment {
 
-	public final static String DEFAULT_JOB_NAME = "Flink Streaming Job";
+	public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
 
 	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-
+	
+	/** The time characteristic that is used if none other is set */
+	private static TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
+	
+	// ------------------------------------------------------------------------
+	
 	private long bufferTimeout = 100;
 
-	private ExecutionConfig config = new ExecutionConfig();
+	private final ExecutionConfig config = new ExecutionConfig();
 
-	protected List<StreamTransformation<?>> transformations = Lists.newArrayList();
+	protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
 
 	protected boolean isChainingEnabled = true;
 
 	protected long checkpointInterval = -1; // disabled
 
-	protected CheckpointingMode checkpointingMode = null;
+	protected CheckpointingMode checkpointingMode;
 
 	protected boolean forceCheckpointing = false;
 
 	protected StateHandleProvider<?> stateHandleProvider;
+	
+	/** The time characteristic used by the data streams */
+	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
 	/** The environment of the context (local by default, cluster if invoked through command line) */
 	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
@@ -516,6 +527,30 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	// --------------------------------------------------------------------------------------------
+	//  Time characteristic
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the time characteristic for the stream, e.g., processing time, event time,
+	 * or ingestion time.
+	 * 
+	 * @param characteristic The time characteristic.
+	 */
+	public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
+		this.timeCharacteristic = Objects.requireNonNull(characteristic);
+	}
+
+	/**
+	 * Gets the time characteristic for the stream, e.g., processing time, event time,
+	 * or ingestion time.
+	 * 
+	 * @return The time characteristic.
+	 */
+	public TimeCharacteristic getStreamTimeCharacteristic() {
+		return timeCharacteristic;
+	}
+	
+	// --------------------------------------------------------------------------------------------
 	// Data stream creations
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
index d7ca0a1..b4e55e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
@@ -25,12 +25,12 @@ import java.io.Serializable;
 
 /**
  * Base interface for functions that are evaluated over keyed (grouped) windows.
- * 
- * @param <KEY> The type of the key.
+ *
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
  */
-public interface KeyedWindowFunction<KEY, IN, OUT> extends Function, Serializable {
+public interface KeyedWindowFunction<IN, OUT, KEY> extends Function, Serializable {
 
 	/**
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
new file mode 100644
index 0000000..9dc0dd0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class AbstractTimePolicy extends WindowPolicy {
+
+	private static final long serialVersionUID = 6593098375698927728L;
+	
+	/** the time unit for this policy's time interval */
+	private final TimeUnit unit;
+	
+	/** the length of this policy's time interval */
+	private final long num;
+
+
+	protected AbstractTimePolicy(long num, TimeUnit unit) {
+		this.unit = checkNotNull(unit, "time unit may not be null");
+		this.num = num;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the time unit for this policy's time interval.
+	 * @return The time unit for this policy's time interval.
+	 */
+	public TimeUnit getUnit() {
+		return unit;
+	}
+
+	/**
+	 * Gets the length of this policy's time interval.
+	 * @return The length of this policy's time interval.
+	 */
+	public long getNum() {
+		return num;
+	}
+
+	/**
+	 * Converts the time interval to milliseconds.
+	 * @return The time interval in milliseconds.
+	 */
+	public long toMilliseconds() {
+		return unit.toMillis(num);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString(WindowPolicy slidePolicy) {
+		if (slidePolicy == null) {
+			return "Tumbling Window (" + getClass().getSimpleName() + ") (" + num + ' ' + unit.name() + ')';
+		}
+		else if (slidePolicy.getClass() == getClass()) {
+			AbstractTimePolicy timeSlide = (AbstractTimePolicy) slidePolicy;
+			
+			return "Sliding Window (" + getClass().getSimpleName() + ") (length="
+					+ num + ' ' + unit.name() + ", slide=" + timeSlide.num + ' ' + timeSlide.unit.name() + ')';
+		}
+		else {
+			return super.toString(slidePolicy);
+		}
+	}
+	
+	@Override
+	public int hashCode() {
+		return 31 * (int) (num ^ (num >>> 32)) + unit.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj.getClass() == getClass()) {
+			AbstractTimePolicy that = (AbstractTimePolicy) obj;
+			return this.num == that.num && this.unit.equals(that.unit);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName() + " (" + num + ' ' + unit.name() + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
new file mode 100644
index 0000000..8a671fc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of an event time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
+ * of event time.
+ */
+public final class EventTime extends AbstractTimePolicy {
+
+	private static final long serialVersionUID = 8333566691833596747L;
+
+	/** Instantiation only via factory method */
+	private EventTime(long num, TimeUnit unit) {
+		super(num, unit);
+	}
+
+	@Override
+	public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+		if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
+			return this;
+		}
+		else {
+			throw new InvalidProgramException(
+					"Cannot use EventTime policy in a dataflow that runs on " + characteristic);
+		}
+	}
+	// ------------------------------------------------------------------------
+	//  Factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an event time policy describing an event time interval.
+	 *
+	 * @param num The length of the time interval.
+	 * @param unit The unit (seconds, milliseconds) of the time interval.
+	 * @return The event time policy.
+	 */
+	public static EventTime of(long num, TimeUnit unit) {
+		return new EventTime(num, unit);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
new file mode 100644
index 0000000..2ff13fa
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a processing time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
+ * of processing time.
+ */
+public final class ProcessingTime extends AbstractTimePolicy {
+
+	private static final long serialVersionUID = 7546166721132583007L;
+
+	/** Instantiation only via factory method */
+	private ProcessingTime(long num, TimeUnit unit) {
+		super(num, unit);
+	}
+
+	@Override
+	public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+		if (characteristic == TimeCharacteristic.ProcessingTime) {
+			return this;
+		}
+		else {
+			throw new InvalidProgramException(
+					"Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a processing time policy describing a processing time interval.
+	 * 
+	 * @param num The length of the time interval.
+	 * @param unit The unit (seconds, milliseconds) of the time interval.
+	 * @return The processing time policy.
+	 */
+	public static ProcessingTime of(long num, TimeUnit unit) {
+		return new ProcessingTime(num, unit);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
new file mode 100644
index 0000000..0233e96
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a time interval for windowing. The time characteristic referred
+ * to is the default time characteristic set on the execution environment.
+ */
+public final class Time extends AbstractTimePolicy {
+
+	private static final long serialVersionUID = 3197290738634320211L;
+
+	/** Instantiation only via factory method */
+	private Time(long num, TimeUnit unit) {
+		super(num, unit);
+	}
+
+	@Override
+	public AbstractTimePolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+		switch (timeCharacteristic) {
+			case ProcessingTime:
+				return ProcessingTime.of(getNum(), getUnit());
+			case IngestionTime:
+			case EventTime:
+				return EventTime.of(getNum(), getUnit());
+			default:
+				throw new IllegalArgumentException("Unknown time characteristic");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a time policy describing a time interval. The policy refers to the
+	 * time characteristic that is set on the dataflow via
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
+	 * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
+	 *
+	 * @param num The length of the time interval.
+	 * @param unit The unit (seconds, milliseconds) of the time interval.
+	 * @return The time policy.
+	 */
+	public static Time of(long num, TimeUnit unit) {
+		return new Time(num, unit);
+	}
+}
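
A hedged fragment contrasting the generic Time policy with the explicit ProcessingTime and EventTime policies (values illustrative). In normal use this conversion happens automatically: the KeyedWindowDataStream constructor calls makeSpecificBasedOnTimeCharacteristic with the environment's characteristic.

// Time resolves to a ProcessingTime or EventTime policy based on the characteristic
WindowPolicy resolved = Time.of(30, TimeUnit.SECONDS)
		.makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic.ProcessingTime);   // -> ProcessingTime policy

// the explicit policies require a matching characteristic and throw otherwise
WindowPolicy procTime  = ProcessingTime.of(30, TimeUnit.SECONDS);
WindowPolicy eventTime = EventTime.of(30, TimeUnit.SECONDS);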

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
new file mode 100644
index 0000000..a82f892
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+/**
+ * The base class of all window policies. Window policies define how windows
+ * are formed over the data stream.
+ */
+public abstract class WindowPolicy implements java.io.Serializable {
+
+	private static final long serialVersionUID = -8696529489282723113L;
+	
+	/**
+	 * If this window policy's concrete instantiation depends on the time characteristic of the
+	 * dataflow (processing time, event time), then this method must be overridden to convert this
+	 * policy to the respective specific instantiation.
+	 * <p>
+	 * The {@link Time} policy, for example, will convert itself to a {@link ProcessingTime} policy,
+	 * if the time characteristic is set to {@link TimeCharacteristic#ProcessingTime}.
+	 * <p>
+	 * By default, this method does nothing and simply returns this object itself.
+	 * 
+	 * @param characteristic The time characteristic of the dataflow.
+	 * @return The specific instantiation of this policy, or the policy itself. 
+	 */
+	public WindowPolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+		return this;
+	}
+	
+	
+	public String toString(WindowPolicy slidePolicy) {
+		if (slidePolicy != null) {
+			return "Window [" + toString() + ", slide=" + slidePolicy + ']';
+		}
+		else {
+			return "Window [" + toString() + ']';
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
index e776106..1212123 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
@@ -32,13 +32,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
 
-	private final KeyedWindowFunction<Key, Type, Result> function;
+	private final KeyedWindowFunction<Type, Result, Key> function;
 	
 	private long evaluationPass;
 
 	// ------------------------------------------------------------------------
 	
-	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Key, Type, Result> function) {
+	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) {
 		this.keySelector = keySelector;
 		this.function = function;
 	}
@@ -75,7 +75,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	
 	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-		private final KeyedWindowFunction<Key, Type, Result> function;
+		private final KeyedWindowFunction<Type, Result, Key> function;
 		
 		private final UnionIterator<Type> unionIterator;
 		
@@ -83,7 +83,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		
 		private Key currentKey;
 
-		WindowFunctionTraversal(KeyedWindowFunction<Key, Type, Result> function, Collector<Result> out) {
+		WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) {
 			this.function = function;
 			this.out = out;
 			this.unionIterator = new UnionIterator<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
index 16444c2..fb9d163 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
@@ -30,7 +30,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 
 	
 	public AccumulatingProcessingTimeWindowOperator(
-			KeyedWindowFunction<KEY, IN, OUT> function,
+			KeyedWindowFunction<IN, OUT, KEY> function,
 			KeySelector<IN, KEY> keySelector,
 			long windowLength,
 			long windowSlide)
@@ -41,7 +41,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 	@Override
 	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
 		@SuppressWarnings("unchecked")
-		KeyedWindowFunction<KEY, IN, OUT> windowFunction = (KeyedWindowFunction<KEY, IN, OUT>) function;
+		KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function;
 		
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
new file mode 100644
index 0000000..9d06ef5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+
+/**
+ * This class implements the conversion from window policies to concrete operator
+ * implementations.
+ */
+public class PolicyToOperator {
+
+	/**
+	 * Entry point to create an operator for the given window policies and the window function.
+	 */
+	public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
+			WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
+	{
+		if (window == null || function == null) {
+			throw new NullPointerException();
+		}
+		
+		// -- case 1: both policies are processing time policies
+		if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
+			final long windowLength = ((ProcessingTime) window).toMilliseconds();
+			final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
+			
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
+
+				@SuppressWarnings("unchecked")
+				OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
+						new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+								reducer, keySelector, windowLength, windowSlide);
+				return op;
+			}
+			else if (function instanceof KeyedWindowFunction) {
+				@SuppressWarnings("unchecked")
+				KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function;
+
+				return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+								wf, keySelector, windowLength, windowSlide);
+			}
+		}
+
+		// -- case 2: both policies are event time policies
+		if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
+			// add event time implementation
+		}
+		
+		throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	/** Don't instantiate */
+	private PolicyToOperator() {}
+}

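For orientation (a sketch, not part of this diff): which branch of the conversion fires
follows directly from the function type handed in. Using the stream, SummingReducer,
FirstFieldKeyExtractor and SummingWindowFunction from GroupedProcessingTimeWindowExample
further below, and assuming the generic Time policy used there is resolved to a
processing-time policy before it reaches this conversion (the commit adds Time,
ProcessingTime and EventTime as separate policies):

    // A ReduceFunction is routed to the pre-aggregating operator
    // (AggregatingProcessingTimeWindowOperator).
    stream.keyBy(0)
          .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
          .reduceWindow(new SummingReducer());

    // A KeyedWindowFunction is routed to the buffering operator
    // (AccumulatingProcessingTimeWindowOperator).
    stream.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
          .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
          .mapWindow(new SummingWindowFunction());
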
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 2e0fe66..f758147 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 public class KeySelectorUtil {
 
-	public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
+	public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
 		if (!(typeInfo instanceof CompositeType)) {
 			throw new InvalidTypesException(
 					"This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
@@ -93,9 +93,15 @@ public class KeySelectorUtil {
 			comparator.extractKeys(value, keyArray, 0);
 			return (K) keyArray[0];
 		}
-
 	}
 
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * A key selector for selecting key fields via a TypeComparator.
+	 *
+	 * @param <IN> The type from which the key is extracted.
+	 */
 	public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
 
 		private static final long serialVersionUID = 1L;
@@ -126,6 +132,13 @@ public class KeySelectorUtil {
 
 	}
 
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * A key selector that selects individual array fields as keys and returns them as a Tuple.
+	 * 
+	 * @param <IN> The type from which the key is extracted, i.e., the array type.
+	 */
 	public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> {
 
 		private static final long serialVersionUID = 1L;

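With the sharpened return type, call sites obtain a Tuple-keyed selector without a cast.
A small sketch (assumptions: Keys.ExpressionKeys offers a constructor taking field
positions plus type information, and a fresh ExecutionConfig stands in for the real one;
variable names are made up):

    TypeInformation<Tuple2<Long, Long>> typeInfo =
            TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>");

    // Key on field 0; the selector is now typed to Tuple instead of '?'.
    Keys<Tuple2<Long, Long>> keys = new Keys.ExpressionKeys<>(new int[] { 0 }, typeInfo);
    KeySelector<Tuple2<Long, Long>, Tuple> selector =
            KeySelectorUtil.getSelectorForKeys(keys, typeInfo, new ExecutionConfig());
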
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index b8b4c13..207b1b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -113,7 +113,9 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(3);
 
-		KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
+		KeyedDataStream<Integer, Integer> keyedStream = env
+				.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
+				.keyBy(new ModKey(4));
 
 		keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() {
 			private static final long serialVersionUID = 1L;
@@ -163,7 +165,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 
 	@SuppressWarnings("unchecked")
 	private StreamMap<Integer, String> createOperatorWithContext(List<String> output,
-			KeySelector<Integer, Serializable> partitioner, byte[] serializedState) throws Exception {
+			KeySelector<Integer, ? extends Serializable> partitioner, byte[] serializedState) throws Exception {
 		final List<String> outputList = output;
 
 		StreamingRuntimeContext context = new StreamingRuntimeContext(
@@ -355,7 +357,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 
 	}
 
-	public static class ModKey implements KeySelector<Integer, Serializable> {
+	public static class ModKey implements KeySelector<Integer, Integer> {
 
 		private static final long serialVersionUID = 4193026742083046736L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 7387a1e..e52c2cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -22,15 +22,16 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-import org.apache.flink.streaming.runtime.operators.windows.AggregatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Time;
 import org.apache.flink.util.Collector;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 @SuppressWarnings("serial")
 public class GroupedProcessingTimeWindowExample {
 	
@@ -75,31 +76,20 @@ public class GroupedProcessingTimeWindowExample {
 				});
 		
 		stream
-				.groupBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
-//				.window(Time.of(2500, TimeUnit.MILLISECONDS)).every(Time.of(500, TimeUnit.MILLISECONDS))
-//				.reduceWindow(new SummingReducer())
-//				.flatten()
-//		.partitionByHash(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
-//		.transform(
-//				"Aligned time window",
-//				TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"),
-//				new AccumulatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>, Tuple2<Long, Long>>(
-//						new SummingWindowFunction<Long>(),
-//						new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
-//						2500, 500))
-			.transform(
-				"Aligned time window",
-				TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"),
-				new AggregatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>>(
-						new SummingReducer(),
-						new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
-						2500, 500))
+			.keyBy(0)
+			.window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+			.reduceWindow(new SummingReducer())
+
+			// alternative: use a mapWindow function which does not pre-aggregate
+//			.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
+//			.window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+//			.mapWindow(new SummingWindowFunction())
 				
 			.addSink(new SinkFunction<Tuple2<Long, Long>>() {
-					@Override
-					public void invoke(Tuple2<Long, Long> value) {
-			}
-		});
+				@Override
+				public void invoke(Tuple2<Long, Long> value) {
+				}
+			});
 		
 		env.execute();
 	}
@@ -113,47 +103,16 @@ public class GroupedProcessingTimeWindowExample {
 		}
 	}
 
-	public static class IdentityKeyExtractor<T> implements KeySelector<T, T> {
-
-		@Override
-		public T getKey(T value) {
-			return value;
-		}
-	}
-
-	public static class IdentityWindowFunction<K, T> implements KeyedWindowFunction<K, T, T> {
-
-		@Override
-		public void evaluate(K k, Iterable<T> values, Collector<T> out) throws Exception {
-			for (T v : values) {
-				out.collect(v);
-			}
-		}
-	}
-	
-	public static class CountingWindowFunction<K, T> implements KeyedWindowFunction<K, T, Long> {
-		
-		@Override
-		public void evaluate(K k, Iterable<T> values, Collector<Long> out) throws Exception {
-			long count = 0;
-			for (T ignored : values) {
-				count++;
-			}
-
-			out.collect(count);
-		}
-	}
-
-	public static class SummingWindowFunction<K> implements KeyedWindowFunction<K, Tuple2<K, Long>, Tuple2<K, Long>> {
+	public static class SummingWindowFunction implements KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
 
 		@Override
-		public void evaluate(K key, Iterable<Tuple2<K, Long>> values, Collector<Tuple2<K, Long>> out) throws Exception {
+		public void evaluate(Long key, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
 			long sum = 0L;
-			for (Tuple2<K, Long> value : values) {
+			for (Tuple2<Long, Long> value : values) {
 				sum += value.f1;
 			}
 
-			out.collect(new Tuple2<K, Long>(key, sum));
+			out.collect(new Tuple2<>(key, sum));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 2bb6a6a..2f4bd23 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.api.scala
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-import org.apache.flink.api.common.functions.{ReduceFunction, FlatMapFunction, MapFunction,
-  Partitioner, FoldFunction, FilterFunction}
+import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, Partitioner, FilterFunction}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
@@ -30,17 +30,12 @@ import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.OperatorState
 import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.KeyedDataStream
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
 
@@ -244,20 +239,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy(fields: Int*): GroupedDataStream[T] = javaStream.groupBy(fields: _*)
+  def groupBy(fields: Int*): GroupedDataStream[T, JavaTuple] = javaStream.groupBy(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T] = 
+  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] = 
    javaStream.groupBy(firstField +: otherFields.toArray: _*)   
   
   /**
    * Groups the elements of a DataStream by the given K key to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T] = {
+  def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T, K] = {
 
     val cleanFun = clean(fun)
     val keyExtractor = new KeySelector[T, K] {
@@ -605,7 +600,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   private[flink] def isStatePartitioned: Boolean = {
-    javaStream.isInstanceOf[KeyedDataStream[T]]
+    javaStream.isInstanceOf[KeyedDataStream[_, _]]
   }
 
   /**

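On the Java classes these Scala wrappers delegate to, the same second type parameter is
now present. A sketch of what that looks like at a Java call site (stream contents and
variable names are made up for illustration):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<Long, Long>> stream = env.fromElements(new Tuple2<Long, Long>(1L, 1L));

    // Positional keys surface as the generic Tuple key type ...
    GroupedDataStream<Tuple2<Long, Long>, Tuple> byPosition = stream.groupBy(0);

    // ... while a KeySelector makes the key type explicit.
    GroupedDataStream<Tuple2<Long, Long>, Long> bySelector = stream.groupBy(
            new KeySelector<Tuple2<Long, Long>, Long>() {
                @Override
                public Long getKey(Tuple2<Long, Long> value) {
                    return value.f0;
                }
            });
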
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
index 34f0807..e1a963d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
@@ -29,7 +29,8 @@ import org.apache.flink.api.common.functions.FoldFunction
 import org.apache.flink.api.common.functions.ReduceFunction
 
 
-class GroupedDataStream[T](javaStream: GroupedJavaStream[T]) extends DataStream[T](javaStream){
+class GroupedDataStream[T, K](javaStream: GroupedJavaStream[T, K]) 
+  extends DataStream[T](javaStream) {
  
   /**
    * Creates a new [[DataStream]] by reducing the elements of this DataStream

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 5e02ec5..9d62bcb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,13 +18,15 @@
 
 package org.apache.flink.streaming.api.scala
 
+import java.util.Objects
+
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.StateHandleProvider
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -294,6 +296,27 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   // --------------------------------------------------------------------------------------------
+  //  Time characteristic
+  // --------------------------------------------------------------------------------------------
+  /**
+   * Sets the time characteristic for all streams created from this environment,
+   * e.g., processing time, event time, or ingestion time.
+   *
+   * @param characteristic The time characteristic.
+   */
+  def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
+    javaEnv.setStreamTimeCharacteristic(characteristic)
+  }
+
+  /**
+   * Gets the time characteristic set for this environment, e.g., processing time,
+   * event time, or ingestion time.
+   *
+   * @return The time characteristic.
+   */
+  def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
+
+  // --------------------------------------------------------------------------------------------
   // Data stream creations
   // --------------------------------------------------------------------------------------------
 

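A short usage sketch for the new setter (the constant name EventTime is an assumption
about the TimeCharacteristic enum added by this commit; the Java environment is used
here since the Scala method simply forwards to it):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Declare once, up front, how time-based window policies interpret time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
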
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 2eb4f9e..59843e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -38,8 +38,8 @@ package object scala {
   implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
     new DataStream[R](javaStream)
     
-  implicit def javaToScalaGroupedStream[R](javaStream: GroupedJavaStream[R]): 
-  GroupedDataStream[R] = new GroupedDataStream[R](javaStream)    
+  implicit def javaToScalaGroupedStream[R, K](javaStream: GroupedJavaStream[R, K]): 
+  GroupedDataStream[R, K] = new GroupedDataStream[R, K](javaStream)    
 
   implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
     new WindowedDataStream[R](javaWStream)