You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/19 13:36:03 UTC
[2/2] flink git commit: [FLINK-6606] Set UserCodeClassLoader as TCCL
for MasterTriggerRestoreHook
[FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook
- wrap calls to MasterTriggerRestoreHook (and its factory) such that the user classloader is applied
This closes #3933.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ad08163
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ad08163
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ad08163
Branch: refs/heads/master
Commit: 2ad081636e54a2b8fd98a935a95c0949818843ad
Parents: acea4cd
Author: Wright, Eron <Er...@emc.com>
Authored: Wed May 17 09:46:13 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 19 15:22:49 2017 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/hooks/MasterHooks.java | 108 +++++++++++++++
.../executiongraph/ExecutionGraphBuilder.java | 15 ++-
.../checkpoint/hooks/MasterHooksTest.java | 131 +++++++++++++++++++
3 files changed, 251 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2ad08163/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 409019e..737e816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.hooks;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -29,6 +30,7 @@ import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -267,6 +269,112 @@ public class MasterHooks {
}
// ------------------------------------------------------------------------
+ // hook management
+ // ------------------------------------------------------------------------
+
+ /**
+ * Wraps a hook such that the user-code classloader is applied when the hook is invoked.
+ * @param hook the hook to wrap
+ * @param userClassLoader the classloader to use
+ */
+ public static <T> MasterTriggerRestoreHook<T> wrapHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
+ return new WrappedMasterHook<T>(hook, userClassLoader);
+ }
+
+ @VisibleForTesting
+ static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> {
+
+ private final MasterTriggerRestoreHook<T> hook;
+ private final ClassLoader userClassLoader;
+
+ WrappedMasterHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
+ this.hook = hook;
+ this.userClassLoader = userClassLoader;
+ }
+
+ @Override
+ public String getIdentifier() {
+ Thread thread = Thread.currentThread();
+ ClassLoader originalClassLoader = thread.getContextClassLoader();
+ thread.setContextClassLoader(userClassLoader);
+ try {
+ return hook.getIdentifier();
+ }
+ finally {
+ thread.setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ @Nullable
+ @Override
+ public Future<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
+ Executor wrappedExecutor = new Executor() {
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(new WrappedCommand(command));
+ }
+ };
+
+ Thread thread = Thread.currentThread();
+ ClassLoader originalClassLoader = thread.getContextClassLoader();
+ thread.setContextClassLoader(userClassLoader);
+ try {
+ return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
+ }
+ finally {
+ thread.setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ @Override
+ public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception {
+ Thread thread = Thread.currentThread();
+ ClassLoader originalClassLoader = thread.getContextClassLoader();
+ thread.setContextClassLoader(userClassLoader);
+ try {
+ hook.restoreCheckpoint(checkpointId, checkpointData);
+ }
+ finally {
+ thread.setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ @Nullable
+ @Override
+ public SimpleVersionedSerializer<T> createCheckpointDataSerializer() {
+ Thread thread = Thread.currentThread();
+ ClassLoader originalClassLoader = thread.getContextClassLoader();
+ thread.setContextClassLoader(userClassLoader);
+ try {
+ return hook.createCheckpointDataSerializer();
+ }
+ finally {
+ thread.setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ class WrappedCommand implements Runnable {
+ private final Runnable command;
+ WrappedCommand(Runnable command) {
+ this.command = command;
+ }
+
+ @Override
+ public void run() {
+ Thread thread = Thread.currentThread();
+ ClassLoader originalClassLoader = thread.getContextClassLoader();
+ thread.setContextClassLoader(userClassLoader);
+ try {
+ command.run();
+ }
+ finally {
+ thread.setContextClassLoader(originalClassLoader);
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
/** This class is not meant to be instantiated */
private MasterHooks() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ad08163/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 0e76cfb..041e309 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -256,9 +257,17 @@ public class ExecutionGraphBuilder {
throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
}
- hooks = new ArrayList<>(hookFactories.length);
- for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
- hooks.add(factory.create());
+ Thread thread = Thread.currentThread();
+ ClassLoader originalClassLoader = thread.getContextClassLoader();
+ thread.setContextClassLoader(classLoader);
+ try {
+ hooks = new ArrayList<>(hookFactories.length);
+ for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
+ hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
+ }
+ }
+ finally {
+ thread.setContextClassLoader(originalClassLoader);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ad08163/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
new file mode 100644
index 0000000..3f8a48c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the MasterHooks utility class.
+ */
+public class MasterHooksTest {
+
+ // ------------------------------------------------------------------------
+ // hook management
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void wrapHook() throws Exception {
+ final String id = "id";
+
+ Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader();
+ final ClassLoader userClassLoader = new URLClassLoader(new URL[0]);
+
+ final Runnable command = spy(new Runnable() {
+ @Override
+ public void run() {
+ assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+ }
+ });
+
+ MasterTriggerRestoreHook<String> hook = spy(new MasterTriggerRestoreHook<String>() {
+ @Override
+ public String getIdentifier() {
+ assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+ return id;
+ }
+
+ @Nullable
+ @Override
+ public Future<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+ assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+ executor.execute(command);
+ return null;
+ }
+
+ @Override
+ public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) throws Exception {
+ assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+ }
+
+ @Nullable
+ @Override
+ public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
+ assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+ return null;
+ }
+ });
+
+ MasterTriggerRestoreHook<String> wrapped = MasterHooks.wrapHook(hook, userClassLoader);
+
+ // verify getIdentifier
+ wrapped.getIdentifier();
+ verify(hook, times(1)).getIdentifier();
+ assertEquals(originalClassLoader, thread.getContextClassLoader());
+
+ // verify triggerCheckpoint and its wrapped executor
+ TestExecutor testExecutor = new TestExecutor();
+ wrapped.triggerCheckpoint(0L, 0, testExecutor);
+ assertEquals(originalClassLoader, thread.getContextClassLoader());
+ assertNotNull(testExecutor.command);
+ testExecutor.command.run();
+ verify(command, times(1)).run();
+ assertEquals(originalClassLoader, thread.getContextClassLoader());
+
+ // verify restoreCheckpoint
+ wrapped.restoreCheckpoint(0L, "");
+ verify(hook, times(1)).restoreCheckpoint(eq(0L), eq(""));
+ assertEquals(originalClassLoader, thread.getContextClassLoader());
+
+ // verify createCheckpointDataSerializer
+ wrapped.createCheckpointDataSerializer();
+ verify(hook, times(1)).createCheckpointDataSerializer();
+ assertEquals(originalClassLoader, thread.getContextClassLoader());
+ }
+
+ private static class TestExecutor implements Executor {
+ Runnable command;
+ @Override
+ public void execute(Runnable command) {
+ this.command = command;
+ }
+ }
+
+ private static <T> T mockGeneric(Class<?> clazz) {
+ @SuppressWarnings("unchecked")
+ Class<T> typedClass = (Class<T>) clazz;
+ return mock(typedClass);
+ }
+}