You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/09 07:53:37 UTC
[2/2] flink git commit: [FLINK-8533] [checkpointing] Support
MasterTriggerRestoreHook state reinitialization
[FLINK-8533] [checkpointing] Support MasterTriggerRestoreHook state reinitialization
- ensure that JobManager calls restoreLatestCheckpointedState unconditionally
- add close method to hook interface
- fix test failure in `JobManagerStartupTest` (Windows only)
This closes #5427
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af29172b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af29172b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af29172b
Branch: refs/heads/master
Commit: af29172bf555558e579a92f0ee3e56ed76fd7a1a
Parents: d6d2c36
Author: Eron Wright <er...@gmail.com>
Authored: Thu Mar 29 15:21:00 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 9 09:53:01 2018 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 7 ++
.../checkpoint/MasterTriggerRestoreHook.java | 25 ++++++-
.../runtime/checkpoint/hooks/MasterHooks.java | 79 +++++++++++++++++++-
.../CheckpointCoordinatorMasterHooksTest.java | 37 ++++++++-
.../checkpoint/hooks/MasterHooksTest.java | 15 ++++
.../jobmanager/JobManagerStartupTest.java | 4 +-
.../WithMasterCheckpointHookConfigTest.java | 10 +++
7 files changed, 172 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 4ddac003..b3ce577 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -321,6 +321,10 @@ public class CheckpointCoordinator {
periodicScheduling = false;
triggerRequestQueued = false;
+ // shut down the hooks
+ MasterHooks.close(masterHooks.values(), LOG);
+ masterHooks.clear();
+
// shut down the thread that handles the timeouts and pending triggers
timer.shutdownNow();
@@ -1021,6 +1025,9 @@ public class CheckpointCoordinator {
if (errorIfNoCheckpoint) {
throw new IllegalStateException("No completed checkpoint available");
} else {
+ LOG.debug("Resetting the master hooks.");
+ MasterHooks.reset(masterHooks.values(), LOG);
+
return false;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
index 629ff9b..4476d06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
@@ -36,7 +36,11 @@ import java.util.concurrent.Executor;
* method. The hook's {@link #getIdentifier() identifier} is used to map data to hook in the presence
* of multiple hooks, and when resuming a savepoint that was potentially created by a different job.
* The identifier has a similar role as for example the operator UID in the streaming API.
- *
+ *
+ * <p>It is possible that a job fails (and is subsequently restarted) before any checkpoints were successful.
+ * In that situation, the checkpoint coordinator calls {@link #reset()} to give the hook an
+ * opportunity to, for example, reset an external system to initial conditions.
+ *
* <p>The MasterTriggerRestoreHook is defined when creating the streaming dataflow graph. It is attached
* to the job graph, which gets sent to the cluster for execution. To avoid having to make the hook
* itself serializable, these hooks are attached to the job graph via a {@link MasterTriggerRestoreHook.Factory}.
@@ -64,6 +68,25 @@ public interface MasterTriggerRestoreHook<T> {
String getIdentifier();
/**
+ * This method is called by the checkpoint coordinator to reset the hook when
+ * execution is restarted in the absence of any checkpoint state.
+ *
+ * @throws Exception Exceptions encountered when calling the hook will cause execution to fail.
+ */
+ default void reset() throws Exception {
+ // empty default implementation: does nothing unless an implementation overrides it
+ }
+
+ /**
+ * Tear-down method for the hook.
+ *
+ * @throws Exception Exceptions encountered when calling close will be logged.
+ */
+ default void close() throws Exception {
+ // empty default implementation: does nothing unless an implementation overrides it
+ }
+
+ /**
* This method is called by the checkpoint coordinator prior when triggering a checkpoint, prior
* to sending the "trigger checkpoint" messages to the source tasks.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 92504bb..60ba9b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -25,11 +25,12 @@ import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
-
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -46,6 +47,54 @@ import java.util.concurrent.TimeoutException;
public class MasterHooks {
// ------------------------------------------------------------------------
+ // lifecycle
+ // ------------------------------------------------------------------------
+
+ /**
+ * Resets the master hooks one after another; the first hook that fails aborts the loop.
+ *
+ * @param hooks The hooks to reset
+ * @param log Logger supplied by the caller; this method does not write to it
+ * @throws FlinkException Thrown, if a hook throws an exception; the message names that hook's identifier.
+ */
+ public static void reset(
+ Collection<MasterTriggerRestoreHook<?>> hooks,
+ final Logger log) throws FlinkException {
+
+ for (MasterTriggerRestoreHook<?> hook : hooks) {
+ final String id = hook.getIdentifier(); // read up front so it is available for the error message
+ try {
+ hook.reset();
+ }
+ catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t); // presumably lets fatal errors / OutOfMemoryError propagate unwrapped — confirm in ExceptionUtils
+ throw new FlinkException("Error while resetting checkpoint master hook '" + id + '\'', t);
+ }
+ }
+ }
+
+ /**
+ * Closes the master hooks on a best-effort basis: a failing hook is logged and the loop continues.
+ *
+ * @param hooks The hooks to close
+ * @param log Logger that receives a warning for every hook whose {@code close()} throws
+ * @throws FlinkException Declared, but never thrown by this body; failures of {@code close()} are only logged.
+ */
+ public static void close(
+ Collection<MasterTriggerRestoreHook<?>> hooks,
+ final Logger log) throws FlinkException {
+
+ for (MasterTriggerRestoreHook<?> hook : hooks) {
+ try {
+ hook.close();
+ }
+ catch (Throwable t) { // NOTE(review): unlike reset(), fatal errors / OOM are not rethrown here — confirm this is intended
+ log.warn("Failed to cleanly close a checkpoint master hook (" + hook.getIdentifier() + ")", t); // NOTE(review): getIdentifier() runs inside the catch block; if it throws, the remaining hooks are not closed
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
// checkpoint triggering
// ------------------------------------------------------------------------
@@ -292,6 +341,34 @@ public class MasterHooks {
}
@Override
+ public void reset() throws Exception { // delegates to the wrapped hook's reset() with userClassLoader as context class loader
+ final Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader(); // remembered so it can be put back afterwards
+ thread.setContextClassLoader(userClassLoader);
+
+ try {
+ hook.reset();
+ }
+ finally { // put the caller's context class loader back even if the hook throws
+ thread.setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ @Override
+ public void close() throws Exception { // delegates to the wrapped hook's close() with userClassLoader as context class loader
+ final Thread thread = Thread.currentThread();
+ final ClassLoader originalClassLoader = thread.getContextClassLoader(); // remembered so it can be put back afterwards
+ thread.setContextClassLoader(userClassLoader);
+
+ try {
+ hook.close();
+ }
+ finally { // put the caller's context class loader back even if the hook throws
+ thread.setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ @Override
public String getIdentifier() {
final Thread thread = Thread.currentThread();
final ClassLoader originalClassLoader = thread.getContextClassLoader();
http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 3b345da..f644c01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -24,15 +24,15 @@ import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
-import org.junit.Test;
+import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -120,6 +120,39 @@ public class CheckpointCoordinatorMasterHooksTest {
} catch (IllegalArgumentException ignored) {}
}
+ @Test
+ public void testHookReset() throws Exception {
+ final String id1 = "id1";
+ final String id2 = "id2";
+ // two mocked hooks, told apart only by the identifier they report
+ final MasterTriggerRestoreHook<String> hook1 = mockGeneric(MasterTriggerRestoreHook.class);
+ when(hook1.getIdentifier()).thenReturn(id1);
+ final MasterTriggerRestoreHook<String> hook2 = mockGeneric(MasterTriggerRestoreHook.class);
+ when(hook2.getIdentifier()).thenReturn(id2);
+
+ // create the checkpoint coordinator
+ final JobID jid = new JobID();
+ final ExecutionAttemptID execId = new ExecutionAttemptID();
+ final ExecutionVertex ackVertex = mockExecutionVertex(execId);
+ final CheckpointCoordinator cc = instantiateCheckpointCoordinator(jid, ackVertex);
+
+ cc.addMasterHook(hook1);
+ cc.addMasterHook(hook2);
+
+ // restore without any checkpoint having been triggered in this test: each hook is expected to be reset exactly once
+ cc.restoreLatestCheckpointedState(
+ Collections.<JobVertexID, ExecutionJobVertex>emptyMap(),
+ false, // presumably errorIfNoCheckpoint — confirm against the method signature
+ false);
+ verify(hook1, times(1)).reset();
+ verify(hook2, times(1)).reset();
+
+ // shutting the coordinator down is expected to close each hook exactly once
+ cc.shutdown(JobStatus.CANCELED);
+ verify(hook1, times(1)).close();
+ verify(hook2, times(1)).close();
+ }
+
// ------------------------------------------------------------------------
// trigger / restore behavior
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
index ccc8afe..4daaf87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -69,6 +69,16 @@ public class MasterHooksTest extends TestLogger {
return id;
}
+ @Override
+ public void reset() throws Exception {
+ assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader()); // fails unless the caller installed userClassLoader as context class loader before delegating
+ }
+
+ @Override
+ public void close() throws Exception {
+ assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader()); // fails unless the caller installed userClassLoader as context class loader before delegating
+ }
+
@Nullable
@Override
public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
@@ -115,6 +125,11 @@ public class MasterHooksTest extends TestLogger {
wrapped.createCheckpointDataSerializer();
verify(hook, times(1)).createCheckpointDataSerializer();
assertEquals(originalClassLoader, thread.getContextClassLoader());
+
+ // verify close
+ wrapped.close();
+ verify(hook, times(1)).close();
+ assertEquals(originalClassLoader, thread.getContextClassLoader());
}
private static class TestExecutor implements Executor {
http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 838b124..e535415 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -70,7 +70,9 @@ public class JobManagerStartupTest extends TestLogger {
@After
public void after() {
// Cleanup test directory
- assertTrue(blobStorageDirectory.delete());
+ if (blobStorageDirectory != null) { // skip the cleanup when the field is null instead of failing with a NullPointerException
+ assertTrue(blobStorageDirectory.delete());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
index 2585ef5..7536044 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
@@ -116,6 +116,16 @@ public class WithMasterCheckpointHookConfigTest extends TestLogger {
}
@Override
+ public void reset() throws Exception {
+ throw new UnsupportedOperationException(); // NOTE(review): presumably never invoked by this test — confirm
+ }
+
+ @Override
+ public void close() throws Exception {
+ throw new UnsupportedOperationException(); // NOTE(review): presumably never invoked by this test — confirm
+ }
+
+ @Override
public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
throw new UnsupportedOperationException();
}