You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/19 13:21:56 UTC

[1/2] flink git commit: [FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook

Repository: flink
Updated Branches:
  refs/heads/release-1.3 6d178a959 -> f6a596fe3


[FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook

- wrap calls to MasterTriggerRestoreHook (and its factory) such that the user classloader is applied

This closes #3933.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3dba48ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3dba48ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3dba48ee

Branch: refs/heads/release-1.3
Commit: 3dba48ee6ad3dc6bdb9abfaa91accbb8581cfd2a
Parents: 6d178a9
Author: Wright, Eron <Er...@emc.com>
Authored: Wed May 17 09:46:13 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 19 15:21:23 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/hooks/MasterHooks.java   | 108 +++++++++++++++
 .../executiongraph/ExecutionGraphBuilder.java   |  15 ++-
 .../checkpoint/hooks/MasterHooksTest.java       | 131 +++++++++++++++++++
 3 files changed, 251 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3dba48ee/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 409019e..737e816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.hooks;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -29,6 +30,7 @@ import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -267,6 +269,112 @@ public class MasterHooks {
 	}
 
 	// ------------------------------------------------------------------------
+	//  hook management
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Wraps a hook such that the user-code classloader is applied when the hook is invoked.
+	 * @param hook the hook to wrap
+	 * @param userClassLoader the classloader to use
+	 */
+	public static <T> MasterTriggerRestoreHook<T> wrapHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
+		return new WrappedMasterHook<T>(hook, userClassLoader);
+	}
+
+	@VisibleForTesting
+	static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> {
+
+		private final MasterTriggerRestoreHook<T> hook;
+		private final ClassLoader userClassLoader;
+
+		WrappedMasterHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
+			this.hook = hook;
+			this.userClassLoader = userClassLoader;
+		}
+
+		@Override
+		public String getIdentifier() {
+			Thread thread = Thread.currentThread();
+			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+			try {
+				return hook.getIdentifier();
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
+		@Nullable
+		@Override
+		public Future<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
+			Executor wrappedExecutor = new Executor() {
+				@Override
+				public void execute(Runnable command) {
+					executor.execute(new WrappedCommand(command));
+				}
+			};
+
+			Thread thread = Thread.currentThread();
+			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+			try {
+				return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
+		@Override
+		public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception {
+			Thread thread = Thread.currentThread();
+			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+			try {
+				hook.restoreCheckpoint(checkpointId, checkpointData);
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
+		@Nullable
+		@Override
+		public SimpleVersionedSerializer<T> createCheckpointDataSerializer() {
+			Thread thread = Thread.currentThread();
+			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+			try {
+				return hook.createCheckpointDataSerializer();
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
+		class WrappedCommand implements Runnable {
+			private final Runnable command;
+			WrappedCommand(Runnable command) {
+				this.command = command;
+			}
+
+			@Override
+			public void run() {
+				Thread thread = Thread.currentThread();
+				ClassLoader originalClassLoader = thread.getContextClassLoader();
+				thread.setContextClassLoader(userClassLoader);
+				try {
+					command.run();
+				}
+				finally {
+					thread.setContextClassLoader(originalClassLoader);
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated */
 	private MasterHooks() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3dba48ee/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index de0d9d0..ecac2e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -257,9 +258,17 @@ public class ExecutionGraphBuilder {
 					throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
 				}
 
-				hooks = new ArrayList<>(hookFactories.length);
-				for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
-					hooks.add(factory.create());
+				Thread thread = Thread.currentThread();
+				ClassLoader originalClassLoader = thread.getContextClassLoader();
+				thread.setContextClassLoader(classLoader);
+				try {
+					hooks = new ArrayList<>(hookFactories.length);
+					for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
+						hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
+					}
+				}
+				finally {
+					thread.setContextClassLoader(originalClassLoader);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3dba48ee/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
new file mode 100644
index 0000000..3f8a48c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the MasterHooks utility class.
+ */
+public class MasterHooksTest {
+
+	// ------------------------------------------------------------------------
+	//  hook management
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void wrapHook() throws Exception {
+		final String id = "id";
+
+		Thread thread = Thread.currentThread();
+		final ClassLoader originalClassLoader = thread.getContextClassLoader();
+		final ClassLoader userClassLoader = new URLClassLoader(new URL[0]);
+
+		final Runnable command = spy(new Runnable() {
+			@Override
+			public void run() {
+				assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+			}
+		});
+
+		MasterTriggerRestoreHook<String> hook = spy(new MasterTriggerRestoreHook<String>() {
+			@Override
+			public String getIdentifier() {
+				assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+				return id;
+			}
+
+			@Nullable
+			@Override
+			public Future<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
+				assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+				executor.execute(command);
+				return null;
+			}
+
+			@Override
+			public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) throws Exception {
+				assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+			}
+
+			@Nullable
+			@Override
+			public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
+				assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+				return null;
+			}
+		});
+
+		MasterTriggerRestoreHook<String> wrapped = MasterHooks.wrapHook(hook, userClassLoader);
+
+		// verify getIdentifier
+		wrapped.getIdentifier();
+		verify(hook, times(1)).getIdentifier();
+		assertEquals(originalClassLoader, thread.getContextClassLoader());
+
+		// verify triggerCheckpoint and its wrapped executor
+		TestExecutor testExecutor = new TestExecutor();
+		wrapped.triggerCheckpoint(0L, 0, testExecutor);
+		assertEquals(originalClassLoader, thread.getContextClassLoader());
+		assertNotNull(testExecutor.command);
+		testExecutor.command.run();
+		verify(command, times(1)).run();
+		assertEquals(originalClassLoader, thread.getContextClassLoader());
+
+		// verify restoreCheckpoint
+		wrapped.restoreCheckpoint(0L, "");
+		verify(hook, times(1)).restoreCheckpoint(eq(0L), eq(""));
+		assertEquals(originalClassLoader, thread.getContextClassLoader());
+
+		// verify createCheckpointDataSerializer
+		wrapped.createCheckpointDataSerializer();
+		verify(hook, times(1)).createCheckpointDataSerializer();
+		assertEquals(originalClassLoader, thread.getContextClassLoader());
+	}
+
+	private static class TestExecutor implements Executor {
+		Runnable command;
+		@Override
+		public void execute(Runnable command) {
+			this.command = command;
+		}
+	}
+
+	private static <T> T mockGeneric(Class<?> clazz) {
+		@SuppressWarnings("unchecked")
+		Class<T> typedClass = (Class<T>) clazz;
+		return mock(typedClass);
+	}
+}


[2/2] flink git commit: [FLINK-6606] Hide WrapperMasterHook by making it private

Posted by tr...@apache.org.
[FLINK-6606] Hide WrapperMasterHook by making it private


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6a596fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6a596fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6a596fe

Branch: refs/heads/release-1.3
Commit: f6a596fe3f8924602c34636959e3d7e48cb2262f
Parents: 3dba48e
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri May 19 15:06:35 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 19 15:21:24 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/hooks/MasterHooks.java   | 41 +++++++++++---------
 .../executiongraph/ExecutionGraphBuilder.java   |  5 ++-
 .../checkpoint/hooks/MasterHooksTest.java       | 11 ++----
 3 files changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 737e816..1851eb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint.hooks;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -278,25 +278,25 @@ public class MasterHooks {
 	 * @param userClassLoader the classloader to use
 	 */
 	public static <T> MasterTriggerRestoreHook<T> wrapHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
-		return new WrappedMasterHook<T>(hook, userClassLoader);
+		return new WrappedMasterHook<>(hook, userClassLoader);
 	}
 
-	@VisibleForTesting
-	static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> {
+	private static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> {
 
 		private final MasterTriggerRestoreHook<T> hook;
 		private final ClassLoader userClassLoader;
 
 		WrappedMasterHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
-			this.hook = hook;
-			this.userClassLoader = userClassLoader;
+			this.hook = Preconditions.checkNotNull(hook);
+			this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
 		}
 
 		@Override
 		public String getIdentifier() {
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
 			thread.setContextClassLoader(userClassLoader);
+
 			try {
 				return hook.getIdentifier();
 			}
@@ -315,9 +315,10 @@ public class MasterHooks {
 				}
 			};
 
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
 			thread.setContextClassLoader(userClassLoader);
+
 			try {
 				return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
 			}
@@ -328,9 +329,10 @@ public class MasterHooks {
 
 		@Override
 		public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception {
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
 			thread.setContextClassLoader(userClassLoader);
+
 			try {
 				hook.restoreCheckpoint(checkpointId, checkpointData);
 			}
@@ -342,9 +344,10 @@ public class MasterHooks {
 		@Nullable
 		@Override
 		public SimpleVersionedSerializer<T> createCheckpointDataSerializer() {
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
 			thread.setContextClassLoader(userClassLoader);
+
 			try {
 				return hook.createCheckpointDataSerializer();
 			}
@@ -353,17 +356,19 @@ public class MasterHooks {
 			}
 		}
 
-		class WrappedCommand implements Runnable {
+		private class WrappedCommand implements Runnable {
 			private final Runnable command;
+
 			WrappedCommand(Runnable command) {
-				this.command = command;
+				this.command = Preconditions.checkNotNull(command);
 			}
 
 			@Override
 			public void run() {
-				Thread thread = Thread.currentThread();
-				ClassLoader originalClassLoader = thread.getContextClassLoader();
+				final Thread thread = Thread.currentThread();
+				final ClassLoader originalClassLoader = thread.getContextClassLoader();
 				thread.setContextClassLoader(userClassLoader);
+
 				try {
 					command.run();
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ecac2e4..db22da6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -258,9 +258,10 @@ public class ExecutionGraphBuilder {
 					throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
 				}
 
-				Thread thread = Thread.currentThread();
-				ClassLoader originalClassLoader = thread.getContextClassLoader();
+				final Thread thread = Thread.currentThread();
+				final ClassLoader originalClassLoader = thread.getContextClassLoader();
 				thread.setContextClassLoader(classLoader);
+
 				try {
 					hooks = new ArrayList<>(hookFactories.length);
 					for (MasterTriggerRestoreHook.Factory factory : hookFactories) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
index 3f8a48c..f4270dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.hooks;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -31,7 +32,6 @@ import java.util.concurrent.Executor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -39,7 +39,7 @@ import static org.mockito.Mockito.verify;
 /**
  * Tests for the MasterHooks utility class.
  */
-public class MasterHooksTest {
+public class MasterHooksTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 	//  hook management
@@ -117,15 +117,10 @@ public class MasterHooksTest {
 
 	private static class TestExecutor implements Executor {
 		Runnable command;
+
 		@Override
 		public void execute(Runnable command) {
 			this.command = command;
 		}
 	}
-
-	private static <T> T mockGeneric(Class<?> clazz) {
-		@SuppressWarnings("unchecked")
-		Class<T> typedClass = (Class<T>) clazz;
-		return mock(typedClass);
-	}
 }