You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/20 14:41:02 UTC
[2/3] flink git commit: [FLINK-5995] [checkpoints] Harden test for state descriptor passing to OperatorState
[FLINK-5995] [checkpoints] Harden test for state descriptor passing to OperatorState
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f700caf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f700caf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f700caf
Branch: refs/heads/master
Commit: 3f700cafbb21c98a94c2ad21b90c1823963fed29
Parents: 614abd2
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 17 13:52:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 20 13:20:03 2017 +0100
----------------------------------------------------------------------
.../runtime/state/OperatorStateBackendTest.java | 89 ++++++++++++++------
1 file changed, 63 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3f700caf/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 157d5ee..bc446f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,21 +21,27 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
import org.apache.flink.util.FutureUtil;
-import org.junit.Assert;
import org.junit.Test;
import java.io.Serializable;
import java.io.File;
import java.util.Collections;
import java.util.Iterator;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -43,54 +49,67 @@ import static org.mockito.Mockito.when;
public class OperatorStateBackendTest {
- AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
-
- static Environment createMockEnvironment() {
- Environment env = mock(Environment.class);
- ExecutionConfig config = mock(ExecutionConfig.class);
- when(env.getExecutionConfig()).thenReturn(config);
- when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
- return env;
- }
-
- private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception {
- //TODO this is temporarily casted to test already functionality that we do not yet expose through public API
- return (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
- createMockEnvironment(),
- "test-operator");
- }
+ private final ClassLoader classLoader = getClass().getClassLoader();
@Test
- public void testCreateNew() throws Exception {
- OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+ public void testCreateOnAbstractStateBackend() throws Exception {
+ // we use the memory state backend as a subclass of the AbstractStateBackend
+ final AbstractStateBackend abstractStateBackend = new MemoryStateBackend();
+ OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+ createMockEnvironment(), "test-operator");
+
assertNotNull(operatorStateBackend);
assertTrue(operatorStateBackend.getRegisteredStateNames().isEmpty());
}
@Test
public void testRegisterStatesWithoutTypeSerializer() throws Exception {
- DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+ // prepare an execution config with a non standard type registered
+ final Class<?> registeredType = FutureTask.class;
+
+ // validate the precondition of this test - if this condition fails, we need to pick a different
+ // example serializer
+ assertFalse(new KryoSerializer<>(File.class, new ExecutionConfig()).getKryo().getDefaultSerializer(registeredType)
+ instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);
+
+ final ExecutionConfig cfg = new ExecutionConfig();
+ cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);
+
+ final DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg);
+
ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class);
ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class);
+
ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor);
assertNotNull(listState);
+
ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
assertNotNull(listState2);
+
assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
+
+ // make sure that type registrations are forwarded
+ TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getPartitionStateSerializer();
+ assertTrue(serializer instanceof KryoSerializer);
+ assertTrue(((KryoSerializer<?>) serializer).getKryo().getSerializer(registeredType)
+ instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);
+
Iterator<String> it = listState2.get().iterator();
- assertTrue(!it.hasNext());
+ assertFalse(it.hasNext());
listState2.add("kevin");
listState2.add("sunny");
it = listState2.get().iterator();
assertEquals("kevin", it.next());
assertEquals("sunny", it.next());
- assertTrue(!it.hasNext());
+ assertFalse(it.hasNext());
}
@Test
public void testRegisterStates() throws Exception {
- DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+ final DefaultOperatorStateBackend operatorStateBackend =
+ new DefaultOperatorStateBackend(classLoader, new ExecutionConfig());
+
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -173,7 +192,11 @@ public class OperatorStateBackendTest {
@Test
public void testSnapshotEmpty() throws Exception {
- DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+ final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+ final DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
+ abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
+
CheckpointStreamFactory streamFactory =
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
@@ -181,12 +204,16 @@ public class OperatorStateBackendTest {
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
- Assert.assertNull(stateHandle);
+ assertNull(stateHandle);
}
@Test
public void testSnapshotRestore() throws Exception {
- DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+ AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+ DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
+ abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");
+
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -255,4 +282,14 @@ public class OperatorStateBackendTest {
}
}
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private static Environment createMockEnvironment() {
+ Environment env = mock(Environment.class);
+ when(env.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+ return env;
+ }
}