You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/09 20:56:21 UTC
[1/4] flink git commit: [FLINK-6459] Move origin header ConfigOption
to JMOptions
Repository: flink
Updated Branches:
refs/heads/release-1.3 2e044d468 -> bb03c07d9
[FLINK-6459] Move origin header ConfigOption to JMOptions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb03c07d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb03c07d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb03c07d
Branch: refs/heads/release-1.3
Commit: bb03c07d98988b31d1cece020a3b35abbdaa714f
Parents: e76a0aa
Author: zentol <ch...@apache.org>
Authored: Fri May 5 12:13:10 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 21:07:33 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/configuration/ConfigConstants.java | 6 ------
.../org/apache/flink/configuration/JobManagerOptions.java | 8 ++++++++
.../apache/flink/runtime/webmonitor/WebMonitorConfig.java | 3 ++-
3 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb03c07d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 975a3d4..61c1b27 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1333,12 +1333,6 @@ public final class ConfigConstants {
key("jobmanager.web.address")
.noDefaultValue();
- /** The config parameter defining the Access-Control-Allow-Origin header for all
- * responses from the web-frontend. */
- public static final ConfigOption<String> JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN =
- key("jobmanager.web.access-control-allow-origin")
- .defaultValue("*");
-
/** The config key for the port of the JobManager web frontend.
* Setting this value to {@code -1} disables the web frontend. */
public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
http://git-wip-us.apache.org/repos/asf/flink/blob/bb03c07d/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 5481d7a..140ba2e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -101,6 +101,14 @@ public class JobManagerOptions {
.defaultValue(8081);
/**
+ * The config parameter defining the Access-Control-Allow-Origin header for all
+ * responses from the web-frontend.
+ */
+ public static final ConfigOption<String> WEB_ACCESS_CONTROL_ALLOW_ORIGIN =
+ key("jobmanager.web.access-control-allow-origin")
+ .defaultValue("*");
+
+ /**
* Config parameter to override SSL support for the JobManager Web UI
*/
public static final ConfigOption<Boolean> WEB_SSL_ENABLED =
http://git-wip-us.apache.org/repos/asf/flink/blob/bb03c07d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index 11e94b0..d95d13a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
public class WebMonitorConfig {
@@ -80,6 +81,6 @@ public class WebMonitorConfig {
}
public String getAllowOrigin() {
- return config.getString(ConfigConstants.JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN);
+ return config.getString(JobManagerOptions.WEB_ACCESS_CONTROL_ALLOW_ORIGIN);
}
}
[4/4] flink git commit: [FLINK-6396] Fix FsSavepointStreamFactoryTest
on Windows
Posted by ch...@apache.org.
[FLINK-6396] Fix FsSavepointStreamFactoryTest on Windows
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35b74f2b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35b74f2b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35b74f2b
Branch: refs/heads/release-1.3
Commit: 35b74f2bc2247fe00f894167ce49ab063f2a7e8b
Parents: 2e044d4
Author: zentol <ch...@apache.org>
Authored: Thu Apr 27 15:56:48 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 21:07:33 2017 +0200
----------------------------------------------------------------------
.../state/filesystem/FsSavepointStreamFactoryTest.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/35b74f2b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
index a29d29c..3095a09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
import java.io.File;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import org.junit.Rule;
@@ -48,7 +49,8 @@ public class FsSavepointStreamFactoryTest {
jobId,
0);
- File[] listed = testRoot.listFiles();
+ Path root = new Path(testRoot.getAbsolutePath());
+ FileStatus[] listed = root.getFileSystem().listStatus(root);
assertNotNull(listed);
assertEquals(0, listed.length);
@@ -59,9 +61,9 @@ public class FsSavepointStreamFactoryTest {
FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
- listed = testRoot.listFiles();
+ listed = root.getFileSystem().listStatus(root);
assertNotNull(listed);
assertEquals(1, listed.length);
- assertEquals(handle.getFilePath().getPath(), listed[0].getPath());
+ assertEquals(handle.getFilePath().getPath(), listed[0].getPath().getPath());
}
}
[2/4] flink git commit: [FLINK-5720] Deprecate DataStream#fold()
Posted by ch...@apache.org.
[FLINK-5720] Deprecate DataStream#fold()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5adf113
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5adf113
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5adf113
Branch: refs/heads/release-1.3
Commit: e5adf11342337994ea9da3f50ce7b587086bf820
Parents: 35b74f2
Author: zentol <ch...@apache.org>
Authored: Wed May 3 15:49:03 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 21:07:33 2017 +0200
----------------------------------------------------------------------
.../streaming/state/RocksDBFoldingState.java | 3 +++
.../flink/api/common/functions/FoldFunction.java | 3 +++
.../api/common/functions/RichFoldFunction.java | 3 +++
.../api/common/functions/RuntimeContext.java | 3 +++
.../flink/api/common/state/FoldingState.java | 3 +++
.../api/common/state/FoldingStateDescriptor.java | 3 +++
.../flink/api/common/state/KeyedStateStore.java | 5 ++++-
.../flink/api/common/state/StateBinder.java | 3 +++
.../flink/api/java/typeutils/TypeExtractor.java | 8 ++++++++
.../runtime/state/AbstractKeyedStateBackend.java | 5 ++++-
.../runtime/state/heap/HeapFoldingState.java | 7 +++++--
.../state/internal/InternalFoldingState.java | 3 +++
.../api/datastream/AllWindowedStream.java | 18 ++++++++++++++++++
.../streaming/api/datastream/KeyedStream.java | 6 ++++++
.../streaming/api/datastream/WindowedStream.java | 18 ++++++++++++++++++
.../windowing/FoldApplyAllWindowFunction.java | 3 +++
.../FoldApplyProcessAllWindowFunction.java | 3 +++
.../windowing/FoldApplyProcessWindowFunction.java | 3 +++
.../windowing/FoldApplyWindowFunction.java | 3 +++
.../api/operators/StreamGroupedFold.java | 3 +++
.../streaming/api/scala/AllWindowedStream.scala | 10 ++++++++--
.../flink/streaming/api/scala/KeyedStream.scala | 3 +++
.../streaming/api/scala/WindowedStream.scala | 8 +++++++-
23 files changed, 120 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 26dc3dd..d5d9fce 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -39,7 +39,10 @@ import java.io.IOException;
* @param <N> The type of the namespace.
* @param <T> The type of the values that can be folded into the state.
* @param <ACC> The type of the value in the folding state.
+ *
+ * @deprecated will be removed in a future version
*/
+@Deprecated
public class RocksDBFoldingState<K, N, T, ACC>
extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
implements InternalFoldingState<N, T, ACC> {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index b52828e..b3fd700 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -38,8 +38,11 @@ import java.io.Serializable;
*
* @param <T> Type of the initial input and the returned element
* @param <O> Type of the elements that the group/list/stream contains
+ *
+ * @deprecated use {@link AggregateFunction} instead
*/
@Public
+@Deprecated
public interface FoldFunction<O, T> extends Function, Serializable {
/**
* The core method of FoldFunction, combining two values into one value of the same type.
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
index 245550d..516e1b4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
@@ -28,8 +28,11 @@ import org.apache.flink.annotation.Public;
*
* @param <T> Type of the initial input and the returned element
* @param <O> Type of the elements that the group/list/stream contains
+ *
+ *@deprecated use {@link RichAggregateFunction} instead
*/
@Public
+@Deprecated
public abstract class RichFoldFunction<O, T> extends AbstractRichFunction implements FoldFunction<O, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 2978f3a..38155f6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -394,8 +394,11 @@ public interface RuntimeContext {
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
+ *
+ * @deprecated will be removed in a future version
*/
@PublicEvolving
+ @Deprecated
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
index 684a612..7e45399 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -35,6 +35,9 @@ import org.apache.flink.annotation.PublicEvolving;
*
* @param <T> Type of the values folded into the state
* @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version
*/
@PublicEvolving
+@Deprecated
public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 73bfaa8..f7609c3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -32,8 +32,11 @@ import static java.util.Objects.requireNonNull;
*
* @param <T> Type of the values folded int othe state
* @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor}
*/
@PublicEvolving
+@Deprecated
public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState<T, ACC>, ACC> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 2187f6c..ea9ac41 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -193,8 +193,11 @@ public interface KeyedStateStore {
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
+ *
+ * @deprecated will be removed in a future version
*/
@PublicEvolving
+ @Deprecated
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
/**
@@ -236,4 +239,4 @@ public interface KeyedStateStore {
*/
@PublicEvolving
<UK, UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
index 9df7a47..a373923 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -68,7 +68,10 @@ public interface StateBinder {
*
* @param <T> Type of the values folded into the state
* @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version
*/
+ @Deprecated
<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a5f236f..f1bf957 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -177,13 +177,21 @@ public class TypeExtractor {
return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
}
+ /**
+ * @deprecated will be removed in a future version
+ */
@PublicEvolving
+ @Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType)
{
return getFoldReturnTypes(foldInterface, inType, null, false);
}
+ /**
+ * @deprecated will be removed in a future version
+ */
@PublicEvolving
+ @Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 47ebe3b..2b225df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -195,8 +195,11 @@ public abstract class AbstractKeyedStateBackend<K>
*
* @param <N> The type of the namespace.
* @param <T> Type of the values folded into the state
- * @param <ACC> Type of the value in the state *
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version
*/
+ @Deprecated
protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index dad6d0d..3a77cca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -36,7 +36,10 @@ import java.io.IOException;
* @param <N> The type of the namespace.
* @param <T> The type of the values that can be folded into the state.
* @param <ACC> The type of the value in the folding state.
+ *
+ * @deprecated will be removed in a future version
*/
+@Deprecated
public class HeapFoldingState<K, N, T, ACC>
extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
implements InternalFoldingState<N, T, ACC> {
@@ -84,7 +87,7 @@ public class HeapFoldingState<K, N, T, ACC>
}
}
- static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
+ private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
private final FoldingStateDescriptor<T, ACC> stateDescriptor;
private final FoldFunction<T, ACC> foldFunction;
@@ -99,4 +102,4 @@ public class HeapFoldingState<K, N, T, ACC>
return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
index eb58ce5..4ef258f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -28,5 +28,8 @@ import org.apache.flink.api.common.state.FoldingState;
* @param <N> The type of the namespace
* @param <T> Type of the values folded into the state
* @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version
*/
+@Deprecated
public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0d953a9..7ea65fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -754,7 +754,10 @@ public class AllWindowedStream<T, W extends Window> {
*
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction)} instead
*/
+ @Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
@@ -774,7 +777,10 @@ public class AllWindowedStream<T, W extends Window> {
*
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
*/
+ @Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
@@ -795,8 +801,11 @@ public class AllWindowedStream<T, W extends Window> {
* @param foldFunction The fold function that is used for incremental aggregation.
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead
*/
@PublicEvolving
+ @Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) {
TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -821,8 +830,11 @@ public class AllWindowedStream<T, W extends Window> {
* @param foldAccumulatorType Type information for the result type of the fold function
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
*/
@PublicEvolving
+ @Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
FoldFunction<T, ACC> foldFunction,
AllWindowFunction<ACC, R, W> function,
@@ -901,8 +913,11 @@ public class AllWindowedStream<T, W extends Window> {
* @param foldFunction The fold function that is used for incremental aggregation.
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead
*/
@PublicEvolving
+ @Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function) {
TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -927,8 +942,11 @@ public class AllWindowedStream<T, W extends Window> {
* @param foldAccumulatorType Type information for the result type of the fold function
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
*/
@PublicEvolving
+ @Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
FoldFunction<T, ACC> foldFunction,
ProcessAllWindowFunction<ACC, R, W> function,
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 9334c66..e3171c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -416,7 +416,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* @param initialValue
* The initialValue passed to the folders for each key.
* @return The transformed DataStream.
+ *
+ * @deprecated will be removed in a future version
*/
+ @Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> folder) {
TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
@@ -748,8 +751,11 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* @param queryableStateName Name under which to the publish the queryable state instance
* @param stateDescriptor State descriptor to create state instance from
* @return Queryable state instance
+ *
+ * @deprecated will be removed in a future version
*/
@PublicEvolving
+ @Deprecated
public <ACC> QueryableStateStream<KEY, ACC> asQueryableState(
String queryableStateName,
FoldingStateDescriptor<T, ACC> stateDescriptor) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 2d7dafe..7913e95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -487,7 +487,10 @@ public class WindowedStream<T, K, W extends Window> {
*
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregationFunction)} instead
*/
+ @Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
@@ -507,7 +510,10 @@ public class WindowedStream<T, K, W extends Window> {
*
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
*/
+ @Deprecated
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
@@ -528,8 +534,11 @@ public class WindowedStream<T, K, W extends Window> {
* @param foldFunction The fold function that is used for incremental aggregation.
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction)} instead
*/
@PublicEvolving
+ @Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function) {
TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -554,8 +563,11 @@ public class WindowedStream<T, K, W extends Window> {
* @param foldAccumulatorType Type information for the result type of the fold function
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, ProcessWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
*/
@PublicEvolving
+ @Deprecated
public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
FoldFunction<T, ACC> foldFunction,
WindowFunction<ACC, R, K, W> function,
@@ -638,8 +650,11 @@ public class WindowedStream<T, K, W extends Window> {
* @param foldFunction The fold function that is used for incremental aggregation.
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction)} instead
*/
@PublicEvolving
+ @Deprecated
public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) {
if (foldFunction instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
@@ -667,7 +682,10 @@ public class WindowedStream<T, K, W extends Window> {
* @param windowFunction The process window function.
* @param windowResultType The process window function result type.
* @return The data stream that is the result of applying the fold function to the window.
+ *
+ * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
*/
+ @Deprecated
@Internal
public <R, ACC> SingleOutputStreamOperator<R> fold(
ACC initialValue,
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
index 30662f0..2069f7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
@@ -37,8 +37,11 @@ import org.apache.flink.util.Collector;
/**
* Internal {@link AllWindowFunction} that is used for implementing a fold on a window configuration
* that only allows {@link AllWindowFunction} and cannot directly execute a {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
*/
@Internal
+@Deprecated
public class FoldApplyAllWindowFunction<W extends Window, T, ACC, R>
extends WrappingFunction<AllWindowFunction<ACC, R, W>>
implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 38244dd..8982c71 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -39,8 +39,11 @@ import org.apache.flink.util.Collector;
* Internal {@link ProcessAllWindowFunction} that is used for implementing a fold on a window
* configuration that only allows {@link ProcessAllWindowFunction} and cannot directly execute a
* {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
*/
@Internal
+@Deprecated
public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
extends RichProcessAllWindowFunction<T, R, W>
implements OutputTypeConfigurable<R> {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index 1b2c2e2..0e0356a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -39,8 +39,11 @@ import org.apache.flink.util.Collector;
* Internal {@link ProcessWindowFunction} that is used for implementing a fold on a window
* configuration that only allows {@link ProcessWindowFunction} and cannot directly execute a
* {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
*/
@Internal
+@Deprecated
public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
extends RichProcessWindowFunction<T, R, K, W>
implements OutputTypeConfigurable<R> {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
index 770deb0..865dbc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
@@ -37,8 +37,11 @@ import org.apache.flink.util.Collector;
/**
* Internal {@link WindowFunction} that is used for implementing a fold on a window configuration
* that only allows {@link WindowFunction} and cannot directly execute a {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
*/
@Internal
+@Deprecated
public class FoldApplyWindowFunction<K, W extends Window, T, ACC, R>
extends WrappingFunction<WindowFunction<ACC, R, K, W>>
implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 1ed7178..07c5c90 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -35,8 +35,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
* A {@link StreamOperator} for executing a {@link FoldFunction} on a
* {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
+ *
+ * @deprecated will be removed in a future version
*/
@Internal
+@Deprecated
public class StreamGroupedFold<IN, OUT, KEY>
extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 757e45f..bbdcf4a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -401,7 +401,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
*
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
- */
+ */
+ @deprecated("use [[aggregate()]] instead")
def fold[R: TypeInformation](
initialValue: R,
function: FoldFunction[T,R]): DataStream[R] = {
@@ -421,7 +422,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
*
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
- */
+ */
+ @deprecated("use [[aggregate()]] instead")
def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
@@ -444,6 +446,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
def fold[ACC: TypeInformation, R: TypeInformation](
initialValue: ACC,
preAggregator: FoldFunction[T, ACC],
@@ -474,6 +477,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param windowFunction The process window function.
* @return The data stream that is the result of applying the window function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
@PublicEvolving
def fold[ACC: TypeInformation, R: TypeInformation](
initialValue: ACC,
@@ -505,6 +509,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
def fold[ACC: TypeInformation, R: TypeInformation](
initialValue: ACC,
preAggregator: (ACC, T) => ACC,
@@ -540,6 +545,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
@PublicEvolving
def fold[ACC: TypeInformation, R: TypeInformation](
initialValue: ACC,
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index d5ef89f..aaeb1ec 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -184,6 +184,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* using an associative fold function and an initial value. An independent
* aggregate is kept per key.
*/
+ @deprecated("will be removed in a future version")
def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]):
DataStream[R] = {
if (folder == null) {
@@ -201,6 +202,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* using an associative fold function and an initial value. An independent
* aggregate is kept per key.
*/
+ @deprecated("will be removed in a future version")
def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Fold function must not be null.")
@@ -507,6 +509,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* @return Queryable state instance
*/
@PublicEvolving
+ @deprecated("will be removed in a future version")
def asQueryableState[ACC](
queryableStateName: String,
stateDescriptor: FoldingStateDescriptor[T, ACC]) : QueryableStateStream[K, ACC] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/e5adf113/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 4e0e1a4..0f8a6e0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -382,6 +382,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
def fold[R: TypeInformation](
initialValue: R,
function: FoldFunction[T,R]): DataStream[R] = {
@@ -401,7 +402,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
*
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
- */
+ */
+ @deprecated("use [[aggregate()]] instead")
def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
@@ -423,6 +425,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
def fold[ACC: TypeInformation, R: TypeInformation](
initialValue: ACC,
foldFunction: FoldFunction[T, ACC],
@@ -452,6 +455,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
def fold[ACC: TypeInformation, R: TypeInformation](
initialValue: ACC,
foldFunction: (ACC, T) => ACC,
@@ -486,6 +490,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The process window function.
* @return The data stream that is the result of applying the window function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
@PublicEvolving
def fold[R: TypeInformation, ACC: TypeInformation](
initialValue: ACC,
@@ -516,6 +521,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The process window function.
* @return The data stream that is the result of applying the window function to the window.
*/
+ @deprecated("use [[aggregate()]] instead")
@PublicEvolving
def fold[R: TypeInformation, ACC: TypeInformation](
initialValue: ACC,
[3/4] flink git commit: [FLINK-6164] Make ProcessWindowFunction a
RichFunction
Posted by ch...@apache.org.
[FLINK-6164] Make ProcessWindowFunction a RichFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e76a0aa9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e76a0aa9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e76a0aa9
Branch: refs/heads/release-1.3
Commit: e76a0aa9e2324c4647509379fe4125d8dc576ff0
Parents: e5adf11
Author: zentol <ch...@apache.org>
Authored: Wed May 3 15:57:05 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 21:07:33 2017 +0200
----------------------------------------------------------------------
.../FoldApplyProcessAllWindowFunction.java | 2 +-
.../FoldApplyProcessWindowFunction.java | 2 +-
.../windowing/ProcessAllWindowFunction.java | 4 +-
.../windowing/ProcessWindowFunction.java | 4 +-
.../ReduceApplyProcessAllWindowFunction.java | 3 +-
.../ReduceApplyProcessWindowFunction.java | 2 +-
.../windowing/RichProcessAllWindowFunction.java | 53 ++------------------
.../windowing/RichProcessWindowFunction.java | 53 ++------------------
.../functions/InternalWindowFunctionTest.java | 11 ++--
...AlignedProcessingTimeWindowOperatorTest.java | 4 +-
.../function/ProcessAllWindowFunction.scala | 7 ++-
.../scala/function/ProcessWindowFunction.scala | 6 ++-
.../function/RichProcessAllWindowFunction.scala | 53 +-------------------
.../function/RichProcessWindowFunction.scala | 53 +-------------------
.../ScalaProcessWindowFunctionWrapper.scala | 20 +++-----
...ngIdentityRichProcessAllWindowFunction.scala | 4 +-
...ckingIdentityRichProcessWindowFunction.scala | 4 +-
17 files changed, 45 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 8982c71..1d39252 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -45,7 +45,7 @@ import org.apache.flink.util.Collector;
@Internal
@Deprecated
public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
- extends RichProcessAllWindowFunction<T, R, W>
+ extends ProcessAllWindowFunction<T, R, W>
implements OutputTypeConfigurable<R> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index 0e0356a..fa4fe86 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -45,7 +45,7 @@ import org.apache.flink.util.Collector;
@Internal
@Deprecated
public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
- extends RichProcessWindowFunction<T, R, K, W>
+ extends ProcessWindowFunction<T, R, K, W>
implements OutputTypeConfigurable<R> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 4d247a7..34a37bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@PublicEvolving
-public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implements Function {
+public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 2c80e9e..506b610 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@PublicEvolving
-public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index d1f9ccd..e7e6609 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -32,8 +32,7 @@ import org.apache.flink.util.Collector;
* {@link ReduceFunction}.
*/
@Internal
-public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
- extends RichProcessAllWindowFunction<T, R, W> {
+public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> extends ProcessAllWindowFunction<T, R, W> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 836726d..18037b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
*/
@Internal
public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
- extends RichProcessWindowFunction<T, R, K, W> {
+ extends ProcessWindowFunction<T, R, K, W> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
index 1130fa5..a800870 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
@@ -19,10 +19,6 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
@@ -32,53 +28,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <W> The type of {@code Window} that this window function can be applied on.
+ *
+ * @deprecated use {@link ProcessAllWindowFunction} instead
*/
@PublicEvolving
-public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window>
- extends ProcessAllWindowFunction<IN, OUT, W>
- implements RichFunction {
+@Deprecated
+public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window> extends ProcessAllWindowFunction<IN, OUT, W> {
private static final long serialVersionUID = 1L;
-
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- private transient RuntimeContext runtimeContext;
-
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- this.runtimeContext = t;
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- if (this.runtimeContext != null) {
- return this.runtimeContext;
- } else {
- throw new IllegalStateException("The runtime context has not been initialized.");
- }
- }
-
- @Override
- public IterationRuntimeContext getIterationRuntimeContext() {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.");
- } else if (this.runtimeContext instanceof IterationRuntimeContext) {
- return (IterationRuntimeContext) this.runtimeContext;
- } else {
- throw new IllegalStateException("This stub is not part of an iteration step function.");
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {}
-
- @Override
- public void close() throws Exception {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
index ac55bc6..83da065 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
@@ -19,10 +19,6 @@
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
@@ -33,53 +29,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
* @param <W> The type of {@code Window} that this window function can be applied on.
+ *
+ * @deprecated use {@link ProcessWindowFunction} instead
*/
@PublicEvolving
-public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window>
- extends ProcessWindowFunction<IN, OUT, KEY, W>
- implements RichFunction {
+@Deprecated
+public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window> extends ProcessWindowFunction<IN, OUT, KEY, W> {
private static final long serialVersionUID = 1L;
-
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- private transient RuntimeContext runtimeContext;
-
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- this.runtimeContext = t;
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- if (this.runtimeContext != null) {
- return this.runtimeContext;
- } else {
- throw new IllegalStateException("The runtime context has not been initialized.");
- }
- }
-
- @Override
- public IterationRuntimeContext getIterationRuntimeContext() {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.");
- } else if (this.runtimeContext instanceof IterationRuntimeContext) {
- return (IterationRuntimeContext) this.runtimeContext;
- } else {
- throw new IllegalStateException("This stub is not part of an iteration step function.");
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {}
-
- @Override
- public void close() throws Exception {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 4b8057f..4b0f5ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -24,11 +24,10 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -612,7 +611,7 @@ public class InternalWindowFunctionTest {
}
public static class ProcessWindowFunctionMock
- extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
+ extends ProcessWindowFunction<Long, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
@@ -626,7 +625,7 @@ public class InternalWindowFunctionTest {
}
public static class AggregateProcessWindowFunctionMock
- extends RichProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
+ extends ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
@@ -640,7 +639,7 @@ public class InternalWindowFunctionTest {
}
public static class AggregateProcessAllWindowFunctionMock
- extends RichProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
+ extends ProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
@@ -679,7 +678,7 @@ public class InternalWindowFunctionTest {
}
public static class ProcessAllWindowFunctionMock
- extends RichProcessAllWindowFunction<Long, String, TimeWindow>
+ extends ProcessAllWindowFunction<Long, String, TimeWindow>
implements OutputTypeConfigurable<String> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index a8d3154..2f7e302 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -35,9 +35,7 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
@@ -1038,7 +1036,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static class StatefulFunction extends RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
+ private static class StatefulFunction extends ProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
// we use a concurrent map here even though there is no concurrency, to
// get "volatile" style access to entries
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 2f0e48e..49911e4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -18,10 +18,8 @@
package org.apache.flink.streaming.api.scala.function
-import java.io.Serializable
-
import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -35,7 +33,8 @@ import org.apache.flink.util.Collector
* @tparam W The type of the window.
*/
@PublicEvolving
-abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable {
+abstract class ProcessAllWindowFunction[IN, OUT, W <: Window]
+ extends AbstractRichFunction {
/**
* Evaluates the window and outputs none or several elements.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index bc79a26..d2075db 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala.function
import java.io.Serializable
import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -36,7 +36,9 @@ import org.apache.flink.util.Collector
* @tparam W The type of the window.
*/
@PublicEvolving
-abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
+ extends AbstractRichFunction {
+
/**
* Evaluates the window and outputs none or several elements.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
index 22d64a8..6edc1e6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
@@ -18,11 +18,7 @@
package org.apache.flink.streaming.api.scala.function
-import java.beans.Transient
-
import org.apache.flink.annotation.Public
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
-import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.Window
/**
@@ -34,53 +30,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window
* @tparam W The type of the window.
*/
@Public
+@deprecated("use [[ProcessAllWindowFunction]] instead")
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
- extends ProcessAllWindowFunction[IN, OUT, W]
- with RichFunction {
-
- @Transient
- private var runtimeContext: RuntimeContext = null
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- override def setRuntimeContext(t: RuntimeContext) {
- this.runtimeContext = t
- }
-
- override def getRuntimeContext: RuntimeContext = {
- if (this.runtimeContext != null) {
- this.runtimeContext
- }
- else {
- throw new IllegalStateException("The runtime context has not been initialized.")
- }
- }
-
- override def getIterationRuntimeContext: IterationRuntimeContext = {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.")
- }
- else {
- this.runtimeContext match {
- case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
- case _ =>
- throw new IllegalStateException("This stub is not part of an iteration step function.")
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @throws[Exception]
- override def open(parameters: Configuration) {
- }
-
- @throws[Exception]
- override def close() {
- }
+ extends ProcessAllWindowFunction[IN, OUT, W] {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
index 320685a..d9cd275 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
@@ -18,11 +18,7 @@
package org.apache.flink.streaming.api.scala.function
-import java.beans.Transient
-
import org.apache.flink.annotation.Public
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
-import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.Window
/**
@@ -35,53 +31,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window
* @tparam W The type of the window.
*/
@Public
+@deprecated("use [[ProcessWindowFunction]] instead")
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
- extends ProcessWindowFunction[IN, OUT, KEY, W]
- with RichFunction {
-
- @Transient
- private var runtimeContext: RuntimeContext = null
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- override def setRuntimeContext(t: RuntimeContext) {
- this.runtimeContext = t
- }
-
- override def getRuntimeContext: RuntimeContext = {
- if (this.runtimeContext != null) {
- this.runtimeContext
- }
- else {
- throw new IllegalStateException("The runtime context has not been initialized.")
- }
- }
-
- override def getIterationRuntimeContext: IterationRuntimeContext = {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.")
- }
- else {
- this.runtimeContext match {
- case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
- case _ =>
- throw new IllegalStateException("This stub is not part of an iteration step function.")
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @throws[Exception]
- override def open(parameters: Configuration) {
- }
-
- @throws[Exception]
- override def close() {
- }
+ extends ProcessWindowFunction[IN, OUT, KEY, W] {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 263373e..bc4b7dd 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -21,13 +21,9 @@ package org.apache.flink.streaming.api.scala.function.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
-import org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction => JRichProcessWindowFunction}
-import org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction => JRichProcessAllWindowFunction}
import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction}
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction}
import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction}
-import org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => ScalaRichProcessWindowFunction}
-import org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => ScalaRichProcessAllWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -43,7 +39,7 @@ import scala.collection.JavaConverters._
*/
final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W])
- extends JRichProcessWindowFunction[IN, OUT, KEY, W] {
+ extends JProcessWindowFunction[IN, OUT, KEY, W] {
override def process(
key: KEY,
@@ -82,7 +78,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
override def setRuntimeContext(t: RuntimeContext): Unit = {
super.setRuntimeContext(t)
func match {
- case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t)
+ case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t)
case _ =>
}
}
@@ -90,7 +86,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
override def open(parameters: Configuration): Unit = {
super.open(parameters)
func match {
- case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters)
+ case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters)
case _ =>
}
}
@@ -98,7 +94,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
override def close(): Unit = {
super.close()
func match {
- case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
+ case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
case _ =>
}
}
@@ -114,7 +110,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
*/
final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W])
- extends JRichProcessAllWindowFunction[IN, OUT, W] {
+ extends JProcessAllWindowFunction[IN, OUT, W] {
override def process(
context: JProcessAllWindowFunction[IN, OUT, W]#Context,
@@ -145,7 +141,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
override def setRuntimeContext(t: RuntimeContext): Unit = {
super.setRuntimeContext(t)
func match {
- case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t)
+ case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t)
case _ =>
}
}
@@ -153,7 +149,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
override def open(parameters: Configuration): Unit = {
super.open(parameters)
func match {
- case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters)
+ case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters)
case _ =>
}
}
@@ -161,7 +157,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
override def close(): Unit = {
super.close()
func match {
- case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.close()
+ case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.close()
case _ =>
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
index df005fa..146452b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction
+import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window]
- extends RichProcessAllWindowFunction[T, T, W] {
+ extends ProcessAllWindowFunction[T, T, W] {
override def process(context: Context, input: Iterable[T], out: Collector[T]): Unit = {
for (value <- input) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
index d62f2d3..2ec179a 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window]
- extends RichProcessWindowFunction[T, T, K, W] {
+ extends ProcessWindowFunction[T, T, K, W] {
override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit = {
for (value <- input) {