You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:25 UTC
[14/50] [abbrv] flink git commit: [FLINK-6018] Add tests for
KryoSerializer restore with registered types
http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index faa9314..22bb715 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.state;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Joiner;
import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
@@ -34,11 +36,15 @@ import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -61,9 +67,11 @@ import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -90,6 +98,8 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import org.junit.rules.ExpectedException;
+
/**
* Generic tests for the partitioned state part of {@link AbstractStateBackend}.
@@ -97,6 +107,9 @@ import static org.mockito.Mockito.verify;
@SuppressWarnings("serial")
public abstract class StateBackendTestBase<B extends AbstractStateBackend> extends TestLogger {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
protected abstract B getStateBackend() throws Exception;
protected CheckpointStreamFactory createStreamFactory() throws Exception {
@@ -171,21 +184,478 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
@Test
@SuppressWarnings("unchecked")
+ public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ // cast because our test serializer is not typed to TestPojo
+ env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
+
+ TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+ ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // we will be expecting ExpectedKryoTestException to be thrown,
+ // because the ExceptionThrowingTestSerializer should be used
+ int numExceptions = 0;
+
+ backend.setCurrentKey(1);
+
+ try {
+ // backends that eagerly serializes (such as RocksDB) will fail here
+ state.update(new TestPojo("u1", 1));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ try {
+ // backends that lazily serializes (such as memory state backend) will fail here
+ runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ // cast because our test serializer is not typed to TestPojo
+ env.getExecutionConfig()
+ .addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
+
+ TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+ pojoType.createSerializer(env.getExecutionConfig());
+
+ ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+ assertTrue(state instanceof InternalValueState);
+ ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ // we will be expecting ExpectedKryoTestException to be thrown,
+ // because the ExceptionThrowingTestSerializer should be used
+ int numExceptions = 0;
+
+ backend.setCurrentKey(1);
+
+ try {
+ // backends that eagerly serializes (such as RocksDB) will fail here
+ state.update(new TestPojo("u1", 1));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ try {
+ // backends that lazily serializes (such as memory state backend) will fail here
+ runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+ }
+
+ @Test
+ public void testBackendUsesRegisteredKryoSerializer() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ env.getExecutionConfig()
+ .registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
+
+ TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+ ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // we will be expecting ExpectedKryoTestException to be thrown,
+ // because the ExceptionThrowingTestSerializer should be used
+ int numExceptions = 0;
+
+ backend.setCurrentKey(1);
+
+ try {
+ // backends that eagerly serializes (such as RocksDB) will fail here
+ state.update(new TestPojo("u1", 1));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ try {
+ // backends that lazily serializes (such as memory state backend) will fail here
+ runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
+
+ TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+ ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+ assertTrue(state instanceof InternalValueState);
+ ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ // we will be expecting ExpectedKryoTestException to be thrown,
+ // because the ExceptionThrowingTestSerializer should be used
+ int numExceptions = 0;
+
+ backend.setCurrentKey(1);
+
+ try {
+ // backends that eagerly serializes (such as RocksDB) will fail here
+ state.update(new TestPojo("u1", 1));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ try {
+ // backends that lazily serializes (such as memory state backend) will fail here
+ runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ } catch (ExpectedKryoTestException e) {
+ numExceptions++;
+ } catch (Exception e) {
+ if (e.getCause() instanceof ExpectedKryoTestException) {
+ numExceptions++;
+ } else {
+ throw e;
+ }
+ }
+
+ assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+ }
+
+
+ /**
+ * Verify state restore resilience when:
+ * - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+ * - restored with the state type registered (no specific serializer)
+ *
+ * This test should not fail, because de- / serialization of the state should noth be performed with Kryo's default
+ * {@link com.esotericsoftware.kryo.serializers.FieldSerializer}.
+ */
+ @Test
+ public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+ ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+ // make some more modifications
+ backend.setCurrentKey(1);
+ state.update(new TestPojo("u1", 1));
+
+ backend.setCurrentKey(2);
+ state.update(new TestPojo("u2", 2));
+
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+ 682375462378L,
+ 2,
+ streamFactory,
+ CheckpointOptions.forFullCheckpoint()));
+
+ backend.dispose();
+
+ // ====================================== restore snapshot ======================================
+
+ env.getExecutionConfig().registerKryoType(TestPojo.class);
+
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+ snapshot.discardState();
+
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ backend.setCurrentKey(1);
+ assertEquals(state.value(), new TestPojo("u1", 1));
+
+ backend.setCurrentKey(2);
+ assertEquals(state.value(), new TestPojo("u2", 2));
+
+ backend.dispose();
+ }
+
+ /**
+ * Verify state restore resilience when:
+ * - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+ * - restored with a default serializer for the state type
+ *
+ * <p> The default serializer used on restore is {@link CustomKryoTestSerializer}, which deliberately
+ * fails only on deserialization. We use the deliberate deserialization failure to acknowledge test success.
+ *
+ * @throws Exception expects {@link ExpectedKryoTestException} to be thrown.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+ ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+ ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+ // make some more modifications
+ backend.setCurrentKey(1);
+ state.update(new TestPojo("u1", 1));
+
+ backend.setCurrentKey(2);
+ state.update(new TestPojo("u2", 2));
+
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+ 682375462378L,
+ 2,
+ streamFactory,
+ CheckpointOptions.forFullCheckpoint()));
+
+ backend.dispose();
+
+ // ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ==========
+
+ // cast because our test serializer is not typed to TestPojo
+ env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+ snapshot.discardState();
+
+ // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+ // initializeSerializerUnlessSet would not pick up our new config
+ kvId = new ValueStateDescriptor<>("id", pojoType);
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ backend.setCurrentKey(1);
+
+ // update to test state backends that eagerly serialize, such as RocksDB
+ state.update(new TestPojo("u1", 11));
+
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(
+ 682375462378L,
+ 2,
+ streamFactory,
+ CheckpointOptions.forFullCheckpoint()));
+
+ backend.dispose();
+
+ // ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) =========
+
+ // cast because our test serializer is not typed to TestPojo
+ env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+
+ // on the second restore, since the custom serializer will be used for
+ // deserialization, we expect the deliberate failure to be thrown
+ expectedException.expect(ExpectedKryoTestException.class);
+
+ // state backends that eagerly deserializes (such as the memory state backend) will fail here
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ backend.setCurrentKey(1);
+ // state backends that lazily deserializes (such as RocksDB) will fail here
+ state.value();
+ }
+
+ /**
+ * Verify state restore resilience when:
+ * - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+ * - restored with a specific serializer for the state type
+ *
+ * <p> The specific serializer used on restore is {@link CustomKryoTestSerializer}, which deliberately
+ * fails only on deserialization. We use the deliberate deserialization failure to acknowledge test success.
+ *
+ * @throws Exception expects {@link ExpectedKryoTestException} to be thrown.
+ */
+ @Test
+ public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment("test", 1, 0);
+
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+ // make sure that we are in fact using the KryoSerializer
+ assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+ ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+ ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ // ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+ // make some more modifications
+ backend.setCurrentKey(1);
+ state.update(new TestPojo("u1", 1));
+
+ backend.setCurrentKey(2);
+ state.update(new TestPojo("u2", 2));
+
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+ 682375462378L,
+ 2,
+ streamFactory,
+ CheckpointOptions.forFullCheckpoint()));
+
+ backend.dispose();
+
+ // ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ==========
+
+ env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+ snapshot.discardState();
+
+ // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+ // initializeSerializerUnlessSet would not pick up our new config
+ kvId = new ValueStateDescriptor<>("id", pojoType);
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ backend.setCurrentKey(1);
+
+ // update to test state backends that eagerly serialize, such as RocksDB
+ state.update(new TestPojo("u1", 11));
+
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(
+ 682375462378L,
+ 2,
+ streamFactory,
+ CheckpointOptions.forFullCheckpoint()));
+
+ backend.dispose();
+
+ // ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) =========
+
+ env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+
+ // on the second restore, since the custom serializer will be used for
+ // deserialization, we expect the deliberate failure to be thrown
+ expectedException.expect(ExpectedKryoTestException.class);
+
+ // state backends that eagerly deserializes (such as the memory state backend) will fail here
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+ state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ backend.setCurrentKey(1);
+ // state backends that lazily deserializes (such as RocksDB) will fail here
+ state.value();
+ }
+
+
+ @Test
+ @SuppressWarnings("unchecked")
public void testValueState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- TypeSerializer<String> valueSerializer = kvId.getSerializer();
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ // this is only available after the backend initialized the serializer
+ TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
// some modifications to the state
backend.setCurrentKey(1);
assertNull(state.value());
@@ -276,16 +746,17 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
final ValueStateDescriptor<String> kvId =
new ValueStateDescriptor<>("id", String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
final TypeSerializer<Integer> namespaceSerializer =
IntSerializer.INSTANCE;
- final TypeSerializer<String> valueSerializer = kvId.getSerializer();
final ValueState<String> state = backend
.getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
+ // this is only available after the backend initialized the serializer
+ final TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
@SuppressWarnings("unchecked")
final InternalKvState<Integer> kvState = (InternalKvState<Integer>) state;
@@ -390,9 +861,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueStateDescriptor<String> desc1 = new ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE);
ValueStateDescriptor<Integer> desc2 = new ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE);
- desc1.initializeSerializerUnlessSet(new ExecutionConfig());
- desc2.initializeSerializerUnlessSet(new ExecutionConfig());
-
ValueState<String> state1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc1);
ValueState<Integer> state2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc2);
@@ -459,7 +927,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ValueStateDescriptor<Long> kvId = new ValueStateDescriptor<>("id", LongSerializer.INSTANCE, 42L);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -499,463 +966,443 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
@Test
@SuppressWarnings("unchecked,rawtypes")
- public void testListState() {
- try {
- CheckpointStreamFactory streamFactory = createStreamFactory();
- AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+ public void testListState() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
- ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+ ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
- TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
- TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
+ TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
- Joiner joiner = Joiner.on(",");
- // some modifications to the state
- backend.setCurrentKey(1);
- assertEquals(null, state.get());
- assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- state.add("1");
- backend.setCurrentKey(2);
- assertEquals(null, state.get());
- assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- state.add("2");
- backend.setCurrentKey(1);
- assertEquals("1", joiner.join(state.get()));
- assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ // this is only available after the backend initialized the serializer
+ TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
- // draw a snapshot
- KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ Joiner joiner = Joiner.on(",");
+ // some modifications to the state
+ backend.setCurrentKey(1);
+ assertEquals(null, state.get());
+ assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ state.add("1");
+ backend.setCurrentKey(2);
+ assertEquals(null, state.get());
+ assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ state.add("2");
+ backend.setCurrentKey(1);
+ assertEquals("1", joiner.join(state.get()));
+ assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
- // make some more modifications
- backend.setCurrentKey(1);
- state.add("u1");
- backend.setCurrentKey(2);
- state.add("u2");
- backend.setCurrentKey(3);
- state.add("u3");
+ // draw a snapshot
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
- // draw another snapshot
- KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ // make some more modifications
+ backend.setCurrentKey(1);
+ state.add("u1");
+ backend.setCurrentKey(2);
+ state.add("u2");
+ backend.setCurrentKey(3);
+ state.add("u3");
- // validate the original state
- backend.setCurrentKey(1);
- assertEquals("1,u1", joiner.join(state.get()));
- assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
- backend.setCurrentKey(2);
- assertEquals("2,u2", joiner.join(state.get()));
- assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
- backend.setCurrentKey(3);
- assertEquals("u3", joiner.join(state.get()));
- assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ // draw another snapshot
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
- backend.dispose();
- // restore the first snapshot and validate it
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
- snapshot1.discardState();
+ // validate the original state
+ backend.setCurrentKey(1);
+ assertEquals("1,u1", joiner.join(state.get()));
+ assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ backend.setCurrentKey(2);
+ assertEquals("2,u2", joiner.join(state.get()));
+ assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ backend.setCurrentKey(3);
+ assertEquals("u3", joiner.join(state.get()));
+ assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
- ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ backend.dispose();
+ // restore the first snapshot and validate it
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+ snapshot1.discardState();
- backend.setCurrentKey(1);
- assertEquals("1", joiner.join(restored1.get()));
- assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
- backend.setCurrentKey(2);
- assertEquals("2", joiner.join(restored1.get()));
- assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
- backend.dispose();
- // restore the second snapshot and validate it
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
- snapshot2.discardState();
+ backend.setCurrentKey(1);
+ assertEquals("1", joiner.join(restored1.get()));
+ assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ backend.setCurrentKey(2);
+ assertEquals("2", joiner.join(restored1.get()));
+ assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
- ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ backend.dispose();
+ // restore the second snapshot and validate it
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+ snapshot2.discardState();
- backend.setCurrentKey(1);
- assertEquals("1,u1", joiner.join(restored2.get()));
- assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
- backend.setCurrentKey(2);
- assertEquals("2,u2", joiner.join(restored2.get()));
- assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
- backend.setCurrentKey(3);
- assertEquals("u3", joiner.join(restored2.get()));
- assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
- backend.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ backend.setCurrentKey(1);
+ assertEquals("1,u1", joiner.join(restored2.get()));
+ assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ backend.setCurrentKey(2);
+ assertEquals("2,u2", joiner.join(restored2.get()));
+ assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+ backend.setCurrentKey(3);
+ assertEquals("u3", joiner.join(restored2.get()));
+ assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
+ backend.dispose();
}
@Test
@SuppressWarnings("unchecked")
- public void testReducingState() {
- try {
- CheckpointStreamFactory streamFactory = createStreamFactory();
- AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+ public void testReducingState() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
- ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+ ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
- TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
- TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- TypeSerializer<String> valueSerializer = kvId.getSerializer();
+ TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
- // some modifications to the state
- backend.setCurrentKey(1);
- assertEquals(null, state.get());
- assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- state.add("1");
- backend.setCurrentKey(2);
- assertEquals(null, state.get());
- assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- state.add("2");
- backend.setCurrentKey(1);
- assertEquals("1", state.get());
- assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ // this is only available after the backend initialized the serializer
+ TypeSerializer<String> valueSerializer = kvId.getSerializer();
- // draw a snapshot
- KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ // some modifications to the state
+ backend.setCurrentKey(1);
+ assertEquals(null, state.get());
+ assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ state.add("1");
+ backend.setCurrentKey(2);
+ assertEquals(null, state.get());
+ assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ state.add("2");
+ backend.setCurrentKey(1);
+ assertEquals("1", state.get());
+ assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- // make some more modifications
- backend.setCurrentKey(1);
- state.add("u1");
- backend.setCurrentKey(2);
- state.add("u2");
- backend.setCurrentKey(3);
- state.add("u3");
+ // draw a snapshot
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
- // draw another snapshot
- KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ // make some more modifications
+ backend.setCurrentKey(1);
+ state.add("u1");
+ backend.setCurrentKey(2);
+ state.add("u2");
+ backend.setCurrentKey(3);
+ state.add("u3");
- // validate the original state
- backend.setCurrentKey(1);
- assertEquals("1,u1", state.get());
- assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(2);
- assertEquals("2,u2", state.get());
- assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(3);
- assertEquals("u3", state.get());
- assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ // draw another snapshot
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
- backend.dispose();
- // restore the first snapshot and validate it
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
- snapshot1.discardState();
+ // validate the original state
+ backend.setCurrentKey(1);
+ assertEquals("1,u1", state.get());
+ assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("2,u2", state.get());
+ assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(3);
+ assertEquals("u3", state.get());
+ assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ backend.dispose();
+ // restore the first snapshot and validate it
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+ snapshot1.discardState();
- backend.setCurrentKey(1);
- assertEquals("1", restored1.get());
- assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(2);
- assertEquals("2", restored1.get());
- assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
- backend.dispose();
- // restore the second snapshot and validate it
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
- snapshot2.discardState();
+ backend.setCurrentKey(1);
+ assertEquals("1", restored1.get());
+ assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("2", restored1.get());
+ assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ backend.dispose();
+ // restore the second snapshot and validate it
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+ snapshot2.discardState();
- backend.setCurrentKey(1);
- assertEquals("1,u1", restored2.get());
- assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(2);
- assertEquals("2,u2", restored2.get());
- assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(3);
- assertEquals("u3", restored2.get());
- assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
- backend.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ backend.setCurrentKey(1);
+ assertEquals("1,u1", restored2.get());
+ assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("2,u2", restored2.get());
+ assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(3);
+ assertEquals("u3", restored2.get());
+ assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+
+ backend.dispose();
}
@Test
@SuppressWarnings("unchecked,rawtypes")
- public void testFoldingState() {
- try {
- CheckpointStreamFactory streamFactory = createStreamFactory();
- AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+ public void testFoldingState() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
- FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
- "Fold-Initial:",
- new AppendingFold(),
- String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+ FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
+ "Fold-Initial:",
+ new AppendingFold(),
+ String.class);
- TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
- TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- TypeSerializer<String> valueSerializer = kvId.getSerializer();
+ TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
- // some modifications to the state
- backend.setCurrentKey(1);
- assertEquals(null, state.get());
- assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- state.add(1);
- backend.setCurrentKey(2);
- assertEquals(null, state.get());
- assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- state.add(2);
- backend.setCurrentKey(1);
- assertEquals("Fold-Initial:,1", state.get());
- assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ // this is only available after the backend initialized the serializer
+ TypeSerializer<String> valueSerializer = kvId.getSerializer();
- // draw a snapshot
- KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ // some modifications to the state
+ backend.setCurrentKey(1);
+ assertEquals(null, state.get());
+ assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ state.add(1);
+ backend.setCurrentKey(2);
+ assertEquals(null, state.get());
+ assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ state.add(2);
+ backend.setCurrentKey(1);
+ assertEquals("Fold-Initial:,1", state.get());
+ assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- // make some more modifications
- backend.setCurrentKey(1);
- state.clear();
- state.add(101);
- backend.setCurrentKey(2);
- state.add(102);
- backend.setCurrentKey(3);
- state.add(103);
+ // draw a snapshot
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
- // draw another snapshot
- KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ // make some more modifications
+ backend.setCurrentKey(1);
+ state.clear();
+ state.add(101);
+ backend.setCurrentKey(2);
+ state.add(102);
+ backend.setCurrentKey(3);
+ state.add(103);
- // validate the original state
- backend.setCurrentKey(1);
- assertEquals("Fold-Initial:,101", state.get());
- assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(2);
- assertEquals("Fold-Initial:,2,102", state.get());
- assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(3);
- assertEquals("Fold-Initial:,103", state.get());
- assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ // draw another snapshot
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
- backend.dispose();
- // restore the first snapshot and validate it
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
- snapshot1.discardState();
+ // validate the original state
+ backend.setCurrentKey(1);
+ assertEquals("Fold-Initial:,101", state.get());
+ assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("Fold-Initial:,2,102", state.get());
+ assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(3);
+ assertEquals("Fold-Initial:,103", state.get());
+ assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ backend.dispose();
+ // restore the first snapshot and validate it
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+ snapshot1.discardState();
- backend.setCurrentKey(1);
- assertEquals("Fold-Initial:,1", restored1.get());
- assertEquals("Fold-Initial:,1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(2);
- assertEquals("Fold-Initial:,2", restored1.get());
- assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
- backend.dispose();
- // restore the second snapshot and validate it
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
- snapshot1.discardState();
+ backend.setCurrentKey(1);
+ assertEquals("Fold-Initial:,1", restored1.get());
+ assertEquals("Fold-Initial:,1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("Fold-Initial:,2", restored1.get());
+ assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- @SuppressWarnings("unchecked")
- FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ backend.dispose();
+ // restore the second snapshot and validate it
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+ snapshot1.discardState();
- backend.setCurrentKey(1);
- assertEquals("Fold-Initial:,101", restored2.get());
- assertEquals("Fold-Initial:,101", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(2);
- assertEquals("Fold-Initial:,2,102", restored2.get());
- assertEquals("Fold-Initial:,2,102", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
- backend.setCurrentKey(3);
- assertEquals("Fold-Initial:,103", restored2.get());
- assertEquals("Fold-Initial:,103", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ @SuppressWarnings("unchecked")
+ FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
- backend.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ backend.setCurrentKey(1);
+ assertEquals("Fold-Initial:,101", restored2.get());
+ assertEquals("Fold-Initial:,101", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("Fold-Initial:,2,102", restored2.get());
+ assertEquals("Fold-Initial:,2,102", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+ backend.setCurrentKey(3);
+ assertEquals("Fold-Initial:,103", restored2.get());
+ assertEquals("Fold-Initial:,103", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+
+ backend.dispose();
}
@Test
@SuppressWarnings("unchecked,rawtypes")
- public void testMapState() {
- try {
- CheckpointStreamFactory streamFactory = createStreamFactory();
- AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+ public void testMapState() throws Exception {
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
- MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>("id", Integer.class, String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+ MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>("id", Integer.class, String.class);
- TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
- TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
- TypeSerializer<String> userValueSerializer = kvId.getValueSerializer();
+ TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+ MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
- // some modifications to the state
- backend.setCurrentKey(1);
- assertEquals(null, state.get(1));
- assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- state.put(1, "1");
- backend.setCurrentKey(2);
- assertEquals(null, state.get(2));
- assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- state.put(2, "2");
- backend.setCurrentKey(1);
- assertTrue(state.contains(1));
- assertEquals("1", state.get(1));
- assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
- getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ // these are only available after the backend initialized the serializer
+ TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
+ TypeSerializer<String> userValueSerializer = kvId.getValueSerializer();
- // draw a snapshot
- KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ // some modifications to the state
+ backend.setCurrentKey(1);
+ assertEquals(null, state.get(1));
+ assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ state.put(1, "1");
+ backend.setCurrentKey(2);
+ assertEquals(null, state.get(2));
+ assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ state.put(2, "2");
+ backend.setCurrentKey(1);
+ assertTrue(state.contains(1));
+ assertEquals("1", state.get(1));
+ assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+ getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- // make some more modifications
- backend.setCurrentKey(1);
- state.put(1, "101");
- backend.setCurrentKey(2);
- state.put(102, "102");
- backend.setCurrentKey(3);
- state.put(103, "103");
- state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
+ // draw a snapshot
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
- // draw another snapshot
- KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+ // make some more modifications
+ backend.setCurrentKey(1);
+ state.put(1, "101");
+ backend.setCurrentKey(2);
+ state.put(102, "102");
+ backend.setCurrentKey(3);
+ state.put(103, "103");
+ state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
- // validate the original state
- backend.setCurrentKey(1);
- assertEquals("101", state.get(1));
- assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); }},
- getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- backend.setCurrentKey(2);
- assertEquals("102", state.get(102));
- assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
- getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- backend.setCurrentKey(3);
- assertTrue(state.contains(103));
- assertEquals("103", state.get(103));
- assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
- getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-
- List<Integer> keys = new ArrayList<>();
- for (Integer key : state.keys()) {
- keys.add(key);
- }
- List<Integer> expectedKeys = new ArrayList<Integer>() {{ add(103); add(1031); add(1032); }};
- assertEquals(keys.size(), expectedKeys.size());
- keys.removeAll(expectedKeys);
- assertTrue(keys.isEmpty());
-
- List<String> values = new ArrayList<>();
- for (String value : state.values()) {
- values.add(value);
- }
- List<String> expectedValues = new ArrayList<String>() {{ add("103"); add("1031"); add("1032"); }};
- assertEquals(values.size(), expectedValues.size());
- values.removeAll(expectedValues);
- assertTrue(values.isEmpty());
+ // draw another snapshot
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
- // make some more modifications
- backend.setCurrentKey(1);
- state.clear();
- backend.setCurrentKey(2);
- state.remove(102);
- backend.setCurrentKey(3);
- final String updateSuffix = "_updated";
- Iterator<Map.Entry<Integer, String>> iterator = state.iterator();
- while (iterator.hasNext()) {
- Map.Entry<Integer, String> entry = iterator.next();
- if (entry.getValue().length() != 4) {
- iterator.remove();
- } else {
- entry.setValue(entry.getValue() + updateSuffix);
- }
- }
+ // validate the original state
+ backend.setCurrentKey(1);
+ assertEquals("101", state.get(1));
+ assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); }},
+ getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("102", state.get(102));
+ assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
+ getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ backend.setCurrentKey(3);
+ assertTrue(state.contains(103));
+ assertEquals("103", state.get(103));
+ assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
+ getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+
+ List<Integer> keys = new ArrayList<>();
+ for (Integer key : state.keys()) {
+ keys.add(key);
+ }
+ List<Integer> expectedKeys = new ArrayList<Integer>() {{ add(103); add(1031); add(1032); }};
+ assertEquals(keys.size(), expectedKeys.size());
+ keys.removeAll(expectedKeys);
+ assertTrue(keys.isEmpty());
+
+ List<String> values = new ArrayList<>();
+ for (String value : state.values()) {
+ values.add(value);
+ }
+ List<String> expectedValues = new ArrayList<String>() {{ add("103"); add("1031"); add("1032"); }};
+ assertEquals(values.size(), expectedValues.size());
+ values.removeAll(expectedValues);
+ assertTrue(values.isEmpty());
- // validate the state
- backend.setCurrentKey(1);
- backend.setCurrentKey(2);
- assertFalse(state.contains(102));
- backend.setCurrentKey(3);
- for (Map.Entry<Integer, String> entry : state.entries()) {
- assertEquals(4 + updateSuffix.length(), entry.getValue().length());
- assertTrue(entry.getValue().endsWith(updateSuffix));
+ // make some more modifications
+ backend.setCurrentKey(1);
+ state.clear();
+ backend.setCurrentKey(2);
+ state.remove(102);
+ backend.setCurrentKey(3);
+ final String updateSuffix = "_updated";
+ Iterator<Map.Entry<Integer, String>> iterator = state.iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, String> entry = iterator.next();
+ if (entry.getValue().length() != 4) {
+ iterator.remove();
+ } else {
+ entry.setValue(entry.getValue() + updateSuffix);
}
+ }
- backend.dispose();
- // restore the first snapshot and validate it
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
- snapshot1.discardState();
+ // validate the state
+ backend.setCurrentKey(1);
+ backend.setCurrentKey(2);
+ assertFalse(state.contains(102));
+ backend.setCurrentKey(3);
+ for (Map.Entry<Integer, String> entry : state.entries()) {
+ assertEquals(4 + updateSuffix.length(), entry.getValue().length());
+ assertTrue(entry.getValue().endsWith(updateSuffix));
+ }
- MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+ backend.dispose();
+ // restore the first snapshot and validate it
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+ snapshot1.discardState();
- backend.setCurrentKey(1);
- assertEquals("1", restored1.get(1));
- assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
- getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- backend.setCurrentKey(2);
- assertEquals("2", restored1.get(2));
- assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
- getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
- backend.dispose();
- // restore the second snapshot and validate it
- backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
- snapshot2.discardState();
+ backend.setCurrentKey(1);
+ assertEquals("1", restored1.get(1));
+ assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+ getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("2", restored1.get(2));
+ assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
+ getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- @SuppressWarnings("unchecked")
- MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
- @SuppressWarnings("unchecked")
- InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+ backend.dispose();
+ // restore the second snapshot and validate it
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+ snapshot2.discardState();
- backend.setCurrentKey(1);
- assertEquals("101", restored2.get(1));
- assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
- getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- backend.setCurrentKey(2);
- assertEquals("102", restored2.get(102));
- assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
- getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
- backend.setCurrentKey(3);
- assertEquals("103", restored2.get(103));
- assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
- getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ @SuppressWarnings("unchecked")
+ MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ @SuppressWarnings("unchecked")
+ InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
- backend.dispose();
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ backend.setCurrentKey(1);
+ assertEquals("101", restored2.get(1));
+ assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
+ getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ backend.setCurrentKey(2);
+ assertEquals("102", restored2.get(102));
+ assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
+ getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ backend.setCurrentKey(3);
+ assertEquals("103", restored2.get(103));
+ assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
+ getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+ backend.dispose();
}
/**
@@ -966,7 +1413,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -991,7 +1437,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, "Hello");
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1015,7 +1460,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ReducingState<String> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
@@ -1043,8 +1487,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
FoldingStateDescriptor<Integer, String> kvId =
new FoldingStateDescriptor<>("id", "Fold-Initial:", new AppendingFold(), String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
FoldingState<Integer, String> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1071,7 +1513,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ListState<String> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
@@ -1098,7 +1539,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
MapStateDescriptor<String, String> kvId = new MapStateDescriptor<>("id", String.class, String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
MapState<String, String> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
@@ -1142,7 +1582,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
new DummyEnvironment("test", 1, 0));
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1224,7 +1663,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1430,7 +1868,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<IntValue> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1458,7 +1895,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
try {
backend.getPartitionedState(null, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1501,7 +1937,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
Integer.class,
-1);
desc.setQueryable("my-query");
- desc.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<Integer> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
@@ -1524,7 +1959,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// ListState
ListStateDescriptor<Integer> desc = new ListStateDescriptor<>("list-state", Integer.class);
desc.setQueryable("my-query");
- desc.initializeSerializerUnlessSet(new ExecutionConfig());
ListState<Integer> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
@@ -1552,7 +1986,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
}, Integer.class);
desc.setQueryable("my-query");
- desc.initializeSerializerUnlessSet(new ExecutionConfig());
ReducingState<Integer> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
@@ -1580,7 +2013,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
}, Integer.class);
desc.setQueryable("my-query");
- desc.initializeSerializerUnlessSet(new ExecutionConfig());
FoldingState<Integer, Integer> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
@@ -1602,7 +2034,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// MapState
MapStateDescriptor<Integer, String> desc = new MapStateDescriptor<>("map-state", Integer.class, String.class);
desc.setQueryable("my-query");
- desc.initializeSerializerUnlessSet(new ExecutionConfig());
MapState<Integer, String> state = backend.getPartitionedState(
VoidNamespace.INSTANCE,
@@ -1935,4 +2366,107 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
return KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer);
}
}
+
+ private KeyGroupsStateHandle runSnapshot(RunnableFuture<KeyGroupsStateHandle> snapshotRunnableFuture) throws Exception {
+ if(!snapshotRunnableFuture.isDone()) {
+ Thread runner = new Thread(snapshotRunnableFuture);
+ runner.start();
+ }
+ return snapshotRunnableFuture.get();
+ }
+
+ private static class TestPojo implements Serializable {
+ private String strField;
+ private Integer intField;
+
+ public TestPojo() {}
+
+ public TestPojo(String strField, Integer intField) {
+ this.strField = strField;
+ this.intField = intField;
+ }
+
+ public String getStrField() {
+ return strField;
+ }
+
+ public void setStrField(String strField) {
+ this.strField = strField;
+ }
+
+ public Integer getIntField() {
+ return intField;
+ }
+
+ public void setIntField(Integer intField) {
+ this.intField = intField;
+ }
+
+ @Override
+ public String toString() {
+ return "TestPojo{" +
+ "strField='" + strField + '\'' +
+ ", intField=" + intField +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TestPojo testPojo = (TestPojo) o;
+
+ if (!strField.equals(testPojo.strField)) return false;
+ return intField.equals(testPojo.intField);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = strField.hashCode();
+ result = 31 * result + intField.hashCode();
+ return result;
+ }
+ }
+
+ /**
+ * We throw this in our {@link ExceptionThrowingTestSerializer}.
+ */
+ private static class ExpectedKryoTestException extends RuntimeException {}
+
+ /**
+ * Kryo {@code Serializer} that throws an expected exception. We use this to ensure
+ * that the state backend correctly uses a specified Kryo serializer.
+ */
+ public static class ExceptionThrowingTestSerializer extends JavaSerializer {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ throw new ExpectedKryoTestException();
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ throw new ExpectedKryoTestException();
+ }
+ }
+
+ /**
+ * Our custom version of {@link JavaSerializer} for checking whether restore with a registered
+ * serializer works when no serializer was previously registered.
+ *
+ * <p>This {@code Serializer} can only be used for writing, not for reading. With this we
+ * verify that state that was serialized without a registered {@code Serializer} is in fact
+ * not restored with a {@code Serializer} that was later registered.
+ */
+ public static class CustomKryoTestSerializer extends JavaSerializer {
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ super.write(kryo, output, object);
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ throw new ExpectedKryoTestException();
+ }
+ }
}