You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/28 16:59:40 UTC
flink git commit: [FLINK-5181] Add Tests in StateBackendTestBase that
verify Default-Value Behaviour
Repository: flink
Updated Branches:
refs/heads/master fe6b83585 -> 60a4ab32e
[FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Value Behaviour
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60a4ab32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60a4ab32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60a4ab32
Branch: refs/heads/master
Commit: 60a4ab32e1662310da4633a97e02dca62431952e
Parents: fe6b835
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Nov 23 12:13:05 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 28 17:59:02 2016 +0100
----------------------------------------------------------------------
.../runtime/state/StateBackendTestBase.java | 143 ++++++++++++++++++-
1 file changed, 137 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/60a4ab32/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 9e835ce..0a3a092 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -56,12 +56,8 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RunnableFuture;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -659,6 +655,141 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
/**
+ * Verify that {@link ValueStateDescriptor} allows {@code null} as default.
+ */
+ @Test
+ public void testValueStateNullAsDefaultValue() throws Exception {
+     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+     // Dispose in finally so that a failing assertion does not leak the backend.
+     try {
+         ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
+         kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+         ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+         backend.setCurrentKey(1);
+         assertNull(state.value());
+
+         state.update("Ciao");
+         assertEquals("Ciao", state.value());
+
+         state.clear();
+         assertNull(state.value());
+     } finally {
+         backend.dispose();
+     }
+ }
+
+
+ /**
+ * Verify that an empty {@code ValueState} will yield the default value.
+ */
+ @Test
+ public void testValueStateDefaultValue() throws Exception {
+     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+     // Dispose in finally so that a failing assertion does not leak the backend.
+     try {
+         ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, "Hello");
+         kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+         ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+         backend.setCurrentKey(1);
+         assertEquals("Hello", state.value());
+
+         state.update("Ciao");
+         assertEquals("Ciao", state.value());
+
+         state.clear();
+         assertEquals("Hello", state.value());
+     } finally {
+         backend.dispose();
+     }
+ }
+
+ /**
+ * Verify that an empty {@code ReducingState} yields {@code null}.
+ */
+ @Test
+ public void testReducingStateDefaultValue() throws Exception {
+     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+     // Dispose in finally so that a failing assertion does not leak the backend.
+     try {
+         ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
+         kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+         ReducingState<String> state = backend.getPartitionedState(
+             VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+         backend.setCurrentKey(1);
+         assertNull(state.get());
+
+         state.add("Ciao");
+         assertEquals("Ciao", state.get());
+
+         state.clear();
+         assertNull(state.get());
+     } finally {
+         backend.dispose();
+     }
+ }
+
+ /**
+ * Verify that an empty {@code FoldingState} yields {@code null}.
+ */
+ @Test
+ public void testFoldingStateDefaultValue() throws Exception {
+     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+     // Dispose in finally so that a failing assertion does not leak the backend.
+     try {
+         FoldingStateDescriptor<Integer, String> kvId =
+             new FoldingStateDescriptor<>("id", "Fold-Initial:", new AppendingFold(), String.class);
+         kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+         FoldingState<Integer, String> state = backend.getPartitionedState(
+             VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+         backend.setCurrentKey(1);
+         assertNull(state.get());
+
+         state.add(1);
+         state.add(2);
+         assertEquals("Fold-Initial:,1,2", state.get());
+
+         state.clear();
+         assertNull(state.get());
+     } finally {
+         backend.dispose();
+     }
+ }
+
+
+ /**
+ * Verify that an empty {@code ListState} yields {@code null}.
+ */
+ @Test
+ public void testListStateDefaultValue() throws Exception {
+     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+     // Dispose in finally so that a failing assertion does not leak the backend.
+     try {
+         ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
+         kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+         ListState<String> state = backend.getPartitionedState(
+             VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+         backend.setCurrentKey(1);
+         assertNull(state.get());
+
+         state.add("Ciao");
+         state.add("Bello");
+         assertThat(state.get(), containsInAnyOrder("Ciao", "Bello"));
+
+         state.clear();
+         assertNull(state.get());
+     } finally {
+         backend.dispose();
+     }
+ }
+
+
+
+
+ /**
* This test verifies that state is correctly assigned to key groups and that restore
* restores the relevant key groups in the backend.
*