You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/19 13:21:57 UTC
[2/2] flink git commit: [FLINK-6606] Hide WrappedMasterHook by making
it private
[FLINK-6606] Hide WrappedMasterHook by making it private
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6a596fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6a596fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6a596fe
Branch: refs/heads/release-1.3
Commit: f6a596fe3f8924602c34636959e3d7e48cb2262f
Parents: 3dba48e
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri May 19 15:06:35 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 19 15:21:24 2017 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/hooks/MasterHooks.java | 41 +++++++++++---------
.../executiongraph/ExecutionGraphBuilder.java | 5 ++-
.../checkpoint/hooks/MasterHooksTest.java | 11 ++----
3 files changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 737e816..1851eb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.checkpoint.hooks;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import javax.annotation.Nullable;
@@ -278,25 +278,25 @@ public class MasterHooks {
* @param userClassLoader the classloader to use
*/
public static <T> MasterTriggerRestoreHook<T> wrapHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
- return new WrappedMasterHook<T>(hook, userClassLoader);
+ return new WrappedMasterHook<>(hook, userClassLoader);
}
- @VisibleForTesting
- static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> {
+ private static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> {
private final MasterTriggerRestoreHook<T> hook;
private final ClassLoader userClassLoader;
WrappedMasterHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
- this.hook = hook;
- this.userClassLoader = userClassLoader;
+ this.hook = Preconditions.checkNotNull(hook);
+ this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
}
@Override
public String getIdentifier() {
- Thread thread = Thread.currentThread();
- ClassLoader originalClassLoader = thread.getContextClassLoader();
+ final Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
return hook.getIdentifier();
}
@@ -315,9 +315,10 @@ public class MasterHooks {
}
};
- Thread thread = Thread.currentThread();
- ClassLoader originalClassLoader = thread.getContextClassLoader();
+ final Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
}
@@ -328,9 +329,10 @@ public class MasterHooks {
@Override
public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception {
- Thread thread = Thread.currentThread();
- ClassLoader originalClassLoader = thread.getContextClassLoader();
+ final Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
hook.restoreCheckpoint(checkpointId, checkpointData);
}
@@ -342,9 +344,10 @@ public class MasterHooks {
@Nullable
@Override
public SimpleVersionedSerializer<T> createCheckpointDataSerializer() {
- Thread thread = Thread.currentThread();
- ClassLoader originalClassLoader = thread.getContextClassLoader();
+ final Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
return hook.createCheckpointDataSerializer();
}
@@ -353,17 +356,19 @@ public class MasterHooks {
}
}
- class WrappedCommand implements Runnable {
+ private class WrappedCommand implements Runnable {
private final Runnable command;
+
WrappedCommand(Runnable command) {
- this.command = command;
+ this.command = Preconditions.checkNotNull(command);
}
@Override
public void run() {
- Thread thread = Thread.currentThread();
- ClassLoader originalClassLoader = thread.getContextClassLoader();
+ final Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
command.run();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ecac2e4..db22da6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -258,9 +258,10 @@ public class ExecutionGraphBuilder {
throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
}
- Thread thread = Thread.currentThread();
- ClassLoader originalClassLoader = thread.getContextClassLoader();
+ final Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(classLoader);
+
try {
hooks = new ArrayList<>(hookFactories.length);
for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
index 3f8a48c..f4270dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint.hooks;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import javax.annotation.Nullable;
@@ -31,7 +32,6 @@ import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -39,7 +39,7 @@ import static org.mockito.Mockito.verify;
/**
* Tests for the MasterHooks utility class.
*/
-public class MasterHooksTest {
+public class MasterHooksTest extends TestLogger {
// ------------------------------------------------------------------------
// hook management
@@ -117,15 +117,10 @@ public class MasterHooksTest {
private static class TestExecutor implements Executor {
Runnable command;
+
@Override
public void execute(Runnable command) {
this.command = command;
}
}
-
- private static <T> T mockGeneric(Class<?> clazz) {
- @SuppressWarnings("unchecked")
- Class<T> typedClass = (Class<T>) clazz;
- return mock(typedClass);
- }
}