You are viewing a plain text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/09 07:53:37 UTC

[2/2] flink git commit: [FLINK-8533] [checkpointing] Support MasterTriggerRestoreHook state reinitialization

[FLINK-8533] [checkpointing] Support MasterTriggerRestoreHook state reinitialization

- ensure that JobManager calls restoreLatestCheckpointedState unconditionally
- add close method to hook interface
- fix test failure in `JobManagerStartupTest` (Windows only)

This closes #5427


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af29172b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af29172b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af29172b

Branch: refs/heads/master
Commit: af29172bf555558e579a92f0ee3e56ed76fd7a1a
Parents: d6d2c36
Author: Eron Wright <er...@gmail.com>
Authored: Thu Mar 29 15:21:00 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 9 09:53:01 2018 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  7 ++
 .../checkpoint/MasterTriggerRestoreHook.java    | 25 ++++++-
 .../runtime/checkpoint/hooks/MasterHooks.java   | 79 +++++++++++++++++++-
 .../CheckpointCoordinatorMasterHooksTest.java   | 37 ++++++++-
 .../checkpoint/hooks/MasterHooksTest.java       | 15 ++++
 .../jobmanager/JobManagerStartupTest.java       |  4 +-
 .../WithMasterCheckpointHookConfigTest.java     | 10 +++
 7 files changed, 172 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 4ddac003..b3ce577 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -321,6 +321,10 @@ public class CheckpointCoordinator {
 				periodicScheduling = false;
 				triggerRequestQueued = false;
 
+				// shut down the hooks
+				MasterHooks.close(masterHooks.values(), LOG);
+				masterHooks.clear();
+
 				// shut down the thread that handles the timeouts and pending triggers
 				timer.shutdownNow();
 
@@ -1021,6 +1025,9 @@ public class CheckpointCoordinator {
 				if (errorIfNoCheckpoint) {
 					throw new IllegalStateException("No completed checkpoint available");
 				} else {
+					LOG.debug("Resetting the master hooks.");
+					MasterHooks.reset(masterHooks.values(), LOG);
+
 					return false;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
index 629ff9b..4476d06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
@@ -36,7 +36,11 @@ import java.util.concurrent.Executor;
  * method. The hook's {@link #getIdentifier() identifier} is used to map data to hook in the presence
  * of multiple hooks, and when resuming a savepoint that was potentially created by a different job.
  * The identifier has a similar role as for example the operator UID in the streaming API.
- * 
+ *
+ * <p>It is possible that a job fails (and is subsequently restarted) before any checkpoint has completed successfully.
+ * In that situation, the checkpoint coordinator calls {@link #reset()} to give the hook an
+ * opportunity to, for example, reset an external system to its initial conditions.
+ *
  * <p>The MasterTriggerRestoreHook is defined when creating the streaming dataflow graph. It is attached
  * to the job graph, which gets sent to the cluster for execution. To avoid having to make the hook
  * itself serializable, these hooks are attached to the job graph via a {@link MasterTriggerRestoreHook.Factory}.
@@ -64,6 +68,25 @@ public interface MasterTriggerRestoreHook<T> {
 	String getIdentifier();
 
 	/**
+	 * This method is called by the checkpoint coordinator to reset the hook when
+	 * execution is restarted in the absence of any checkpoint state.
+	 *
+	 * @throws Exception Exceptions encountered when calling the hook will cause execution to fail.
+	 */
+	default void reset() throws Exception {
+
+	}
+
+	/**
+	 * Tear-down method for the hook.
+	 *
+	 * @throws Exception Exceptions encountered when calling close will be logged.
+	 */
+	default void close() throws Exception {
+
+	}
+
+	/**
 	 * This method is called by the checkpoint coordinator prior when triggering a checkpoint, prior
 	 * to sending the "trigger checkpoint" messages to the source tasks.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 92504bb..60ba9b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -25,11 +25,12 @@ import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -46,6 +47,54 @@ import java.util.concurrent.TimeoutException;
 public class MasterHooks {
 
 	// ------------------------------------------------------------------------
+	//  lifecycle
+	// ------------------------------------------------------------------------
+
+	/**
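+	 * Resets the master hooks in iteration order. The first hook that fails aborts the reset; later hooks are not reset.
+	 *
+	 * @param hooks The hooks to reset
+	 * @param log The logger (not currently used by this method)
+	 * @throws FlinkException Thrown if a hook throws a non-fatal exception; the message names the failing hook's identifier.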
+	 */
+	public static void reset(
+		Collection<MasterTriggerRestoreHook<?>> hooks,
+		final Logger log) throws FlinkException {
+
+		for (MasterTriggerRestoreHook<?> hook : hooks) {
+			final String id = hook.getIdentifier();
+			try {
+				hook.reset();
+			}
+			catch (Throwable t) {
+				ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+				throw new FlinkException("Error while resetting checkpoint master hook '" + id + '\'', t);
+			}
+		}
+	}
+
+	/**
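+	 * Closes the master hooks. A hook that fails to close is logged and does not prevent the remaining hooks from closing.
+	 *
+	 * @param hooks The hooks to close
+	 * @param log The logger used to report hooks that fail to close
+	 * @throws FlinkException Currently never thrown; exceptions from individual hooks are logged instead of rethrown.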
+	 */
+	public static void close(
+		Collection<MasterTriggerRestoreHook<?>> hooks,
+		final Logger log) throws FlinkException {
+
+		for (MasterTriggerRestoreHook<?> hook : hooks) {
+			try {
+				hook.close();
+			}
+			catch (Throwable t) {
+				log.warn("Failed to cleanly close a checkpoint master hook (" + hook.getIdentifier() + ")", t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	//  checkpoint triggering
 	// ------------------------------------------------------------------------
 
@@ -292,6 +341,34 @@ public class MasterHooks {
 		}
 
 		@Override
+		public void reset() throws Exception {
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+
+			try {
+				hook.reset();
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+
+			try {
+				hook.close();
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
+		@Override
 		public String getIdentifier() {
 			final Thread thread = Thread.currentThread();
 			final ClassLoader originalClassLoader = thread.getContextClassLoader();

http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 3b345da..f644c01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -24,15 +24,15 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
-import org.junit.Test;
 
+import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -120,6 +120,39 @@ public class CheckpointCoordinatorMasterHooksTest {
 		} catch (IllegalArgumentException ignored) {}
 	}
 
+	@Test
+	public void testHookReset() throws Exception {
+		final String id1 = "id1";
+		final String id2 = "id2";
+
+		final MasterTriggerRestoreHook<String> hook1 = mockGeneric(MasterTriggerRestoreHook.class);
+		when(hook1.getIdentifier()).thenReturn(id1);
+		final MasterTriggerRestoreHook<String> hook2 = mockGeneric(MasterTriggerRestoreHook.class);
+		when(hook2.getIdentifier()).thenReturn(id2);
+
+		// create the checkpoint coordinator
+		final JobID jid = new JobID();
+		final ExecutionAttemptID execId = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex = mockExecutionVertex(execId);
+		final CheckpointCoordinator cc = instantiateCheckpointCoordinator(jid, ackVertex);
+
+		cc.addMasterHook(hook1);
+		cc.addMasterHook(hook2);
+
+		// restoring when no completed checkpoint exists should reset the hooks
+		cc.restoreLatestCheckpointedState(
+			Collections.<JobVertexID, ExecutionJobVertex>emptyMap(),
+			false,
+			false);
+		verify(hook1, times(1)).reset();
+		verify(hook2, times(1)).reset();
+
+		// shutdown
+		cc.shutdown(JobStatus.CANCELED);
+		verify(hook1, times(1)).close();
+		verify(hook2, times(1)).close();
+	}
+
 	// ------------------------------------------------------------------------
 	//  trigger / restore behavior
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
index ccc8afe..4daaf87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooksTest.java
@@ -69,6 +69,16 @@ public class MasterHooksTest extends TestLogger {
 				return id;
 			}
 
+			@Override
+			public void reset() throws Exception {
+				assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+			}
+
+			@Override
+			public void close() throws Exception {
+				assertEquals(userClassLoader, Thread.currentThread().getContextClassLoader());
+			}
+
 			@Nullable
 			@Override
 			public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
@@ -115,6 +125,11 @@ public class MasterHooksTest extends TestLogger {
 		wrapped.createCheckpointDataSerializer();
 		verify(hook, times(1)).createCheckpointDataSerializer();
 		assertEquals(originalClassLoader, thread.getContextClassLoader());
+
+		// verify close
+		wrapped.close();
+		verify(hook, times(1)).close();
+		assertEquals(originalClassLoader, thread.getContextClassLoader());
 	}
 
 	private static class TestExecutor implements Executor {

http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 838b124..e535415 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -70,7 +70,9 @@ public class JobManagerStartupTest extends TestLogger {
 	@After
 	public void after() {
 		// Cleanup test directory
-		assertTrue(blobStorageDirectory.delete());
+		if (blobStorageDirectory != null) {
+			assertTrue(blobStorageDirectory.delete());
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/af29172b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
index 2585ef5..7536044 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
@@ -116,6 +116,16 @@ public class WithMasterCheckpointHookConfigTest extends TestLogger {
 		}
 
 		@Override
+		public void reset() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void close() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
 			throw new UnsupportedOperationException();
 		}