You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:27:57 UTC
[5/8] flink git commit: [FLINK-6714] [runtime] Use user classloader
for operator state copying on snapshots
[FLINK-6714] [runtime] Use user classloader for operator state copying on snapshots
This closes #3987.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2f5dab3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2f5dab3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2f5dab3
Branch: refs/heads/master
Commit: b2f5dab330c8803ab72e2bbf4e94d68fc760c467
Parents: c793ea4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 25 16:54:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:27:18 2017 +0800
----------------------------------------------------------------------
.../state/DefaultOperatorStateBackend.java | 21 ++++++----
.../runtime/state/OperatorStateBackendTest.java | 42 ++++++++++++++++++++
2 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b2f5dab3/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eec2e93..0f96dac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -185,13 +185,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
new HashMap<>(registeredStates.size());
// eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing
- for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
-
- PartitionableListState<?> listState = entry.getValue();
- if (null != listState) {
- listState = listState.deepCopy();
+ ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(userClassloader);
+ try {
+ for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
+ PartitionableListState<?> listState = entry.getValue();
+ if (null != listState) {
+ listState = listState.deepCopy();
+ }
+ registeredStatesDeepCopies.put(entry.getKey(), listState);
}
- registeredStatesDeepCopies.put(entry.getKey(), listState);
+ } finally {
+ Thread.currentThread().setContextClassLoader(snapshotClassLoader);
}
// implementation of the async IO operation, based on FutureTask
@@ -258,8 +263,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
task.run();
}
- LOG.info("DefaultOperatorStateBackend snapshot (" + streamFactory + ", synchronous part) in thread " +
- Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+ LOG.info("DefaultOperatorStateBackend snapshot ({}, synchronous part) in thread {} took {} ms.",
+ streamFactory, Thread.currentThread(), (System.currentTimeMillis() - syncStartTime));
return task;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b2f5dab3/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 31b75c7..d44f6c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -35,6 +36,9 @@ import org.apache.flink.util.FutureUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -61,6 +65,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@@ -208,6 +213,43 @@ public class OperatorStateBackendTest {
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
+
+ AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+ final Environment env = createMockEnvironment();
+ OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
+
+ // mock serializer which tests that on copy, the correct classloader is used as the context classloader
+ TypeSerializer<Integer> mockSerializer = mock(TypeSerializer.class);
+ when(mockSerializer.copy(Matchers.any(Integer.class))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Assert.assertEquals(env.getUserClassLoader(), Thread.currentThread().getContextClassLoader());
+ return null;
+ }
+ });
+ // return actual serializers / config snapshots so that the snapshot proceeds properly
+ when(mockSerializer.duplicate()).thenReturn(IntSerializer.INSTANCE);
+ when(mockSerializer.snapshotConfiguration()).thenReturn(IntSerializer.INSTANCE.snapshotConfiguration());
+
+ // write some state
+ ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test", mockSerializer);
+ ListState<Integer> listState = operatorStateBackend.getListState(stateDescriptor);
+
+ listState.add(42);
+
+ CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+ RunnableFuture<OperatorStateHandle> runnableFuture =
+ operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+ FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+ // make sure that the method of interest is called
+ verify(mockSerializer).copy(Matchers.any(Integer.class));
+ }
+
@Test
public void testSnapshotEmpty() throws Exception {
final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);