Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:03 UTC

[07/12] flink git commit: Harmonize generic parameter names in Stream API classes

Harmonize generic parameter names in Stream API classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86c45bfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86c45bfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86c45bfa

Branch: refs/heads/master
Commit: 86c45bfa2c760ca99741fa866f777730514c7986
Parents: 05d2138
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 24 16:40:46 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 28 17:04:16 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/GroupedDataStream.java       |  68 +++---
 .../api/datastream/KeyedDataStream.java         |  18 +-
 .../api/datastream/KeyedWindowDataStream.java   |  22 +-
 .../api/datastream/WindowedDataStream.java      | 239 +++++++++----------
 4 files changed, 173 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 50bf341..fde5a6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
  * partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
  * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
  * get additional functionality by the grouping.
- * 
- * @param <OUT>
- *            The output type of the {@link GroupedDataStream}.
+ *
+ * @param <T> The type of the elements in the Grouped Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
  */
-public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
+public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {
 
 	/**
 	 * Creates a new {@link GroupedDataStream}, group inclusion is determined using
@@ -48,7 +48,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 * @param dataStream Base stream of data
 	 * @param keySelector Function for determining group inclusion
 	 */
-	public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySelector) {
+	public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
 		super(dataStream, keySelector);
 	}
 
@@ -64,8 +64,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            element of the input values with the same key.
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return transform("Grouped Reduce", getType(), new StreamGroupedReduce<OUT>(
+	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
+		return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>(
 				clean(reducer), keySelector));
 	}
 
@@ -82,12 +82,12 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            The initialValue passed to the folders for each key.
 	 * @return The transformed DataStream.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
 
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
 				Utils.getCallLocationName(), true);
 
-		return transform("Grouped Fold", outType, new StreamGroupedFold<OUT, R>(clean(folder),
+		return transform("Grouped Fold", outType, new StreamGroupedFold<T, R>(clean(folder),
 				keySelector, initialValue));
 	}
 
@@ -100,8 +100,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            The position in the data point to sum
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
-		return aggregate(new SumAggregator<OUT>(positionToSum, getType(), getExecutionConfig()));
+	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+		return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
 	}
 
 	/**
@@ -117,8 +117,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            applied.
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-		return aggregate(new SumAggregator<OUT>(field, getType(), getExecutionConfig()));
+	public SingleOutputStreamOperator<T, ?> sum(String field) {
+		return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
 	}
 
 	/**
@@ -130,8 +130,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            The position in the data point to minimize
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
-		return aggregate(new ComparableAggregator<OUT>(positionToMin, getType(), AggregationType.MIN,
+	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+		return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
 				getExecutionConfig()));
 	}
 
@@ -148,8 +148,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            applied.
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> min(String field) {
-		return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MIN,
+	public SingleOutputStreamOperator<T, ?> min(String field) {
+		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
 				false, getExecutionConfig()));
 	}
 
@@ -162,8 +162,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            The position in the data point to maximize
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
-		return aggregate(new ComparableAggregator<OUT>(positionToMax, getType(), AggregationType.MAX,
+	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+		return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
 				getExecutionConfig()));
 	}
 
@@ -180,8 +180,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            applied.
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> max(String field) {
-		return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAX,
+	public SingleOutputStreamOperator<T, ?> max(String field) {
+		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
 				false, getExecutionConfig()));
 	}
 
@@ -202,7 +202,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 * @return The transformed DataStream.
 	 */
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
+	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
 		return aggregate(new ComparableAggregator(field, getType(), AggregationType.MINBY,
 				first, getExecutionConfig()));
 	}
@@ -223,8 +223,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            be returned
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAXBY,
+	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY,
 				first, getExecutionConfig()));
 	}
 
@@ -238,7 +238,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            The position in the data point to minimize
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
 
@@ -252,7 +252,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            The position in the data point to minimize
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) {
+	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
 
@@ -270,8 +270,8 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            minimal value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
-		return aggregate(new ComparableAggregator<OUT>(positionToMinBy, getType(), AggregationType.MINBY, first,
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
+		return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
 				getExecutionConfig()));
 	}
 
@@ -285,7 +285,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            The position in the data point to maximize
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
 
@@ -299,7 +299,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            The position in the data point to maximize
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) {
+	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
 
@@ -317,13 +317,13 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
 	 *            maximum value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
-		return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
+		return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
 				getExecutionConfig()));
 	}
 
-	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
-		StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate), keySelector);
+	protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
+		StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector);
 		return transform("Grouped Aggregation", getType(), operator);
 	}
 }
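
For readers following the rename: a minimal usage sketch of the class above, assuming the
StreamExecutionEnvironment entry points of this branch. The element type and sample data are
hypothetical; the point is that the OUT -> T change is confined to declarations and leaves
call sites untouched.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GroupedSumExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical (word, count) records.
		DataStream<Tuple2<String, Integer>> words = env.fromElements(
				new Tuple2<String, Integer>("a", 1),
				new Tuple2<String, Integer>("b", 2),
				new Tuple2<String, Integer>("a", 3));

		// groupBy(0) produces a GroupedDataStream<Tuple2<String, Integer>, ?> under the hood;
		// sum(1) is the aggregation defined in the class above.
		words.groupBy(0).sum(1).print();

		env.execute("GroupedDataStream sum example");
	}
}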

http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index a32cf53..611953e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -34,11 +34,11 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * 
  * 
  * @param <T> The type of the elements in the Keyed Stream.
- * @param <K> The type of the key in the Keyed Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
  */
-public class KeyedDataStream<T, K> extends DataStream<T> {
+public class KeyedDataStream<T, KEY> extends DataStream<T> {
 	
-	protected final KeySelector<T, K> keySelector;
+	protected final KeySelector<T, KEY> keySelector;
 
 	/**
 	 * Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
@@ -49,13 +49,13 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
 	 * @param keySelector
 	 *            Function for determining state partitions
 	 */
-	public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) {
+	public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
 		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
 		this.keySelector = keySelector;
 	}
 
 	
-	public KeySelector<T, K> getKeySelector() {
+	public KeySelector<T, KEY> getKeySelector() {
 		return this.keySelector;
 	}
 
@@ -98,8 +98,8 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
 	 * @param policy The policy that defines the window.
 	 * @return The windows data stream. 
 	 */
-	public KeyedWindowDataStream<T, K> window(WindowPolicy policy) {
-		return new KeyedWindowDataStream<T, K>(this, policy);
+	public KeyedWindowDataStream<T, KEY> window(WindowPolicy policy) {
+		return new KeyedWindowDataStream<T, KEY>(this, policy);
 	}
 
 	/**
@@ -112,7 +112,7 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
 	 * @param slide The additional policy defining the slide of the window. 
 	 * @return The windows data stream.
 	 */
-	public KeyedWindowDataStream<T, K> window(WindowPolicy window, WindowPolicy slide) {
-		return new KeyedWindowDataStream<T, K>(this, window, slide);
+	public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) {
+		return new KeyedWindowDataStream<T, KEY>(this, window, slide);
 	}
 }
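
A small sketch of building a KeyedDataStream directly through the constructor shown in this
diff; the element type and key selector are illustrative assumptions. Only the declared
parameter name changed from K to KEY, so existing callers compile unchanged.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedDataStreamExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Tuple2<String, Integer>> input = env.fromElements(
				new Tuple2<String, Integer>("a", 1),
				new Tuple2<String, Integer>("b", 2));

		// T = Tuple2<String, Integer>, KEY = String: the stream is hash-partitioned by the
		// extracted key, exactly as before the rename.
		KeyedDataStream<Tuple2<String, Integer>, String> keyed =
				new KeyedDataStream<Tuple2<String, Integer>, String>(input,
						new KeySelector<Tuple2<String, Integer>, String>() {
							@Override
							public String getKey(Tuple2<String, Integer> value) {
								return value.f0;
							}
						});

		keyed.print();
		env.execute("KeyedDataStream example");
	}
}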

http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index 37151d7..711a959 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -43,13 +43,13 @@ import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
  * KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation
  * over the window into one single operation.
  * 
- * @param <Type> The type of elements in the stream.
- * @param <Key> The type of the key by which elements are grouped.
+ * @param <T> The type of elements in the stream.
+ * @param <K> The type of the key by which elements are grouped.
  */
-public class KeyedWindowDataStream<Type, Key> {
+public class KeyedWindowDataStream<T, K> {
 
 	/** The keyed data stream that is windowed by this stream */
-	private final KeyedDataStream<Type, Key> input;
+	private final KeyedDataStream<T, K> input;
 
 	/** The core window policy */
 	private final WindowPolicy windowPolicy;
@@ -58,11 +58,11 @@ public class KeyedWindowDataStream<Type, Key> {
 	private final WindowPolicy slidePolicy;
 	
 	
-	public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, WindowPolicy windowPolicy) {
+	public KeyedWindowDataStream(KeyedDataStream<T, K> input, WindowPolicy windowPolicy) {
 		this(input, windowPolicy, null);
 	}
 
-	public KeyedWindowDataStream(KeyedDataStream<Type, Key> input,
+	public KeyedWindowDataStream(KeyedDataStream<T, K> input,
 								WindowPolicy windowPolicy, WindowPolicy slidePolicy) 
 	{
 		TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic();
@@ -91,7 +91,7 @@ public class KeyedWindowDataStream<Type, Key> {
 	 * @param function The reduce function.
 	 * @return The data stream that is the result of applying the reduce function to the window. 
 	 */
-	public DataStream<Type> reduceWindow(ReduceFunction<Type> function) {
+	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
 		String callLocation = Utils.getCallLocationName();
 		return createWindowOperator(function, input.getType(), "Reduce at " + callLocation);
 	}
@@ -107,10 +107,10 @@ public class KeyedWindowDataStream<Type, Key> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, Result, Key> function) {
+	public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, Result, K> function) {
 		String callLocation = Utils.getCallLocationName();
 
-		TypeInformation<Type> inType = input.getType();
+		TypeInformation<T> inType = input.getType();
 		TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType(
 				function, KeyedWindowFunction.class, true, true, inType, null, false);
 
@@ -125,9 +125,9 @@ public class KeyedWindowDataStream<Type, Key> {
 			Function function, TypeInformation<Result> resultType, String functionName) {
 
 		String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
-		KeySelector<Type, Key> keySel = input.getKeySelector();
+		KeySelector<T, K> keySel = input.getKeySelector();
 		
-		OneInputStreamOperator<Type, Result> operator =
+		OneInputStreamOperator<T, Result> operator =
 				PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
 		
 		return input.transform(opName, resultType, operator);
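
A hedged sketch of the renamed KeyedWindowDataStream<T, K> at a call site. The WindowPolicy
is taken as a method parameter because window construction is outside this diff, and its
import path below is an assumption; only the reduceWindow method from the hunk above is
exercised.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedDataStream;
import org.apache.flink.streaming.api.datastream.KeyedWindowDataStream;
import org.apache.flink.streaming.api.windowing.WindowPolicy; // import path is an assumption

public class KeyedWindowExample {

	// T = Tuple2<String, Long>, K = String in the terms of the Javadoc above.
	public static DataStream<Tuple2<String, Long>> sumPerKeyAndWindow(
			KeyedDataStream<Tuple2<String, Long>, String> keyed, WindowPolicy policy) {

		KeyedWindowDataStream<Tuple2<String, Long>, String> windowed = keyed.window(policy);

		// reduceWindow(ReduceFunction<T>): combines the elements of each key and window.
		return windowed.reduceWindow(new ReduceFunction<Tuple2<String, Long>>() {
			@Override
			public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
				return new Tuple2<String, Long>(a.f0, a.f1 + b.f1);
			}
		});
	}
}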

http://git-wip-us.apache.org/repos/asf/flink/blob/86c45bfa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 1226adf..ef6f53b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -80,46 +80,45 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
  * can be applied to the windows. The results of these transformations are also
  * WindowedDataStreams of the same discretisation unit.
  * 
- * @param <OUT>
- *            The output type of the {@link WindowedDataStream}
+ * @param <T> The output type of the {@link WindowedDataStream}
  */
-public class WindowedDataStream<OUT> {
+public class WindowedDataStream<T> {
 
-	protected DataStream<OUT> dataStream;
+	protected DataStream<T> dataStream;
 
 	protected boolean isLocal = false;
 
-	protected KeySelector<OUT, ?> discretizerKey;
-	protected KeySelector<OUT, ?> groupByKey;
+	protected KeySelector<T, ?> discretizerKey;
+	protected KeySelector<T, ?> groupByKey;
 
-	protected WindowingHelper<OUT> triggerHelper;
-	protected WindowingHelper<OUT> evictionHelper;
+	protected WindowingHelper<T> triggerHelper;
+	protected WindowingHelper<T> evictionHelper;
 
-	protected TriggerPolicy<OUT> userTrigger;
-	protected EvictionPolicy<OUT> userEvicter;
+	protected TriggerPolicy<T> userTrigger;
+	protected EvictionPolicy<T> userEvicter;
 
-	protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT> policyHelper) {
+	protected WindowedDataStream(DataStream<T> dataStream, WindowingHelper<T> policyHelper) {
 		this.dataStream = dataStream;
 		this.triggerHelper = policyHelper;
 
 		if (dataStream instanceof GroupedDataStream) {
-			this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
+			this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector;
 		}
 	}
 
-	protected WindowedDataStream(DataStream<OUT> dataStream, TriggerPolicy<OUT> trigger,
-			EvictionPolicy<OUT> evicter) {
+	protected WindowedDataStream(DataStream<T> dataStream, TriggerPolicy<T> trigger,
+			EvictionPolicy<T> evicter) {
 		this.dataStream = dataStream;
 
 		this.userTrigger = trigger;
 		this.userEvicter = evicter;
 
 		if (dataStream instanceof GroupedDataStream) {
-			this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
+			this.discretizerKey = ((GroupedDataStream<T, ?>) dataStream).keySelector;
 		}
 	}
 
-	protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
+	protected WindowedDataStream(WindowedDataStream<T> windowedDataStream) {
 		this.dataStream = windowedDataStream.dataStream;
 		this.discretizerKey = windowedDataStream.discretizerKey;
 		this.groupByKey = windowedDataStream.groupByKey;
@@ -148,9 +147,9 @@ public class WindowedDataStream<OUT> {
 	 * @return The windowed data stream with triggering set
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
+	public WindowedDataStream<T> every(WindowingHelper policyHelper) {
 		policyHelper.setExecutionConfig(getExecutionConfig());
-		WindowedDataStream<OUT> ret = this.copy();
+		WindowedDataStream<T> ret = this.copy();
 		if (ret.evictionHelper == null) {
 			ret.evictionHelper = ret.triggerHelper;
 			ret.triggerHelper = policyHelper;
@@ -171,11 +170,11 @@ public class WindowedDataStream<OUT> {
 	 *            The position of the fields to group by.
 	 * @return The grouped {@link WindowedDataStream}
 	 */
-	public WindowedDataStream<OUT> groupBy(int... fields) {
+	public WindowedDataStream<T> groupBy(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return groupBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
+			return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
 		} else {
-			return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+			return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
 		}
 	}
 
@@ -195,8 +194,8 @@ public class WindowedDataStream<OUT> {
 	 *            The fields to group by
 	 * @return The grouped {@link WindowedDataStream}
 	 */
-	public WindowedDataStream<OUT> groupBy(String... fields) {
-		return groupBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
+	public WindowedDataStream<T> groupBy(String... fields) {
+		return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
 	}
 
 	/**
@@ -211,13 +210,13 @@ public class WindowedDataStream<OUT> {
 	 *            The keySelector used to extract the key for grouping.
 	 * @return The grouped {@link WindowedDataStream}
 	 */
-	public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
-		WindowedDataStream<OUT> ret = this.copy();
+	public WindowedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
+		WindowedDataStream<T> ret = this.copy();
 		ret.groupByKey = keySelector;
 		return ret;
 	}
 
-	private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) {
+	private WindowedDataStream<T> groupBy(Keys<T> keys) {
 		return groupBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
 				getExecutionConfig())));
 	}
@@ -228,8 +227,8 @@ public class WindowedDataStream<OUT> {
 	 * 
 	 * @return The WindowedDataStream with local discretisation
 	 */
-	public WindowedDataStream<OUT> local() {
-		WindowedDataStream<OUT> out = copy();
+	public WindowedDataStream<T> local() {
+		WindowedDataStream<T> out = copy();
 		out.isLocal = true;
 		return out;
 	}
@@ -241,11 +240,11 @@ public class WindowedDataStream<OUT> {
 	 * 
 	 * @return The discretised stream
 	 */
-	public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
+	public DataStream<StreamWindow<T>> getDiscretizedStream() {
 		if (getEviction() instanceof KeepAllEvictionPolicy) {
 			throw new RuntimeException("Cannot get discretized stream for full stream window");
 		}
-		return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>())
+		return discretize(WindowTransformation.NONE, new BasicWindowBuffer<T>())
 				.getDiscretizedStream();
 	}
 
@@ -255,7 +254,7 @@ public class WindowedDataStream<OUT> {
 	 * 
 	 * @return The data stream consisting of the individual records.
 	 */
-	public DataStream<OUT> flatten() {
+	public DataStream<T> flatten() {
 		return dataStream;
 	}
 
@@ -269,7 +268,7 @@ public class WindowedDataStream<OUT> {
 	 *            The reduce function that will be applied to the windows.
 	 * @return The transformed DataStream
 	 */
-	public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
+	public DiscretizedStream<T> reduceWindow(ReduceFunction<T> reduceFunction) {
 
 		// We check whether we should apply parallel time discretization, which
 		// is a more complex exploiting the monotonic properties of time
@@ -281,9 +280,9 @@ public class WindowedDataStream<OUT> {
 			WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
 					.with(clean(reduceFunction));
 
-			WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
+			WindowBuffer<T> windowBuffer = getWindowBuffer(transformation);
 
-			DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
+			DiscretizedStream<T> discretized = discretize(transformation, windowBuffer);
 
 			if (windowBuffer instanceof PreAggregator) {
 				return discretized;
@@ -310,11 +309,11 @@ public class WindowedDataStream<OUT> {
 	 *            The output type of the operator
 	 * @return The transformed DataStream
 	 */
-	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction,
+	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction,
 			TypeInformation<R> outType) {
 
 		return discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)),
-				new BasicWindowBuffer<OUT>()).foldWindow(initialValue, foldFunction, outType);
+				new BasicWindowBuffer<T>()).foldWindow(initialValue, foldFunction, outType);
 
 	}
 
@@ -330,7 +329,7 @@ public class WindowedDataStream<OUT> {
 	 *            Initial value given to foldFunction
 	 * @return The transformed DataStream
 	 */
-	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction) {
+	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction) {
 
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(foldFunction),
 				getType());
@@ -350,7 +349,7 @@ public class WindowedDataStream<OUT> {
 	 *            The function that will be applied to the windows.
 	 * @return The transformed DataStream
 	 */
-	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
+	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction) {
 		return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
 				getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction);
 	}
@@ -373,7 +372,7 @@ public class WindowedDataStream<OUT> {
 	 *            The output type of the operator.
 	 * @return The transformed DataStream
 	 */
-	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction,
+	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction,
 			TypeInformation<R> outType) {
 
 		return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
@@ -381,24 +380,24 @@ public class WindowedDataStream<OUT> {
 				outType);
 	}
 
-	private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
-			WindowBuffer<OUT> windowBuffer) {
+	private DiscretizedStream<T> discretize(WindowTransformation transformation,
+			WindowBuffer<T> windowBuffer) {
 
-		OneInputStreamOperator<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
+		OneInputStreamOperator<T, WindowEvent<T>> discretizer = getDiscretizer();
 
-		OneInputStreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> bufferOperator = getBufferOperator(windowBuffer);
+		OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> bufferOperator = getBufferOperator(windowBuffer);
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
-		TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
+		TypeInformation<WindowEvent<T>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
 				getType(), BasicTypeInfo.INT_TYPE_INFO);
 
 		int parallelism = getDiscretizerParallelism(transformation);
 
-		return new DiscretizedStream<OUT>(dataStream
+		return new DiscretizedStream<T>(dataStream
 				.transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer)
 				.setParallelism(parallelism)
 				.transform(windowBuffer.getClass().getSimpleName(),
-						new StreamWindowTypeInfo<OUT>(getType()), bufferOperator)
+						new StreamWindowTypeInfo<T>(getType()), bufferOperator)
 				.setParallelism(parallelism), groupByKey, transformation, false);
 
 	}
@@ -430,7 +429,7 @@ public class WindowedDataStream<OUT> {
 	 *            Reduce function to apply
 	 * @return The transformed stream
 	 */
-	protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
+	protected DiscretizedStream<T> timeReduce(ReduceFunction<T> reduceFunction) {
 
 		WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
 				.with(clean(reduceFunction));
@@ -438,7 +437,7 @@ public class WindowedDataStream<OUT> {
 		// We get the windowbuffer and set it to emit empty windows with
 		// sequential IDs. This logic is necessary to merge windows created in
 		// parallel.
-		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
+		WindowBuffer<T> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
 
 		// If there is a groupby for the reduce operation we apply it before the
 		// discretizers, because we will forward everything afterwards to
@@ -449,7 +448,7 @@ public class WindowedDataStream<OUT> {
 
 		// We discretize the stream and call the timeReduce function of the
 		// discretized stream, we also pass the type of the windowbuffer
-		DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
+		DiscretizedStream<T> discretized = discretize(transformation, windowBuffer);
 
 		if (getEviction() instanceof KeepAllEvictionPolicy
 				&& !(windowBuffer instanceof PreAggregator)) {
@@ -464,27 +463,27 @@ public class WindowedDataStream<OUT> {
 	/**
 	 * Based on the defined policies, returns the stream discretizer to be used
 	 */
-	private OneInputStreamOperator<OUT, WindowEvent<OUT>> getDiscretizer() {
+	private OneInputStreamOperator<T, WindowEvent<T>> getDiscretizer() {
 		if (discretizerKey == null) {
-			return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
+			return new StreamDiscretizer<T>(getTrigger(), getEviction());
 		} else if (getTrigger() instanceof CentralActiveTrigger) {
-			return new GroupedActiveDiscretizer<OUT>(discretizerKey,
-					(CentralActiveTrigger<OUT>) getTrigger(),
-					(CloneableEvictionPolicy<OUT>) getEviction());
+			return new GroupedActiveDiscretizer<T>(discretizerKey,
+					(CentralActiveTrigger<T>) getTrigger(),
+					(CloneableEvictionPolicy<T>) getEviction());
 		} else {
-			return new GroupedStreamDiscretizer<OUT>(discretizerKey,
-					(CloneableTriggerPolicy<OUT>) getTrigger(),
-					(CloneableEvictionPolicy<OUT>) getEviction());
+			return new GroupedStreamDiscretizer<T>(discretizerKey,
+					(CloneableTriggerPolicy<T>) getTrigger(),
+					(CloneableEvictionPolicy<T>) getEviction());
 		}
 
 	}
 
-	private OneInputStreamOperator<WindowEvent<OUT>, StreamWindow<OUT>> getBufferOperator(
-			WindowBuffer<OUT> windowBuffer) {
+	private OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> getBufferOperator(
+			WindowBuffer<T> windowBuffer) {
 		if (discretizerKey == null) {
-			return new StreamWindowBuffer<OUT>(windowBuffer);
+			return new StreamWindowBuffer<T>(windowBuffer);
 		} else {
-			return new GroupedWindowBuffer<OUT>(windowBuffer, discretizerKey);
+			return new GroupedWindowBuffer<T>(windowBuffer, discretizerKey);
 		}
 	}
 
@@ -496,43 +495,43 @@ public class WindowedDataStream<OUT> {
 	 * 
 	 */
 	@SuppressWarnings("unchecked")
-	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
-		TriggerPolicy<OUT> trigger = getTrigger();
-		EvictionPolicy<OUT> eviction = getEviction();
+	private WindowBuffer<T> getWindowBuffer(WindowTransformation transformation) {
+		TriggerPolicy<T> trigger = getTrigger();
+		EvictionPolicy<T> eviction = getEviction();
 
 		if (transformation == WindowTransformation.REDUCEWINDOW) {
 			if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
 				if (eviction instanceof KeepAllEvictionPolicy) {
 					if (groupByKey == null) {
-						return new TumblingPreReducer<OUT>(
-								(ReduceFunction<OUT>) transformation.getUDF(), getType()
+						return new TumblingPreReducer<T>(
+								(ReduceFunction<T>) transformation.getUDF(), getType()
 										.createSerializer(getExecutionConfig())).noEvict();
 					} else {
-						return new TumblingGroupedPreReducer<OUT>(
-								(ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
+						return new TumblingGroupedPreReducer<T>(
+								(ReduceFunction<T>) transformation.getUDF(), groupByKey,
 								getType().createSerializer(getExecutionConfig())).noEvict();
 					}
 				} else {
 					if (groupByKey == null) {
-						return new TumblingPreReducer<OUT>(
-								(ReduceFunction<OUT>) transformation.getUDF(), getType()
+						return new TumblingPreReducer<T>(
+								(ReduceFunction<T>) transformation.getUDF(), getType()
 										.createSerializer(getExecutionConfig()));
 					} else {
-						return new TumblingGroupedPreReducer<OUT>(
-								(ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
+						return new TumblingGroupedPreReducer<T>(
+								(ReduceFunction<T>) transformation.getUDF(), groupByKey,
 								getType().createSerializer(getExecutionConfig()));
 					}
 				}
 			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
 				if (groupByKey == null) {
-					return new SlidingCountPreReducer<OUT>(
-							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+					return new SlidingCountPreReducer<T>(
+							clean((ReduceFunction<T>) transformation.getUDF()), dataStream
 									.getType().createSerializer(getExecutionConfig()),
 							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
 							((CountTriggerPolicy<?>) trigger).getStart());
 				} else {
-					return new SlidingCountGroupedPreReducer<OUT>(
-							clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
+					return new SlidingCountGroupedPreReducer<T>(
+							clean((ReduceFunction<T>) transformation.getUDF()), dataStream
 									.getType().createSerializer(getExecutionConfig()), groupByKey,
 							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
 							((CountTriggerPolicy<?>) trigger).getStart());
@@ -540,14 +539,14 @@ public class WindowedDataStream<OUT> {
 
 			} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
 				if (groupByKey == null) {
-					return new SlidingTimePreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+					return new SlidingTimePreReducer<T>(
+							(ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
 									.createSerializer(getExecutionConfig()),
 							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
 							WindowUtils.getTimeStampWrapper(trigger));
 				} else {
-					return new SlidingTimeGroupedPreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
+					return new SlidingTimeGroupedPreReducer<T>(
+							(ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
 									.createSerializer(getExecutionConfig()), groupByKey,
 							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
 							WindowUtils.getTimeStampWrapper(trigger));
@@ -555,26 +554,26 @@ public class WindowedDataStream<OUT> {
 
 			} else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
 				if (groupByKey == null) {
-					return new JumpingCountPreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(), getType()
+					return new JumpingCountPreReducer<T>(
+							(ReduceFunction<T>) transformation.getUDF(), getType()
 									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
 				} else {
-					return new JumpingCountGroupedPreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+					return new JumpingCountGroupedPreReducer<T>(
+							(ReduceFunction<T>) transformation.getUDF(), groupByKey, getType()
 									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
 				}
 			} else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
 				if (groupByKey == null) {
-					return new JumpingTimePreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(), getType()
+					return new JumpingTimePreReducer<T>(
+							(ReduceFunction<T>) transformation.getUDF(), getType()
 									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
 							WindowUtils.getTimeStampWrapper(trigger));
 				} else {
-					return new JumpingTimeGroupedPreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+					return new JumpingTimeGroupedPreReducer<T>(
+							(ReduceFunction<T>) transformation.getUDF(), groupByKey, getType()
 									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
 							WindowUtils.getTimeStampWrapper(trigger));
@@ -586,7 +585,7 @@ public class WindowedDataStream<OUT> {
 			throw new RuntimeException(
 					"Full stream policy can only be used with operations that support preaggregations, such as reduce or aggregations");
 		} else {
-			return new BasicWindowBuffer<OUT>();
+			return new BasicWindowBuffer<T>();
 		}
 	}
 
@@ -598,8 +597,8 @@ public class WindowedDataStream<OUT> {
 	 *            The position in the tuple/array to sum
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> sum(int positionToSum) {
-		return aggregate(new SumAggregator<OUT>(positionToSum, getType(), getExecutionConfig()));
+	public WindowedDataStream<T> sum(int positionToSum) {
+		return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
 	}
 
 	/**
@@ -613,8 +612,8 @@ public class WindowedDataStream<OUT> {
 	 *            The field to sum
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> sum(String field) {
-		return aggregate(new SumAggregator<OUT>(field, getType(), getExecutionConfig()));
+	public WindowedDataStream<T> sum(String field) {
+		return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
 	}
 
 	/**
@@ -625,8 +624,8 @@ public class WindowedDataStream<OUT> {
 	 *            The position to minimize
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> min(int positionToMin) {
-		return aggregate(new ComparableAggregator<OUT>(positionToMin, getType(), AggregationType.MIN,
+	public WindowedDataStream<T> min(int positionToMin) {
+		return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
 				getExecutionConfig()));
 	}
 
@@ -642,8 +641,8 @@ public class WindowedDataStream<OUT> {
 	 *            applied.
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> min(String field) {
-		return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MIN,
+	public WindowedDataStream<T> min(String field) {
+		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
 				false, getExecutionConfig()));
 	}
 
@@ -656,7 +655,7 @@ public class WindowedDataStream<OUT> {
 	 *            The position to minimize by
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> minBy(int positionToMinBy) {
+	public WindowedDataStream<T> minBy(int positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
 
@@ -669,7 +668,7 @@ public class WindowedDataStream<OUT> {
 	 *            The position to minimize by
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> minBy(String positionToMinBy) {
+	public WindowedDataStream<T> minBy(String positionToMinBy) {
 		return this.minBy(positionToMinBy, true);
 	}
 
@@ -686,8 +685,8 @@ public class WindowedDataStream<OUT> {
 	 *            minimum value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> minBy(int positionToMinBy, boolean first) {
-		return aggregate(new ComparableAggregator<OUT>(positionToMinBy, getType(), AggregationType.MINBY, first,
+	public WindowedDataStream<T> minBy(int positionToMinBy, boolean first) {
+		return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
 				getExecutionConfig()));
 	}
 
@@ -706,8 +705,8 @@ public class WindowedDataStream<OUT> {
 	 *            be returned
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> minBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MINBY,
+	public WindowedDataStream<T> minBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MINBY,
 				first, getExecutionConfig()));
 	}
 
@@ -719,8 +718,8 @@ public class WindowedDataStream<OUT> {
 	 *            The position to maximize
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> max(int positionToMax) {
-		return aggregate(new ComparableAggregator<OUT>(positionToMax, getType(), AggregationType.MAX,
+	public WindowedDataStream<T> max(int positionToMax) {
+		return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
 				getExecutionConfig()));
 	}
 
@@ -736,8 +735,8 @@ public class WindowedDataStream<OUT> {
 	 *            applied.
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> max(String field) {
-		return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAX,
+	public WindowedDataStream<T> max(String field) {
+		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
 				false, getExecutionConfig()));
 	}
 
@@ -750,7 +749,7 @@ public class WindowedDataStream<OUT> {
 	 *            The position to maximize by
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> maxBy(int positionToMaxBy) {
+	public WindowedDataStream<T> maxBy(int positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
 
@@ -763,7 +762,7 @@ public class WindowedDataStream<OUT> {
 	 *            The position to maximize by
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> maxBy(String positionToMaxBy) {
+	public WindowedDataStream<T> maxBy(String positionToMaxBy) {
 		return this.maxBy(positionToMaxBy, true);
 	}
 
@@ -780,8 +779,8 @@ public class WindowedDataStream<OUT> {
 	 *            maximum value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> maxBy(int positionToMaxBy, boolean first) {
-		return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
+	public WindowedDataStream<T> maxBy(int positionToMaxBy, boolean first) {
+		return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
 				getExecutionConfig()));
 	}
 
@@ -800,16 +799,16 @@ public class WindowedDataStream<OUT> {
 	 *            be returned
 	 * @return The transformed DataStream.
 	 */
-	public WindowedDataStream<OUT> maxBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAXBY, first,
+	public WindowedDataStream<T> maxBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY, first,
 				getExecutionConfig()));
 	}
 
-	private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregator) {
+	private WindowedDataStream<T> aggregate(AggregationFunction<T> aggregator) {
 		return reduceWindow(aggregator);
 	}
 
-	protected TriggerPolicy<OUT> getTrigger() {
+	protected TriggerPolicy<T> getTrigger() {
 
 		if (triggerHelper != null) {
 			return triggerHelper.toTrigger();
@@ -821,7 +820,7 @@ public class WindowedDataStream<OUT> {
 
 	}
 
-	protected EvictionPolicy<OUT> getEviction() {
+	protected EvictionPolicy<T> getEviction() {
 
 		if (evictionHelper != null) {
 			return evictionHelper.toEvict();
@@ -829,7 +828,7 @@ public class WindowedDataStream<OUT> {
 			if (triggerHelper instanceof Time) {
 				return triggerHelper.toEvict();
 			} else {
-				return new TumblingEvictionPolicy<OUT>();
+				return new TumblingEvictionPolicy<T>();
 			}
 		} else {
 			return userEvicter;
@@ -854,7 +853,7 @@ public class WindowedDataStream<OUT> {
 	 * 
 	 * @return The output type.
 	 */
-	public TypeInformation<OUT> getType() {
+	public TypeInformation<T> getType() {
 		return dataStream.getType();
 	}
 
@@ -862,7 +861,7 @@ public class WindowedDataStream<OUT> {
 		return dataStream.getExecutionConfig();
 	}
 
-	protected WindowedDataStream<OUT> copy() {
-		return new WindowedDataStream<OUT>(this);
+	protected WindowedDataStream<T> copy() {
+		return new WindowedDataStream<T>(this);
 	}
 }
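
To close, a hedged sketch against the renamed WindowedDataStream<T>. The windowed input is
taken as a parameter since window creation itself is untouched by this commit; sum(int) and
flatten() are the methods visible in the diff above.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;

public class WindowedSumExample {

	// <T> here is Tuple2<String, Integer>; the stream is assumed to be windowed already.
	public static DataStream<Tuple2<String, Integer>> sumAndFlatten(
			WindowedDataStream<Tuple2<String, Integer>> windowed) {

		// sum(1): per-window sum of the Integer field.
		WindowedDataStream<Tuple2<String, Integer>> summed = windowed.sum(1);

		// flatten(): discard the window structure and emit the individual records.
		return summed.flatten();
	}
}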