You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/02 16:51:00 UTC
[2/3] flink git commit: [hotfix] [streaming] Apply closure cleaner to KeyedWindowFunction
[hotfix] [streaming] Apply closure cleaner to KeyedWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73902011
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73902011
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73902011
Branch: refs/heads/master
Commit: 739020110251aa98773695163bb23b52c48f8987
Parents: daeab84
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 1 19:59:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:10:53 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/datastream/KeyedWindowDataStream.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/73902011/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index ad7ca37..302a645 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -178,6 +178,9 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
+ // clean the closure
+ function = input.getExecutionEnvironment().clean(function);
+
TypeInformation<T> inType = input.getType();
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, KeyedWindowFunction.class, true, true, inType, null, false);