You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 21:15:22 UTC

[3/3] flink git commit: [FLINK-2052] Fix Serialization warnings in Stream Operators

[FLINK-2052] Fix Serialization warnings in Stream Operators

This closes #698


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b904ae2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b904ae2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b904ae2

Branch: refs/heads/master
Commit: 8b904ae21d319c6cd26d160f7c5cc91b5f081577
Parents: 495a5c3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed May 20 10:48:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 19:34:23 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/functions/KeySelector.java   |  4 +-
 .../api/datastream/StreamProjection.java        | 50 ++++++++++----------
 .../api/operators/AbstractStreamOperator.java   |  6 +--
 .../operators/AbstractUdfStreamOperator.java    |  4 +-
 .../streaming/api/operators/StreamCounter.java  |  2 +
 .../streaming/api/operators/StreamFilter.java   |  2 +
 .../streaming/api/operators/StreamFlatMap.java  |  2 +
 .../streaming/api/operators/StreamFold.java     |  2 +
 .../api/operators/StreamGroupedFold.java        |  2 +
 .../api/operators/StreamGroupedReduce.java      |  2 +
 .../streaming/api/operators/StreamMap.java      |  2 +
 .../streaming/api/operators/StreamProject.java  | 20 ++++----
 .../streaming/api/operators/StreamReduce.java   |  2 +
 .../streaming/api/operators/StreamSink.java     |  2 +
 .../streaming/api/operators/StreamSource.java   |  2 +
 .../api/operators/co/CoStreamFlatMap.java       |  2 +
 .../api/operators/co/CoStreamGroupedReduce.java |  1 +
 .../streaming/api/operators/co/CoStreamMap.java |  2 +
 .../api/operators/co/CoStreamReduce.java        |  2 +
 .../api/operators/co/CoStreamWindow.java        |  2 +
 .../windowing/GroupedActiveDiscretizer.java     |  5 +-
 .../windowing/GroupedStreamDiscretizer.java     |  2 +-
 .../windowing/GroupedWindowBuffer.java          | 13 ++++-
 .../operators/windowing/StreamDiscretizer.java  |  8 ++--
 .../operators/windowing/StreamWindowBuffer.java |  2 +
 .../operators/windowing/WindowFlattener.java    |  2 +
 .../api/operators/windowing/WindowFolder.java   |  1 +
 .../api/operators/windowing/WindowMerger.java   |  2 +
 .../operators/windowing/WindowPartitioner.java  |  2 +
 .../streaming/api/operators/ProjectTest.java    |  9 ++--
 30 files changed, 106 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
index 21ecc24..3d06c59 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.Function;
 
+import java.io.Serializable;
+
 /**
  * The {@link KeySelector} allows to use arbitrary objects for operations such as
  * reduce, reduceGroup, join, coGoup, etc.
@@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.Function;
  * @param <IN> Type of objects to extract the key from.
  * @param <KEY> Type of key.
  */
