You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/28 16:59:40 UTC

flink git commit: [FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Value Behaviour

Repository: flink
Updated Branches:
  refs/heads/master fe6b83585 -> 60a4ab32e


[FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Value Behaviour


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60a4ab32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60a4ab32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60a4ab32

Branch: refs/heads/master
Commit: 60a4ab32e1662310da4633a97e02dca62431952e
Parents: fe6b835
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Nov 23 12:13:05 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 28 17:59:02 2016 +0100

----------------------------------------------------------------------
 .../runtime/state/StateBackendTestBase.java     | 143 ++++++++++++++++++-
 1 file changed, 137 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60a4ab32/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 9e835ce..0a3a092 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -56,12 +56,8 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RunnableFuture;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -659,6 +655,141 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	}
 
 	/**
+	 * Verify that a {@link ValueStateDescriptor} accepts {@code null} as its default value: the state reads {@code null} before any update and again after {@code clear()}.
+	 */
+	@Test
+	public void testValueStateNullAsDefaultValue() throws Exception {
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		assertEquals(null, state.value()); // nothing has been written for key 1 yet
+
+		state.update("Ciao");
+		assertEquals("Ciao", state.value());
+
+		state.clear();
+		assertEquals(null, state.value()); // cleared state falls back to the null default
+
+		backend.dispose();
+	}
+
+
+	/**
+	 * Verify that an empty {@code ValueState} yields the default value given to its descriptor ({@code "Hello"} here), both before the first update and after {@code clear()}.
+	 */
+	@Test
+	public void testValueStateDefaultValue() throws Exception {
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, "Hello");
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		assertEquals("Hello", state.value()); // nothing has been written for key 1 yet
+
+		state.update("Ciao");
+		assertEquals("Ciao", state.value());
+
+		state.clear();
+		assertEquals("Hello", state.value()); // cleared state falls back to the default again
+
+		backend.dispose();
+	}
+
+	/**
+	 * Verify that an empty {@code ReducingState} yields {@code null}, both before the first {@code add()} and after {@code clear()}.
+	 */
+	@Test
+	public void testReducingStateDefaultValue() throws Exception {
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		ReducingState<String> state = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		assertNull(state.get()); // nothing has been added for key 1 yet
+
+		state.add("Ciao");
+		assertEquals("Ciao", state.get());
+
+		state.clear();
+		assertNull(state.get()); // cleared state reads as null again
+
+		backend.dispose();
+	}
+
+	/**
+	 * Verify that an empty {@code FoldingState} yields {@code null} before the first {@code add()} and after {@code clear()}; the descriptor's {@code "Fold-Initial:"} value only appears in the result once elements were added.
+	 */
+	@Test
+	public void testFoldingStateDefaultValue() throws Exception {
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		FoldingStateDescriptor<Integer, String> kvId =
+				new FoldingStateDescriptor<>("id", "Fold-Initial:", new AppendingFold(), String.class);
+
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		FoldingState<Integer, String> state = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		assertNull(state.get()); // nothing has been added for key 1 yet
+
+		state.add(1);
+		state.add(2);
+		assertEquals("Fold-Initial:,1,2", state.get());
+
+		state.clear();
+		assertNull(state.get()); // cleared state reads as null again
+
+		backend.dispose();
+	}
+
+
+	/**
+	 * Verify that an empty {@code ListState} yields {@code null} (rather than an empty list) from {@code get()}, both before the first {@code add()} and after {@code clear()}.
+	 */
+	@Test
+	public void testListStateDefaultValue() throws Exception {
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		ListState<String> state = backend.getPartitionedState(
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		assertNull(state.get()); // nothing has been added for key 1 yet
+
+		state.add("Ciao");
+		state.add("Bello");
+		assertThat(state.get(), containsInAnyOrder("Ciao", "Bello")); // element order is not part of the check
+
+		state.clear();
+		assertNull(state.get()); // cleared state reads as null again
+
+		backend.dispose();
+	}
+
+
+
+
+	/**
 	 * This test verifies that state is correctly assigned to key groups and that restore
 	 * restores the relevant key groups in the backend.
 	 *