You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 21:15:22 UTC
[3/3] flink git commit: [FLINK-2052] Fix Serialization warnings in Stream Operators
[FLINK-2052] Fix Serialization warnings in Stream Operators
This closes #698
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b904ae2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b904ae2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b904ae2
Branch: refs/heads/master
Commit: 8b904ae21d319c6cd26d160f7c5cc91b5f081577
Parents: 495a5c3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed May 20 10:48:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 19:34:23 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/functions/KeySelector.java | 4 +-
.../api/datastream/StreamProjection.java | 50 ++++++++++----------
.../api/operators/AbstractStreamOperator.java | 6 +--
.../operators/AbstractUdfStreamOperator.java | 4 +-
.../streaming/api/operators/StreamCounter.java | 2 +
.../streaming/api/operators/StreamFilter.java | 2 +
.../streaming/api/operators/StreamFlatMap.java | 2 +
.../streaming/api/operators/StreamFold.java | 2 +
.../api/operators/StreamGroupedFold.java | 2 +
.../api/operators/StreamGroupedReduce.java | 2 +
.../streaming/api/operators/StreamMap.java | 2 +
.../streaming/api/operators/StreamProject.java | 20 ++++----
.../streaming/api/operators/StreamReduce.java | 2 +
.../streaming/api/operators/StreamSink.java | 2 +
.../streaming/api/operators/StreamSource.java | 2 +
.../api/operators/co/CoStreamFlatMap.java | 2 +
.../api/operators/co/CoStreamGroupedReduce.java | 1 +
.../streaming/api/operators/co/CoStreamMap.java | 2 +
.../api/operators/co/CoStreamReduce.java | 2 +
.../api/operators/co/CoStreamWindow.java | 2 +
.../windowing/GroupedActiveDiscretizer.java | 5 +-
.../windowing/GroupedStreamDiscretizer.java | 2 +-
.../windowing/GroupedWindowBuffer.java | 13 ++++-
.../operators/windowing/StreamDiscretizer.java | 8 ++--
.../operators/windowing/StreamWindowBuffer.java | 2 +
.../operators/windowing/WindowFlattener.java | 2 +
.../api/operators/windowing/WindowFolder.java | 1 +
.../api/operators/windowing/WindowMerger.java | 2 +
.../operators/windowing/WindowPartitioner.java | 2 +
.../streaming/api/operators/ProjectTest.java | 9 ++--
30 files changed, 106 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
index 21ecc24..3d06c59 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.Function;
+import java.io.Serializable;
+
/**
* The {@link KeySelector} allows to use arbitrary objects for operations such as
* reduce, reduceGroup, join, coGoup, etc.
@@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.Function;
* @param <IN> Type of objects to extract the key from.
* @param <KEY> Type of key.
*/
-public interface KeySelector<IN, KEY> extends Function, java.io.Serializable {
+public interface KeySelector<IN, KEY> extends Function, Serializable {
/**
* User-defined function that extracts the key from an arbitrary object.
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index 447b1fd..16e9deb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -129,7 +129,7 @@ public class StreamProjection<IN> {
TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple1<T0>>(
- fieldIndexes, tType));
+ fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -143,7 +143,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -157,7 +157,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -171,7 +171,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -185,7 +185,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -199,7 +199,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -213,7 +213,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -227,7 +227,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -241,7 +241,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -255,7 +255,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -269,7 +269,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -283,7 +283,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -297,7 +297,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -311,7 +311,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -325,7 +325,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -339,7 +339,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -353,7 +353,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -367,7 +367,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -381,7 +381,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -395,7 +395,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -409,7 +409,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -423,7 +423,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -437,7 +437,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -451,7 +451,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
/**
@@ -465,7 +465,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
- return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType));
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
public static TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a8dc8c5..b55e5d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -31,11 +31,11 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
private static final long serialVersionUID = 1L;
- protected RuntimeContext runtimeContext;
+ protected transient RuntimeContext runtimeContext;
- protected ExecutionConfig executionConfig;
+ protected transient ExecutionConfig executionConfig;
- public Output<OUT> output;
+ public transient Output<OUT> output;
// A sane default for most operators
protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index dbd93b5..09d1ef6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -32,7 +32,9 @@ import java.io.Serializable;
* @param <OUT> The output type of the operator
* @param <F> The type of the user function
*/
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
+
+ private static final long serialVersionUID = 1L;
protected final F userFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
index d9a67dd..240e2b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
@@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.operators;
public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements OneInputStreamOperator<IN, Long> {
+ private static final long serialVersionUID = 1L;
+
private Long count = 0L;
public StreamCounter() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
index e5575db..a54a4ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.functions.FilterFunction;
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
+ private static final long serialVersionUID = 1L;
+
public StreamFilter(FilterFunction<IN> filterFunction) {
super(filterFunction);
chainingStrategy = ChainingStrategy.ALWAYS;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index 1e836b1..e8da2c7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -23,6 +23,8 @@ public class StreamFlatMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
chainingStrategy = ChainingStrategy.ALWAYS;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
index 6f956ca..580477a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -26,6 +26,8 @@ public class StreamFold<IN, OUT>
extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+
private OUT accumulator;
protected TypeSerializer<OUT> outTypeSerializer;
protected TypeInformation<OUT> outTypeInformation;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 75217be..08107a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.functions.KeySelector;
public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+
private KeySelector<IN, ?> keySelector;
private Map<Object, OUT> values;
private OUT initialValue;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index e3980ce..8269be7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.functions.KeySelector;
public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
+ private static final long serialVersionUID = 1L;
+
private KeySelector<IN, ?> keySelector;
private Map<Object, IN> values;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
index a379c56..08dc981 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
@@ -23,6 +23,8 @@ public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
index d039144..83613d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.operators;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
@@ -26,16 +25,18 @@ public class StreamProject<IN, OUT extends Tuple>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT> {
- transient OUT outTuple;
- TypeSerializer<OUT> outTypeSerializer;
- TypeInformation<OUT> outTypeInformation;
- int[] fields;
- int numFields;
+ private static final long serialVersionUID = 1L;
- public StreamProject(int[] fields, TypeInformation<OUT> outTypeInformation) {
+ private TypeSerializer<OUT> outSerializer;
+ private int[] fields;
+ private int numFields;
+
+ private transient OUT outTuple;
+
+ public StreamProject(int[] fields, TypeSerializer<OUT> outSerializer) {
this.fields = fields;
this.numFields = this.fields.length;
- this.outTypeInformation = outTypeInformation;
+ this.outSerializer = outSerializer;
chainingStrategy = ChainingStrategy.ALWAYS;
}
@@ -52,7 +53,6 @@ public class StreamProject<IN, OUT extends Tuple>
@Override
public void open(Configuration config) throws Exception {
super.open(config);
- this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
- outTuple = outTypeSerializer.createInstance();
+ outTuple = outSerializer.createInstance();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
index 8205fe6..97cebc1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.ReduceFunction;
public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
implements OneInputStreamOperator<IN, IN> {
+ private static final long serialVersionUID = 1L;
+
private IN currentValue;
public StreamReduce(ReduceFunction<IN> reducer) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index b1a0212..5399302 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
implements OneInputStreamOperator<IN, Object> {
+ private static final long serialVersionUID = 1L;
+
public StreamSink(SinkFunction<IN> sinkFunction) {
super(sinkFunction);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 2e4d313..9cdfb01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,6 +21,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>> implements StreamOperator<OUT> {
+ private static final long serialVersionUID = 1L;
+
public StreamSource(SourceFunction<OUT> sourceFunction) {
super(sourceFunction);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index 0be8c90..e3662d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -25,6 +25,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
index d136719..3dc509a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN2, OUT> {
+
private static final long serialVersionUID = 1L;
protected KeySelector<IN1, ?> keySelector1;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
index 9a98c66..a8e57e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -25,6 +25,8 @@ public class CoStreamMap<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
super(mapper);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
index 81da189..7157b1d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
@@ -25,6 +25,8 @@ public class CoStreamReduce<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoReduceFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
protected IN1 currentValue1 = null;
protected IN2 currentValue2 = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
index 8f2a0b8..e7b069e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
@@ -33,6 +33,8 @@ public class CoStreamWindow<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoWindowFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
protected long windowSize;
protected long slideSize;
protected CircularFifoList<StreamRecord<IN1>> circularList1;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index 2ff8496..fd95110 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -25,10 +25,9 @@ import org.slf4j.LoggerFactory;
public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
- private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class);
-
+ private static final long serialVersionUID = 1L;
- private static final long serialVersionUID = -3469545957144404137L;
+ private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class);
private volatile IN last;
private Thread centralThread;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
index 7f6a917..e80b6ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
@@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
*/
public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
- private static final long serialVersionUID = -3469545957144404137L;
+ private static final long serialVersionUID = 1L;
protected KeySelector<IN, ?> keySelector;
protected Configuration parameters;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
index 4e4350d..c6b2499 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.api.operators.windowing;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.Map;
@@ -31,12 +33,21 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
*/
public class GroupedWindowBuffer<T> extends StreamWindowBuffer<T> {
- private Map<Object, WindowBuffer<T>> windowMap = new HashMap<Object, WindowBuffer<T>>();
+ private static final long serialVersionUID = 1L;
+
private KeySelector<T, ?> keySelector;
+ private transient Map<Object, WindowBuffer<T>> windowMap;
+
public GroupedWindowBuffer(WindowBuffer<T> buffer, KeySelector<T, ?> keySelector) {
super(buffer);
this.keySelector = keySelector;
+ this.windowMap = new HashMap<Object, WindowBuffer<T>>();
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ this.windowMap = new HashMap<Object, WindowBuffer<T>>();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
index fbe6e44..4ab31cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
@@ -36,18 +36,16 @@ public class StreamDiscretizer<IN>
extends AbstractStreamOperator<WindowEvent<IN>>
implements OneInputStreamOperator<IN, WindowEvent<IN>> {
- /**
- * Auto-generated serial version UID
- */
- private static final long serialVersionUID = -8038984294071650730L;
+ private static final long serialVersionUID = 1L;
protected TriggerPolicy<IN> triggerPolicy;
protected EvictionPolicy<IN> evictionPolicy;
private boolean isActiveTrigger;
private boolean isActiveEviction;
- private Thread activePolicyThread;
private int bufferSize = 0;
+ private transient Thread activePolicyThread;
+
protected WindowEvent<IN> windowEvent = new WindowEvent<IN>();
public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
index b9de698..f890b69 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
@@ -30,6 +30,8 @@ public class StreamWindowBuffer<T>
extends AbstractStreamOperator<StreamWindow<T>>
implements OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> {
+ private static final long serialVersionUID = 1L;
+
protected WindowBuffer<T> buffer;
public StreamWindowBuffer(WindowBuffer<T> buffer) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
index f1b0ee2..3afc50f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
@@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
public class WindowFlattener<T> extends AbstractStreamOperator<T>
implements OneInputStreamOperator<StreamWindow<T>, T> {
+ private static final long serialVersionUID = 1L;
+
public WindowFlattener() {
chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
index 29a68db..bdf6782 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWindow<OUT>> {
private static final long serialVersionUID = 1L;
+
FoldFunction<IN, OUT> folder;
public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index 12dd239..9b33474 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
+ private static final long serialVersionUID = 1L;
+
private Map<Integer, StreamWindow<T>> windows;
public WindowMerger() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
index 14a055a..b86caaa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>>
implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
+ private static final long serialVersionUID = 1L;
+
private KeySelector<T, ?> keySelector;
private int numberOfSplits;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b904ae2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index 035abe6..d9cc607 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
@@ -33,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.streaming.api.datastream.StreamProjection;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -51,12 +53,13 @@ public class ProjectTest implements Serializable {
int[] fields = new int[]{4, 4, 3};
+ TupleSerializer<Tuple3<Integer, Integer, String>> serializer =
+ new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(fields, inType))
+ .createSerializer(new ExecutionConfig());
@SuppressWarnings("unchecked")
StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
- fields,
- new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection
- .extractFieldTypes(fields, inType)));
+ fields, serializer);
List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer,
String, Integer>>();