You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:36 UTC

[02/13] flink git commit: [hotfix] Use closure cleaner for reduce window

[hotfix] Use closure cleaner for reduce window


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0de9d2ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0de9d2ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0de9d2ef

Branch: refs/heads/master
Commit: 0de9d2ef66c9f111633c1a95dfc8e3100098b4df
Parents: 62df0a0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 5 10:21:15 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/AllWindowedStream.java      | 6 ++++++
 .../apache/flink/streaming/api/datastream/WindowedStream.java  | 3 +++
 2 files changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0de9d2ef/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 134029f..78ba8ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -121,6 +121,9 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @return The data stream that is the result of applying the reduce function to the window. 
 	 */
 	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "Reduce at " + callLocation;
 
@@ -185,6 +188,9 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "MapWindow at " + callLocation;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0de9d2ef/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 41adab5..349651e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -132,6 +132,9 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @return The data stream that is the result of applying the reduce function to the window. 
 	 */
 	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "Reduce at " + callLocation;