You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/20 14:41:02 UTC

[2/3] flink git commit: [FLINK-5995] [checkpoints] Harden test for state descriptor passing to OperatorState

[FLINK-5995] [checkpoints] Harden test for state descriptor passing to OperatorState


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f700caf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f700caf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f700caf

Branch: refs/heads/master
Commit: 3f700cafbb21c98a94c2ad21b90c1823963fed29
Parents: 614abd2
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 17 13:52:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 20 13:20:03 2017 +0100

----------------------------------------------------------------------
 .../runtime/state/OperatorStateBackendTest.java | 89 ++++++++++++++------
 1 file changed, 63 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f700caf/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 157d5ee..bc446f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,21 +21,27 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
 import org.apache.flink.util.FutureUtil;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.io.File;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -43,54 +49,67 @@ import static org.mockito.Mockito.when;
 
 public class OperatorStateBackendTest {
 
-	AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
-
-	static Environment createMockEnvironment() {
-		Environment env = mock(Environment.class);
-		ExecutionConfig config = mock(ExecutionConfig.class);
-		when(env.getExecutionConfig()).thenReturn(config);
-		when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
-		return env;
-	}
-
-	private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception {
-		//TODO this is temporarily casted to test already functionality that we do not yet expose through public API
-		return (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
-				createMockEnvironment(),
-				"test-operator");
-	}
+	private final ClassLoader classLoader = getClass().getClassLoader();
 
 	@Test
-	public void testCreateNew() throws Exception {
-		OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+	public void testCreateOnAbstractStateBackend() throws Exception {
+		// we use the memory state backend as a subclass of the AbstractStateBackend
+		final AbstractStateBackend abstractStateBackend = new MemoryStateBackend();
+		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+				createMockEnvironment(), "test-operator");
+
 		assertNotNull(operatorStateBackend);
 		assertTrue(operatorStateBackend.getRegisteredStateNames().isEmpty());
 	}
 
 	@Test
 	public void testRegisterStatesWithoutTypeSerializer() throws Exception {
-		DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+		// prepare an execution config with a non standard type registered
+		final Class<?> registeredType = FutureTask.class;
+
+		// validate the precondition of this test - if this condition fails, we need to pick a different
+		// example serializer
+		assertFalse(new KryoSerializer<>(File.class, new ExecutionConfig()).getKryo().getDefaultSerializer(registeredType)
+				instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);
+
+		final ExecutionConfig cfg = new ExecutionConfig();
+		cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);
+
+		final DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg);
+
 		ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class);
 		ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class);
+
 		ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor);
 		assertNotNull(listState);
+
 		ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
 		assertNotNull(listState2);
+
 		assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
+
+		// make sure that type registrations are forwarded
+		TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getPartitionStateSerializer();
+		assertTrue(serializer instanceof KryoSerializer);
+		assertTrue(((KryoSerializer<?>) serializer).getKryo().getSerializer(registeredType)
+				instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);
+
 		Iterator<String> it = listState2.get().iterator();
-		assertTrue(!it.hasNext());
+		assertFalse(it.hasNext());
 		listState2.add("kevin");
 		listState2.add("sunny");
 
 		it = listState2.get().iterator();
 		assertEquals("kevin", it.next());
 		assertEquals("sunny", it.next());
-		assertTrue(!it.hasNext());
+		assertFalse(it.hasNext());
 	}
 
 	@Test
 	public void testRegisterStates() throws Exception {
-		DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+		final DefaultOperatorStateBackend operatorStateBackend =
+				new DefaultOperatorStateBackend(classLoader, new ExecutionConfig());
+
 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -173,7 +192,11 @@ public class OperatorStateBackendTest {
 
 	@Test
 	public void testSnapshotEmpty() throws Exception {
-		DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+		final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		final DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
+				abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
+
 		CheckpointStreamFactory streamFactory =
 				abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
 
@@ -181,12 +204,16 @@ public class OperatorStateBackendTest {
 				operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
 
 		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
-		Assert.assertNull(stateHandle);
+		assertNull(stateHandle);
 	}
 
 	@Test
 	public void testSnapshotRestore() throws Exception {
-		DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
+				abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");
+
 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -255,4 +282,14 @@ public class OperatorStateBackendTest {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static Environment createMockEnvironment() {
+		Environment env = mock(Environment.class);
+		when(env.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
+		return env;
+	}
 }