You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:27:57 UTC

[5/8] flink git commit: [FLINK-6714] [runtime] Use user classloader for operator state copying on snapshots

[FLINK-6714] [runtime] Use user classloader for operator state copying on snapshots

This closes #3987.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2f5dab3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2f5dab3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2f5dab3

Branch: refs/heads/master
Commit: b2f5dab330c8803ab72e2bbf4e94d68fc760c467
Parents: c793ea4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 25 16:54:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:27:18 2017 +0800

----------------------------------------------------------------------
 .../state/DefaultOperatorStateBackend.java      | 21 ++++++----
 .../runtime/state/OperatorStateBackendTest.java | 42 ++++++++++++++++++++
 2 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2f5dab3/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eec2e93..0f96dac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -185,13 +185,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 				new HashMap<>(registeredStates.size());
 
 		// eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing
-		for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
-
-			PartitionableListState<?> listState = entry.getValue();
-			if (null != listState) {
-				listState = listState.deepCopy();
+		ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(userClassloader);
+		try {
+			for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
+				PartitionableListState<?> listState = entry.getValue();
+				if (null != listState) {
+					listState = listState.deepCopy();
+				}
+				registeredStatesDeepCopies.put(entry.getKey(), listState);
 			}
-			registeredStatesDeepCopies.put(entry.getKey(), listState);
+		} finally {
+			Thread.currentThread().setContextClassLoader(snapshotClassLoader);
 		}
 
 		// implementation of the async IO operation, based on FutureTask
@@ -258,8 +263,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			task.run();
 		}
 
-		LOG.info("DefaultOperatorStateBackend snapshot (" + streamFactory + ", synchronous part) in thread " +
-				Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+		LOG.info("DefaultOperatorStateBackend snapshot ({}, synchronous part) in thread {} took {} ms.",
+				streamFactory, Thread.currentThread(), (System.currentTimeMillis() - syncStartTime));
 
 		return task;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b2f5dab3/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 31b75c7..d44f6c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -35,6 +36,9 @@ import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -61,6 +65,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
@@ -208,6 +213,43 @@ public class OperatorStateBackendTest {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
+
+		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		final Environment env = createMockEnvironment();
+		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
+
+		// mock serializer which tests that on copy, the correct classloader is used as the context classloader
+		TypeSerializer<Integer> mockSerializer = mock(TypeSerializer.class);
+		when(mockSerializer.copy(Matchers.any(Integer.class))).thenAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				Assert.assertEquals(env.getUserClassLoader(), Thread.currentThread().getContextClassLoader());
+				return null;
+			}
+		});
+		// return actual serializers / config snapshots so that the snapshot proceeds properly
+		when(mockSerializer.duplicate()).thenReturn(IntSerializer.INSTANCE);
+		when(mockSerializer.snapshotConfiguration()).thenReturn(IntSerializer.INSTANCE.snapshotConfiguration());
+
+		// write some state
+		ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test", mockSerializer);
+		ListState<Integer> listState = operatorStateBackend.getListState(stateDescriptor);
+
+		listState.add(42);
+
+		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+		RunnableFuture<OperatorStateHandle> runnableFuture =
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+		FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+		// make sure that the method of interest is called
+		verify(mockSerializer).copy(Matchers.any(Integer.class));
+	}
+
 	@Test
 	public void testSnapshotEmpty() throws Exception {
 		final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);