You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/25 13:38:37 UTC
[2/2] flink git commit: [FLINK-2257] [streaming] Properly forward
rich window function calls to wrapped functions
[FLINK-2257] [streaming] Properly forward rich window function calls to wrapped functions
Closes #855
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/658a0782
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/658a0782
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/658a0782
Branch: refs/heads/master
Commit: 658a0782f20391ad2de5e4b1aae217ac3ac6ae7c
Parents: ba2796a
Author: mbalassi <mb...@apache.org>
Authored: Mon Jun 22 12:45:33 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Thu Jun 25 13:37:23 2015 +0200
----------------------------------------------------------------------
.../api/operators/windowing/WindowFolder.java | 23 +++++++++++++++++++-
.../api/operators/windowing/WindowMapper.java | 21 ++++++++++++++++++
.../api/operators/windowing/WindowReducer.java | 22 +++++++++++++++++++
3 files changed, 65 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/658a0782/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
index b8f407a..cdfc35b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
@@ -18,10 +18,11 @@
package org.apache.flink.streaming.api.operators.windowing;
import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -69,11 +70,31 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
return outputWindow;
}
+ // --------------------------------------------------------------------------------------------
+ // Forwarding calls to the wrapped folder
+ // --------------------------------------------------------------------------------------------
+
+ // Forwards open() to the wrapped folder through FunctionUtils.
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ FunctionUtils.openFunction(folder, parameters);
+ }
+
+ // Forwards close() to the wrapped folder through FunctionUtils.
+ @Override
+ public void close() throws Exception {
+ FunctionUtils.closeFunction(folder);
+ }
+
+ // Also stores the context on this wrapper so that getRuntimeContext() has a fallback value.
+ // NOTE(review): assumes the enclosing class extends AbstractRichFunction - confirm.
@Override
public void setRuntimeContext(RuntimeContext t) {
+ super.setRuntimeContext(t);
FunctionUtils.setFunctionRuntimeContext(folder, t);
}
+ // The fallback must come from super: passing this.getRuntimeContext() as the argument
+ // re-enters this method before the helper is ever called and recurses without end.
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return FunctionUtils.getFunctionRuntimeContext(folder, super.getRuntimeContext());
+ }
+
+ // streaming does not use iteration runtime context, so that is omitted
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/658a0782/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
index 18a237d..ec4309d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
@@ -63,11 +64,31 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
return outputWindow;
}
+ // --------------------------------------------------------------------------------------------
+ // Forwarding calls to the wrapped mapper
+ // --------------------------------------------------------------------------------------------
+
+ // Forwards open() to the wrapped mapper through FunctionUtils.
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ FunctionUtils.openFunction(mapper, parameters);
+ }
+
+ // Forwards close() to the wrapped mapper through FunctionUtils.
+ @Override
+ public void close() throws Exception {
+ FunctionUtils.closeFunction(mapper);
+ }
+
+ // Also stores the context on this wrapper so that getRuntimeContext() has a fallback value.
+ // NOTE(review): assumes the enclosing class extends AbstractRichFunction - confirm.
@Override
public void setRuntimeContext(RuntimeContext t) {
+ super.setRuntimeContext(t);
FunctionUtils.setFunctionRuntimeContext(mapper, t);
}
+ // The fallback must come from super: passing this.getRuntimeContext() as the argument
+ // re-enters this method before the helper is ever called and recurses without end.
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return FunctionUtils.getFunctionRuntimeContext(mapper, super.getRuntimeContext());
+ }
+
+ // streaming does not use iteration runtime context, so that is omitted
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/658a0782/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
index ff88bab..a43405e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -67,11 +68,32 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
return outputWindow;
}
+ // --------------------------------------------------------------------------------------------
+ // Forwarding calls to the wrapped reducer
+ // --------------------------------------------------------------------------------------------
+
+
+ // Forwards open() to the wrapped reducer through FunctionUtils.
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ FunctionUtils.openFunction(reducer, parameters);
+ }
+
+ // Forwards close() to the wrapped reducer through FunctionUtils.
+ @Override
+ public void close() throws Exception {
+ FunctionUtils.closeFunction(reducer);
+ }
+
+ // Also stores the context on this wrapper so that getRuntimeContext() has a fallback value.
+ // NOTE(review): assumes the enclosing class extends AbstractRichFunction - confirm.
@Override
public void setRuntimeContext(RuntimeContext t) {
+ super.setRuntimeContext(t);
FunctionUtils.setFunctionRuntimeContext(reducer, t);
}
+ // The fallback must come from super: passing this.getRuntimeContext() as the argument
+ // re-enters this method before the helper is ever called and recurses without end.
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return FunctionUtils.getFunctionRuntimeContext(reducer, super.getRuntimeContext());
+ }
+
+ // streaming does not use iteration runtime context, so that is omitted
}
}