You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:27 UTC
[10/10] flink git commit: [FLINK-9538] Make KeyedStateFunction an
interface
[FLINK-9538] Make KeyedStateFunction an interface
This closes #6134
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/975f9b1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/975f9b1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/975f9b1b
Branch: refs/heads/master
Commit: 975f9b1b8c4ebb4c96eebc57aafefa8d10c8a689
Parents: 05ee3ce
Author: yanghua <ya...@gmail.com>
Authored: Thu Jun 7 15:27:25 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed Jun 13 15:02:10 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/state/KeyedStateFunction.java | 5 ++--
.../runtime/state/StateBackendTestBase.java | 27 ++++++++++++++++++++
2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/975f9b1b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
index b125de9..8bb352a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
@@ -26,7 +26,8 @@ import org.apache.flink.api.common.state.State;
* @param <K> The type of key.
* @param <S> The type of state.
*/
-public abstract class KeyedStateFunction<K, S extends State> {
+@FunctionalInterface
+public interface KeyedStateFunction<K, S extends State> {
/**
* The actual method to be applied on each of the states.
@@ -34,5 +35,5 @@ public abstract class KeyedStateFunction<K, S extends State> {
* @param key the key whose state is being processed.
* @param state the state associated with the aforementioned key.
*/
- public abstract void process(K key, S state) throws Exception;
+ void process(K key, S state) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/975f9b1b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b809d84..845b751 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -3748,6 +3748,33 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
+ public void testApplyToAllKeysLambdaFunction() throws Exception {
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+ try {
+ ListStateDescriptor<String> listStateDescriptor =
+ new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);
+
+ ListState<String> listState =
+ backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
+
+ for (int i = 0; i < 100; ++i) {
+ backend.setCurrentKey(i);
+ listState.add("Hello" + i);
+ }
+
+ // valid state value via applyToAllKeys().
+ backend.applyToAllKeys(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor,
+ (Integer key, ListState<String> state) -> assertEquals("Hello" + key, state.get().iterator().next())
+ );
+ }
+ finally {
+ IOUtils.closeQuietly(backend);
+ backend.dispose();
+ }
+ }
+
+ @Test
public void testAsyncSnapshotCancellation() throws Exception {
OneShotLatch blocker = new OneShotLatch();
OneShotLatch waiter = new OneShotLatch();