You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/25 13:38:37 UTC

[2/2] flink git commit: [FLINK-2257] [streaming] Properly forward rich window function calls to wrapped functions

[FLINK-2257] [streaming] Properly forward rich window function calls to wrapped functions

Closes #855


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/658a0782
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/658a0782
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/658a0782

Branch: refs/heads/master
Commit: 658a0782f20391ad2de5e4b1aae217ac3ac6ae7c
Parents: ba2796a
Author: mbalassi <mb...@apache.org>
Authored: Mon Jun 22 12:45:33 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Thu Jun 25 13:37:23 2015 +0200

----------------------------------------------------------------------
 .../api/operators/windowing/WindowFolder.java   | 23 +++++++++++++++++++-
 .../api/operators/windowing/WindowMapper.java   | 21 ++++++++++++++++++
 .../api/operators/windowing/WindowReducer.java  | 22 +++++++++++++++++++
 3 files changed, 65 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/658a0782/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
index b8f407a..cdfc35b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
@@ -18,10 +18,11 @@
 package org.apache.flink.streaming.api.operators.windowing;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -69,11 +70,31 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
 			return outputWindow;
 		}
 
+		// --------------------------------------------------------------------------------------------
+		//  Forwarding calls to the wrapped folder
+		// --------------------------------------------------------------------------------------------
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			FunctionUtils.openFunction(folder, parameters);
+		}
+
+		@Override
+		public void close() throws Exception {
+			FunctionUtils.closeFunction(folder);
+		}
+
 		@Override
 		public void setRuntimeContext(RuntimeContext t) {
 			FunctionUtils.setFunctionRuntimeContext(folder, t);
 		}
 
+		@Override
+		public RuntimeContext getRuntimeContext() {
+			return FunctionUtils.getFunctionRuntimeContext(folder, null); // no self-call as default (it recursed forever); the wrapper keeps no context of its own, see setRuntimeContext
+		}
+
+		// streaming does not use iteration runtime context, so that is omitted
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/658a0782/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
index 18a237d..ec4309d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -63,11 +64,31 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
 			return outputWindow;
 		}
 
+		// --------------------------------------------------------------------------------------------
+		//  Forwarding calls to the wrapped mapper
+		// --------------------------------------------------------------------------------------------
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			FunctionUtils.openFunction(mapper, parameters);
+		}
+
+		@Override
+		public void close() throws Exception {
+			FunctionUtils.closeFunction(mapper);
+		}
+
 		@Override
 		public void setRuntimeContext(RuntimeContext t) {
 			FunctionUtils.setFunctionRuntimeContext(mapper, t);
 		}
 
+		@Override
+		public RuntimeContext getRuntimeContext() {
+			return FunctionUtils.getFunctionRuntimeContext(mapper, null); // no self-call as default (it recursed forever); the wrapper keeps no context of its own, see setRuntimeContext
+		}
+
+		// streaming does not use iteration runtime context, so that is omitted
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/658a0782/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
index ff88bab..a43405e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -67,11 +68,32 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
 			return outputWindow;
 		}
 
+		// --------------------------------------------------------------------------------------------
+		//  Forwarding calls to the wrapped reducer
+		// --------------------------------------------------------------------------------------------
+
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			FunctionUtils.openFunction(reducer, parameters);
+		}
+
+		@Override
+		public void close() throws Exception {
+			FunctionUtils.closeFunction(reducer);
+		}
+
 		@Override
 		public void setRuntimeContext(RuntimeContext t) {
 			FunctionUtils.setFunctionRuntimeContext(reducer, t);
 		}
 
+		@Override
+		public RuntimeContext getRuntimeContext() {
+			return FunctionUtils.getFunctionRuntimeContext(reducer, null); // no self-call as default (it recursed forever); the wrapper keeps no context of its own, see setRuntimeContext
+		}
+
+		// streaming does not use iteration runtime context, so that is omitted
 	}
 
 }