-public interface KeySelector<IN, KEY> extends Function, java.io.Serializable {
+public interface KeySelector<IN, KEY> extends Function, Serializable {
 	
 	/**
 	 * User-defined function that extracts the key from an arbitrary object.

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index 447b1fd..16e9deb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -129,7 +129,7 @@ public class StreamProjection<IN> {
 		TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);
 
 		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple1<T0>>(
-				fieldIndexes, tType));
+				fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -143,7 +143,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -157,7 +157,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -171,7 +171,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -185,7 +185,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -199,7 +199,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -213,7 +213,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -227,7 +227,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -241,7 +241,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -255,7 +255,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -269,7 +269,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -283,7 +283,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -297,7 +297,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -311,7 +311,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -325,7 +325,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -339,7 +339,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -353,7 +353,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -367,7 +367,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -381,7 +381,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -395,7 +395,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -409,7 +409,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -423,7 +423,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -437,7 +437,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -451,7 +451,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	/**
@@ -465,7 +465,7 @@ public class StreamProjection<IN> {
 		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
 		TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
 
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType));
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
 	}
 
 	public static TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a8dc8c5..b55e5d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -31,11 +31,11 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	protected RuntimeContext runtimeContext;
+	protected transient RuntimeContext runtimeContext;
 
-	protected ExecutionConfig executionConfig;
+	protected transient ExecutionConfig executionConfig;
 
-	public Output<OUT> output;
+	public transient Output<OUT> output;
 
 	// A sane default for most operators
 	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index dbd93b5..09d1ef6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -32,7 +32,9 @@ import java.io.Serializable;
  * @param <OUT> The output type of the operator
  * @param <F> The type of the user function
  */
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
+
+	private static final long serialVersionUID = 1L;
 
 	protected final F userFunction;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
index d9a67dd..240e2b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
@@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.operators;
 
 public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements OneInputStreamOperator<IN, Long> {
 
+	private static final long serialVersionUID = 1L;
+
 	private Long count = 0L;
 
 	public StreamCounter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
index e5575db..a54a4ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.functions.FilterFunction;
 
 public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
 
+	private static final long serialVersionUID = 1L;
+
 	public StreamFilter(FilterFunction<IN> filterFunction) {
 		super(filterFunction);
 		chainingStrategy = ChainingStrategy.ALWAYS;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index 1e836b1..e8da2c7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -23,6 +23,8 @@ public class StreamFlatMap<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
 		super(flatMapper);
 		chainingStrategy = ChainingStrategy.ALWAYS;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
index 6f956ca..580477a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -26,6 +26,8 @@ public class StreamFold<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	private OUT accumulator;
 	protected TypeSerializer<OUT> outTypeSerializer;
 	protected TypeInformation<OUT> outTypeInformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 75217be..08107a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 
 public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	private KeySelector<IN, ?> keySelector;
 	private Map<Object, OUT> values;
 	private OUT initialValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index e3980ce..8269be7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 
 public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 
+	private static final long serialVersionUID = 1L;
+
 	private KeySelector<IN, ?> keySelector;
 	private Map<Object, IN> values;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
index a379c56..08dc981 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
@@ -23,6 +23,8 @@ public class StreamMap<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	public StreamMap(MapFunction<IN, OUT> mapper) {
 		super(mapper);
 		chainingStrategy = ChainingStrategy.ALWAYS;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
index d039144..83613d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
@@ -26,16 +25,18 @@ public class StreamProject<IN, OUT extends Tuple>
 		extends AbstractStreamOperator<OUT>
 		implements OneInputStreamOperator<IN, OUT> {
 
-	transient OUT outTuple;
-	TypeSerializer<OUT> outTypeSerializer;
-	TypeInformation<OUT> outTypeInformation;
-	int[] fields;
-	int numFields;
+	private static final long serialVersionUID = 1L;
 
-	public StreamProject(int[] fields, TypeInformation<OUT> outTypeInformation) {
+	private TypeSerializer<OUT> outSerializer;
+	private int[] fields;
+	private int numFields;
+
+	private transient OUT outTuple;
+
+	public StreamProject(int[] fields, TypeSerializer<OUT> outSerializer) {
 		this.fields = fields;
 		this.numFields = this.fields.length;
-		this.outTypeInformation = outTypeInformation;
+		this.outSerializer = outSerializer;
 
 		chainingStrategy = ChainingStrategy.ALWAYS;
 	}
@@ -52,7 +53,6 @@ public class StreamProject<IN, OUT extends Tuple>
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
-		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
-		outTuple = outTypeSerializer.createInstance();
+		outTuple = outSerializer.createInstance();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
index 8205fe6..97cebc1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
 		implements OneInputStreamOperator<IN, IN> {
 
+	private static final long serialVersionUID = 1L;
+
 	private IN currentValue;
 
 	public StreamReduce(ReduceFunction<IN> reducer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index b1a0212..5399302 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
 		implements OneInputStreamOperator<IN, Object> {
 
+	private static final long serialVersionUID = 1L;
+
 	public StreamSink(SinkFunction<IN> sinkFunction) {
 		super(sinkFunction);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 2e4d313..9cdfb01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,6 +21,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>> implements StreamOperator<OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	public StreamSource(SourceFunction<OUT> sourceFunction) {
 		super(sourceFunction);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index 0be8c90..e3662d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -25,6 +25,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 		extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>
 		implements TwoInputStreamOperator<IN1, IN2, OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
index d136719..3dc509a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
 
 public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN2, OUT> {
+
 	private static final long serialVersionUID = 1L;
 
 	protected KeySelector<IN1, ?> keySelector1;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
index 9a98c66..a8e57e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -25,6 +25,8 @@ public class CoStreamMap<IN1, IN2, OUT>
 		extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
 		implements TwoInputStreamOperator<IN1, IN2, OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
 		super(mapper);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
index 81da189..7157b1d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
@@ -25,6 +25,8 @@ public class CoStreamReduce<IN1, IN2, OUT>
 		extends AbstractUdfStreamOperator<OUT, CoReduceFunction<IN1, IN2, OUT>>
 		implements TwoInputStreamOperator<IN1, IN2, OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	protected IN1 currentValue1 = null;
 	protected IN2 currentValue2 = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
index 8f2a0b8..e7b069e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
@@ -33,6 +33,8 @@ public class CoStreamWindow<IN1, IN2, OUT>
 		extends AbstractUdfStreamOperator<OUT, CoWindowFunction<IN1, IN2, OUT>>
 		implements TwoInputStreamOperator<IN1, IN2, OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	protected long windowSize;
 	protected long slideSize;
 	protected CircularFifoList<StreamRecord<IN1>> circularList1;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index 2ff8496..fd95110 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -25,10 +25,9 @@ import org.slf4j.LoggerFactory;
 
 public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class);
-
+	private static final long serialVersionUID = 1L;
 
-	private static final long serialVersionUID = -3469545957144404137L;
+	private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class);
 
 	private volatile IN last;
 	private Thread centralThread;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
index 7f6a917..e80b6ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
@@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
  */
 public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 
-	private static final long serialVersionUID = -3469545957144404137L;
+	private static final long serialVersionUID = 1L;
 
 	protected KeySelector<IN, ?> keySelector;
 	protected Configuration parameters;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
index 4e4350d..c6b2499 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.operators.windowing;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -31,12 +33,21 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
  */
 public class GroupedWindowBuffer<T> extends StreamWindowBuffer<T> {
 
-	private Map<Object, WindowBuffer<T>> windowMap = new HashMap<Object, WindowBuffer<T>>();
+	private static final long serialVersionUID = 1L;
+
 	private KeySelector<T, ?> keySelector;
 
+	private transient Map<Object, WindowBuffer<T>> windowMap;
+
 	public GroupedWindowBuffer(WindowBuffer<T> buffer, KeySelector<T, ?> keySelector) {
 		super(buffer);
 		this.keySelector = keySelector;
+		this.windowMap = new HashMap<Object, WindowBuffer<T>>();
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		this.windowMap = new HashMap<Object, WindowBuffer<T>>();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
index fbe6e44..4ab31cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
@@ -36,18 +36,16 @@ public class StreamDiscretizer<IN>
 		extends AbstractStreamOperator<WindowEvent<IN>>
 		implements OneInputStreamOperator<IN, WindowEvent<IN>> {
 
-	/**
-	 * Auto-generated serial version UID
-	 */
-	private static final long serialVersionUID = -8038984294071650730L;
+	private static final long serialVersionUID = 1L;
 
 	protected TriggerPolicy<IN> triggerPolicy;
 	protected EvictionPolicy<IN> evictionPolicy;
 	private boolean isActiveTrigger;
 	private boolean isActiveEviction;
-	private Thread activePolicyThread;
 	private int bufferSize = 0;
 
+	private transient Thread activePolicyThread;
+
 	protected WindowEvent<IN> windowEvent = new WindowEvent<IN>();
 
 	public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
index b9de698..f890b69 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
@@ -30,6 +30,8 @@ public class StreamWindowBuffer<T>
 		extends AbstractStreamOperator<StreamWindow<T>>
 		implements OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> {
 
+	private static final long serialVersionUID = 1L;
+
 	protected WindowBuffer<T> buffer;
 
 	public StreamWindowBuffer(WindowBuffer<T> buffer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
index f1b0ee2..3afc50f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
@@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 public class WindowFlattener<T> extends AbstractStreamOperator<T>
 		implements OneInputStreamOperator<StreamWindow<T>, T> {
 
+	private static final long serialVersionUID = 1L;
+
 	public WindowFlattener() {
 		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
index 29a68db..bdf6782 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWindow<OUT>> {
 
 	private static final long serialVersionUID = 1L;
+
 	FoldFunction<IN, OUT> folder;
 
 	public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index 12dd239..9b33474 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
 		implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
 
+	private static final long serialVersionUID = 1L;
+
 	private Map<Integer, StreamWindow<T>> windows;
 
 	public WindowMerger() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
index 14a055a..b86caaa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>>
 		implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
 
+	private static final long serialVersionUID = 1L;
+
 	private KeySelector<T, ?> keySelector;
 	private int numberOfSplits;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index 035abe6..d9cc607 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -33,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.streaming.api.datastream.StreamProjection;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -51,12 +53,13 @@ public class ProjectTest implements Serializable {
 
 		int[] fields = new int[]{4, 4, 3};
 
+		TupleSerializer<Tuple3<Integer, Integer, String>> serializer =
+				new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(fields, inType))
+						.createSerializer(new ExecutionConfig());
 		@SuppressWarnings("unchecked")
 		StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
 				new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-						fields,
-						new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection
-								.extractFieldTypes(fields, inType)));
+						fields, serializer);
 
 		List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer,
 				String, Integer>>();