You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:36 UTC
[02/13] flink git commit: [hotfix] Use closure cleaner for reduce window
[hotfix] Use closure cleaner for reduce window
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0de9d2ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0de9d2ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0de9d2ef
Branch: refs/heads/master
Commit: 0de9d2ef66c9f111633c1a95dfc8e3100098b4df
Parents: 62df0a0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 5 10:21:15 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/datastream/AllWindowedStream.java | 6 ++++++
.../apache/flink/streaming/api/datastream/WindowedStream.java | 3 +++
2 files changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0de9d2ef/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 134029f..78ba8ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -121,6 +121,9 @@ public class AllWindowedStream<T, W extends Window> {
* @return The data stream that is the result of applying the reduce function to the window.
*/
public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+ //clean the closure
+ function = input.getExecutionEnvironment().clean(function);
+
String callLocation = Utils.getCallLocationName();
String udfName = "Reduce at " + callLocation;
@@ -185,6 +188,9 @@ public class AllWindowedStream<T, W extends Window> {
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+ //clean the closure
+ function = input.getExecutionEnvironment().clean(function);
+
String callLocation = Utils.getCallLocationName();
String udfName = "MapWindow at " + callLocation;
http://git-wip-us.apache.org/repos/asf/flink/blob/0de9d2ef/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 41adab5..349651e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -132,6 +132,9 @@ public class WindowedStream<T, K, W extends Window> {
* @return The data stream that is the result of applying the reduce function to the window.
*/
public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+ //clean the closure
+ function = input.getExecutionEnvironment().clean(function);
+
String callLocation = Utils.getCallLocationName();
String udfName = "Reduce at " + callLocation;