You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/10 07:06:31 UTC
[3/7] flink git commit: [FLINK-6164] Make ProcessWindowFunction a RichFunction
[FLINK-6164] Make ProcessWindowFunction a RichFunction
This closes #3824.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c4560d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c4560d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c4560d3
Branch: refs/heads/master
Commit: 5c4560d3bcff47f8563c75ca909d20469e4736c0
Parents: b787536
Author: zentol <ch...@apache.org>
Authored: Wed May 3 15:57:05 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:48 2017 +0200
----------------------------------------------------------------------
.../FoldApplyProcessAllWindowFunction.java | 2 +-
.../FoldApplyProcessWindowFunction.java | 2 +-
.../windowing/ProcessAllWindowFunction.java | 4 +-
.../windowing/ProcessWindowFunction.java | 4 +-
.../ReduceApplyProcessAllWindowFunction.java | 3 +-
.../ReduceApplyProcessWindowFunction.java | 2 +-
.../windowing/RichProcessAllWindowFunction.java | 53 ++------------------
.../windowing/RichProcessWindowFunction.java | 53 ++------------------
.../functions/InternalWindowFunctionTest.java | 11 ++--
...AlignedProcessingTimeWindowOperatorTest.java | 4 +-
.../function/ProcessAllWindowFunction.scala | 7 ++-
.../scala/function/ProcessWindowFunction.scala | 6 ++-
.../function/RichProcessAllWindowFunction.scala | 53 +-------------------
.../function/RichProcessWindowFunction.scala | 53 +-------------------
.../ScalaProcessWindowFunctionWrapper.scala | 20 +++-----
...ngIdentityRichProcessAllWindowFunction.scala | 4 +-
...ckingIdentityRichProcessWindowFunction.scala | 4 +-
17 files changed, 45 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 38244dd..b96a8ff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
*/
@Internal
public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
- extends RichProcessAllWindowFunction<T, R, W>
+ extends ProcessAllWindowFunction<T, R, W>
implements OutputTypeConfigurable<R> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index 1b2c2e2..98f5622 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
*/
@Internal
public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
- extends RichProcessWindowFunction<T, R, K, W>
+ extends ProcessWindowFunction<T, R, K, W>
implements OutputTypeConfigurable<R> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 4d247a7..34a37bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@PublicEvolving
-public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implements Function {
+public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 2c80e9e..506b610 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@PublicEvolving
-public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index d1f9ccd..e7e6609 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -32,8 +32,7 @@ import org.apache.flink.util.Collector;
* {@link ReduceFunction}.
*/
@Internal
-public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
- extends RichProcessAllWindowFunction<T, R, W> {
+public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> extends ProcessAllWindowFunction<T, R, W> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 836726d..18037b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
*/
@Internal
public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
- extends RichProcessWindowFunction<T, R, K, W> {
+ extends ProcessWindowFunction<T, R, K, W> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
index 1130fa5..a800870 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
@@ -19,10 +19,6 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
@@ -32,53 +28,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <W> The type of {@code Window} that this window function can be applied on.
+ *
+ * @deprecated use {@link ProcessAllWindowFunction} instead
*/
@PublicEvolving
-public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window>
- extends ProcessAllWindowFunction<IN, OUT, W>
- implements RichFunction {
+@Deprecated
+public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window> extends ProcessAllWindowFunction<IN, OUT, W> {
private static final long serialVersionUID = 1L;
-
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- private transient RuntimeContext runtimeContext;
-
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- this.runtimeContext = t;
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- if (this.runtimeContext != null) {
- return this.runtimeContext;
- } else {
- throw new IllegalStateException("The runtime context has not been initialized.");
- }
- }
-
- @Override
- public IterationRuntimeContext getIterationRuntimeContext() {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.");
- } else if (this.runtimeContext instanceof IterationRuntimeContext) {
- return (IterationRuntimeContext) this.runtimeContext;
- } else {
- throw new IllegalStateException("This stub is not part of an iteration step function.");
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {}
-
- @Override
- public void close() throws Exception {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
index ac55bc6..83da065 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
@@ -19,10 +19,6 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
@@ -33,53 +29,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
* @param <W> The type of {@code Window} that this window function can be applied on.
+ *
+ * @deprecated use {@link ProcessWindowFunction} instead
*/
@PublicEvolving
-public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window>
- extends ProcessWindowFunction<IN, OUT, KEY, W>
- implements RichFunction {
+@Deprecated
+public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window> extends ProcessWindowFunction<IN, OUT, KEY, W> {
private static final long serialVersionUID = 1L;
-
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- private transient RuntimeContext runtimeContext;
-
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- this.runtimeContext = t;
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- if (this.runtimeContext != null) {
- return this.runtimeContext;
- } else {
- throw new IllegalStateException("The runtime context has not been initialized.");
- }
- }
-
- @Override
- public IterationRuntimeContext getIterationRuntimeContext() {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.");
- } else if (this.runtimeContext instanceof IterationRuntimeContext) {
- return (IterationRuntimeContext) this.runtimeContext;
- } else {
- throw new IllegalStateException("This stub is not part of an iteration step function.");
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {}
-
- @Override
- public void close() throws Exception {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 4b8057f..4b0f5ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -24,11 +24,10 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -612,7 +611,7 @@ public class InternalWindowFunctionTest {
}
public static class ProcessWindowFunctionMock
- extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
+ extends ProcessWindowFunction<Long, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
@@ -626,7 +625,7 @@ public class InternalWindowFunctionTest {
}
public static class AggregateProcessWindowFunctionMock
- extends RichProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
+ extends ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
@@ -640,7 +639,7 @@ public class InternalWindowFunctionTest {
}
public static class AggregateProcessAllWindowFunctionMock
- extends RichProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
+ extends ProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
@@ -679,7 +678,7 @@ public class InternalWindowFunctionTest {
}
public static class ProcessAllWindowFunctionMock
- extends RichProcessAllWindowFunction<Long, String, TimeWindow>
+ extends ProcessAllWindowFunction<Long, String, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index a8d3154..2f7e302 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -35,9 +35,7 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
@@ -1038,7 +1036,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static class StatefulFunction extends RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
+ private static class StatefulFunction extends ProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
// we use a concurrent map here even though there is no concurrency, to
// get "volatile" style access to entries
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 2f0e48e..49911e4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -18,10 +18,8 @@
package org.apache.flink.streaming.api.scala.function
-import java.io.Serializable
-
import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -35,7 +33,8 @@ import org.apache.flink.util.Collector
* @tparam W The type of the window.
*/
@PublicEvolving
-abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable {
+abstract class ProcessAllWindowFunction[IN, OUT, W <: Window]
+ extends AbstractRichFunction {
/**
* Evaluates the window and outputs none or several elements.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index bc79a26..d2075db 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala.function
import java.io.Serializable
import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -36,7 +36,9 @@ import org.apache.flink.util.Collector
* @tparam W The type of the window.
*/
@PublicEvolving
-abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
+ extends AbstractRichFunction {
+
/**
* Evaluates the window and outputs none or several elements.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
index 22d64a8..6edc1e6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
@@ -18,11 +18,7 @@
package org.apache.flink.streaming.api.scala.function
-import java.beans.Transient
-
import org.apache.flink.annotation.Public
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
-import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.Window
/**
@@ -34,53 +30,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window
* @tparam W The type of the window.
*/
@Public
+@deprecated("use [[ProcessAllWindowFunction]] instead")
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
- extends ProcessAllWindowFunction[IN, OUT, W]
- with RichFunction {
-
- @Transient
- private var runtimeContext: RuntimeContext = null
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- override def setRuntimeContext(t: RuntimeContext) {
- this.runtimeContext = t
- }
-
- override def getRuntimeContext: RuntimeContext = {
- if (this.runtimeContext != null) {
- this.runtimeContext
- }
- else {
- throw new IllegalStateException("The runtime context has not been initialized.")
- }
- }
-
- override def getIterationRuntimeContext: IterationRuntimeContext = {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.")
- }
- else {
- this.runtimeContext match {
- case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
- case _ =>
- throw new IllegalStateException("This stub is not part of an iteration step function.")
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @throws[Exception]
- override def open(parameters: Configuration) {
- }
-
- @throws[Exception]
- override def close() {
- }
+ extends ProcessAllWindowFunction[IN, OUT, W] {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
index 320685a..d9cd275 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
@@ -18,11 +18,7 @@
package org.apache.flink.streaming.api.scala.function
-import java.beans.Transient
-
import org.apache.flink.annotation.Public
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
-import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.Window
/**
@@ -35,53 +31,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window
* @tparam W The type of the window.
*/
@Public
+@deprecated("use [[ProcessWindowFunction]] instead")
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
- extends ProcessWindowFunction[IN, OUT, KEY, W]
- with RichFunction {
-
- @Transient
- private var runtimeContext: RuntimeContext = null
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- override def setRuntimeContext(t: RuntimeContext) {
- this.runtimeContext = t
- }
-
- override def getRuntimeContext: RuntimeContext = {
- if (this.runtimeContext != null) {
- this.runtimeContext
- }
- else {
- throw new IllegalStateException("The runtime context has not been initialized.")
- }
- }
-
- override def getIterationRuntimeContext: IterationRuntimeContext = {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.")
- }
- else {
- this.runtimeContext match {
- case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
- case _ =>
- throw new IllegalStateException("This stub is not part of an iteration step function.")
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @throws[Exception]
- override def open(parameters: Configuration) {
- }
-
- @throws[Exception]
- override def close() {
- }
+ extends ProcessWindowFunction[IN, OUT, KEY, W] {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 263373e..bc4b7dd 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -21,13 +21,9 @@ package org.apache.flink.streaming.api.scala.function.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
-import org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction => JRichProcessWindowFunction}
-import org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction => JRichProcessAllWindowFunction}
import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction}
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction}
import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction}
-import org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => ScalaRichProcessWindowFunction}
-import org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => ScalaRichProcessAllWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -43,7 +39,7 @@ import scala.collection.JavaConverters._
*/
final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W])
- extends JRichProcessWindowFunction[IN, OUT, KEY, W] {
+ extends JProcessWindowFunction[IN, OUT, KEY, W] {
override def process(
key: KEY,
@@ -82,7 +78,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
override def setRuntimeContext(t: RuntimeContext): Unit = {
super.setRuntimeContext(t)
func match {
- case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t)
+ case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t)
case _ =>
}
}
@@ -90,7 +86,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
override def open(parameters: Configuration): Unit = {
super.open(parameters)
func match {
- case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters)
+ case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters)
case _ =>
}
}
@@ -98,7 +94,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
override def close(): Unit = {
super.close()
func match {
- case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
+ case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
case _ =>
}
}
@@ -114,7 +110,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
*/
final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W])
- extends JRichProcessAllWindowFunction[IN, OUT, W] {
+ extends JProcessAllWindowFunction[IN, OUT, W] {
override def process(
context: JProcessAllWindowFunction[IN, OUT, W]#Context,
@@ -145,7 +141,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
override def setRuntimeContext(t: RuntimeContext): Unit = {
super.setRuntimeContext(t)
func match {
- case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t)
+ case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t)
case _ =>
}
}
@@ -153,7 +149,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
override def open(parameters: Configuration): Unit = {
super.open(parameters)
func match {
- case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters)
+ case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters)
case _ =>
}
}
@@ -161,7 +157,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
override def close(): Unit = {
super.close()
func match {
- case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.close()
+ case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.close()
case _ =>
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
index df005fa..146452b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction
+import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window]
- extends RichProcessAllWindowFunction[T, T, W] {
+ extends ProcessAllWindowFunction[T, T, W] {
override def process(context: Context, input: Iterable[T], out: Collector[T]): Unit = {
for (value <- input) {
http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
index d62f2d3..2ec179a 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window]
- extends RichProcessWindowFunction[T, T, K, W] {
+ extends ProcessWindowFunction[T, T, K, W] {
override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit = {
for (value <- input) {