You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:27 UTC

[10/10] flink git commit: [FLINK-9538] Make KeyedStateFunction an interface

[FLINK-9538] Make KeyedStateFunction an interface

This closes #6134


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/975f9b1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/975f9b1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/975f9b1b

Branch: refs/heads/master
Commit: 975f9b1b8c4ebb4c96eebc57aafefa8d10c8a689
Parents: 05ee3ce
Author: yanghua <ya...@gmail.com>
Authored: Thu Jun 7 15:27:25 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed Jun 13 15:02:10 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/state/KeyedStateFunction.java |  5 ++--
 .../runtime/state/StateBackendTestBase.java     | 27 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/975f9b1b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
index b125de9..8bb352a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
@@ -26,7 +26,8 @@ import org.apache.flink.api.common.state.State;
  * @param <K> The type of key.
  * @param <S> The type of state.
  */
-public abstract class KeyedStateFunction<K, S extends State> {
+@FunctionalInterface
+public interface KeyedStateFunction<K, S extends State> {
 
 	/**
 	 * The actual method to be applied on each of the states.
@@ -34,5 +35,5 @@ public abstract class KeyedStateFunction<K, S extends State> {
 	 * @param key the key whose state is being processed.
 	 * @param state the state associated with the aforementioned key.
 	 */
-	public abstract void process(K key, S state) throws Exception;
+	void process(K key, S state) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/975f9b1b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b809d84..845b751 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -3748,6 +3748,33 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testApplyToAllKeysLambdaFunction() throws Exception {
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		try {
+			ListStateDescriptor<String> listStateDescriptor =
+				new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);
+
+			ListState<String> listState =
+				backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
+
+			for (int i = 0; i < 100; ++i) {
+				backend.setCurrentKey(i);
+				listState.add("Hello" + i);
+			}
+
+			// valid state value via applyToAllKeys().
+			backend.applyToAllKeys(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor,
+				(Integer key, ListState<String> state) -> assertEquals("Hello" + key, state.get().iterator().next())
+			);
+		}
+		finally {
+			IOUtils.closeQuietly(backend);
+			backend.dispose();
+		}
+	}
+
+	@Test
 	public void testAsyncSnapshotCancellation() throws Exception {
 		OneShotLatch blocker = new OneShotLatch();
 		OneShotLatch waiter = new OneShotLatch();