You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/05 12:14:44 UTC

flink git commit: [FLINK-1956] [streaming] Proper opening of rich windowfunctions

Repository: flink
Updated Branches:
  refs/heads/master ae867efd3 -> 912edbca1


[FLINK-1956] [streaming] Proper opening of rich windowfunctions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/912edbca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/912edbca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/912edbca

Branch: refs/heads/master
Commit: 912edbca1aab6f204e917032ba82dcb612b25e93
Parents: ae867ef
Author: mbalassi <mb...@apache.org>
Authored: Mon May 4 16:43:38 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue May 5 10:08:58 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/windowing/WindowFolder.java  | 10 +++++++++-
 .../streaming/api/operators/windowing/WindowMapper.java  | 11 ++++++++++-
 .../streaming/api/operators/windowing/WindowReducer.java | 10 +++++++++-
 3 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/912edbca/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
index 138861a..04a700b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.streaming.api.operators.windowing;
 
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -38,7 +41,7 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
 		withoutInputCopy();
 	}
 
-	private static class WindowFoldFunction<IN, OUT> implements
+	private static class WindowFoldFunction<IN, OUT> extends AbstractRichFunction implements
 			MapFunction<StreamWindow<IN>, StreamWindow<OUT>> {
 
 		private static final long serialVersionUID = 1L;
@@ -65,6 +68,11 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
 			return outputWindow;
 		}
 
+		@Override
+		public void setRuntimeContext(RuntimeContext t) {
+			FunctionUtils.setFunctionRuntimeContext(folder, t);
+		}
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/912edbca/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
index f47bcb7..400ba24 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
@@ -17,7 +17,10 @@
 
 package org.apache.flink.streaming.api.operators.windowing;
 
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -39,7 +42,8 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
 		withoutInputCopy();
 	}
 
-	private static class WindowMap<T, R> implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
+	private static class WindowMap<T, R> extends AbstractRichFunction
+			implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
 
 		private static final long serialVersionUID = 1L;
 		WindowMapFunction<T, R> mapper;
@@ -59,6 +63,11 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
 			return outputWindow;
 		}
 
+		@Override
+		public void setRuntimeContext(RuntimeContext t) {
+			FunctionUtils.setFunctionRuntimeContext(mapper, t);
+		}
+
 	}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/912edbca/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
index 3dc4038..4143064 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.streaming.api.operators.windowing;
 
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -39,7 +42,7 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
 		withoutInputCopy();
 	}
 
-	private static class WindowReduceFunction<T> implements
+	private static class WindowReduceFunction<T> extends AbstractRichFunction implements
 			MapFunction<StreamWindow<T>, StreamWindow<T>> {
 
 		private static final long serialVersionUID = 1L;
@@ -64,6 +67,11 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
 			return outputWindow;
 		}
 
+		@Override
+		public void setRuntimeContext(RuntimeContext t) {
+			FunctionUtils.setFunctionRuntimeContext(reducer, t);
+		}
+
 	}
 
 }