You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:52 UTC

[09/11] flink git commit: [FLINK-5928] [checkpoints] Add CheckpointCoordinatorExternalizedCheckpointsTest

[FLINK-5928] [checkpoints] Add CheckpointCoordinatorExternalizedCheckpointsTest

Problem: the only available unit tests covered the checkpoint instances and
did not test the behaviour of the checkpoint coordinator with respect to
externalized checkpoints.

This closes #3424


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c477d87c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c477d87c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c477d87c

Branch: refs/heads/master
Commit: c477d87c68f2da4340c8d469e1b4331e6a660ef0
Parents: 3446e66
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Feb 27 16:12:37 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 19:02:13 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/PendingCheckpoint.java   |  24 ++-
 .../checkpoint/savepoint/SavepointStore.java    |  47 ++++-
 ...tCoordinatorExternalizedCheckpointsTest.java | 197 +++++++++++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   |   2 +-
 .../savepoint/SavepointStoreTest.java           |  23 ++-
 5 files changed, 282 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2c392b8..6c9dbaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -210,6 +210,7 @@ public class PendingCheckpoint {
 	}
 
 	public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
+
 		synchronized (lock) {
 			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
 
@@ -222,10 +223,27 @@ public class PendingCheckpoint {
 				//            but the checkpoints think more generic. we need to work with file handles
 				//            here until the savepoint serializer accepts a generic stream factory
 
-				final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
-				final String externalPointer = metadataHandle.getFilePath().getParent().toString();
+				// We have this branch here, because savepoints and externalized checkpoints
+				// currently behave differently.
+				// Savepoints:
+				//   - Metadata file in unique directory
+				//   - External pointer can be the directory
+				// Externalized checkpoints:
+				//   - Multiple metadata files per directory possible (need to be unique)
+				//   - External pointer needs to be the file itself
+				//
+				// This should be unified as part of the JobManager metadata stream factories.
+				if (props.isSavepoint()) {
+					final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
+					final String externalPointer = metadataHandle.getFilePath().getParent().toString();
+	
+					return finalizeInternal(metadataHandle, externalPointer);
+				} else {
+					final FileStateHandle metadataHandle = SavepointStore.storeExternalizedCheckpointToHandle(targetDirectory, savepoint);
+					final String externalPointer = metadataHandle.getFilePath().toString();
 
-				return finalizeInternal(metadataHandle, externalPointer);
+					return finalizeInternal(metadataHandle, externalPointer);
+				}
 			}
 			catch (Throwable t) {
 				onCompletionPromise.completeExceptionally(t);

http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 5c8ac6b..7beb1b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -60,7 +60,13 @@ public class SavepointStore {
 	/** Magic number for sanity checks against stored savepoints. */
 	public static final int MAGIC_NUMBER = 0x4960672d;
 
-	private static final String META_DATA_FILE = "_metadata ";
+	private static final String SAVEPOINT_METADATA_FILE = "_metadata";
+
+	/**
+	 * Metadata file for an externalized checkpoint, random suffix added
+	 * during store, because the parent directory is not unique.
+	 */
+	static final String EXTERNALIZED_CHECKPOINT_METADATA_FILE = "checkpoint_metadata-";
 
 	/**
 	 * Creates a savepoint directory.
@@ -122,7 +128,8 @@ public class SavepointStore {
 	 */
 	public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
 		// write and create the file handle
-		FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, savepoint);
+		FileStateHandle metadataFileHandle = storeSavepointToHandle(directory,
+			SAVEPOINT_METADATA_FILE, savepoint);
 
 		// we return the savepoint directory path here!
 		// The directory path also works to resume from and is more elegant than the direct
@@ -135,19 +142,47 @@ public class SavepointStore {
 	 *
 	 * @param directory Target directory to store savepoint in
 	 * @param savepoint Savepoint to be stored
-	 *                     
+	 *
+	 * @return State handle to the checkpoint metadata
+	 * @throws IOException Failures during store are forwarded
+	 */
+	public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(String directory, T savepoint) throws IOException {
+		return storeSavepointToHandle(directory, SAVEPOINT_METADATA_FILE, savepoint);
+	}
+
+	/**
+	 * Writes the metadata of an externalized checkpoint and returns its state handle.
+	 *
+	 * @param directory Target directory to store the checkpoint metadata in
+	 * @param savepoint Savepoint holding the checkpoint metadata to be stored
+	 *
+	 * @return State handle to the checkpoint metadata
+	 * @throws IOException Failures during store are forwarded
+	 */
+	public static <T extends Savepoint> FileStateHandle storeExternalizedCheckpointToHandle(String directory, T savepoint) throws IOException {
+		// Random file name, because several externalized checkpoints may share this directory.
+		return storeSavepointToHandle(directory, FileUtils.getRandomFilename(EXTERNALIZED_CHECKPOINT_METADATA_FILE), savepoint);
+	}
+
+	/**
+	 * Stores the savepoint metadata file to a state handle.
+	 *
+	 * @param directory Target directory to store savepoint in
+	 * @param filename Name of the metadata file to create inside the directory
+	 * @param savepoint Savepoint to be stored
 	 * @return State handle to the checkpoint metadata
 	 * @throws IOException Failures during store are forwarded
 	 */
-	public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
+	static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
 			String directory,
+			String filename,
 			T savepoint) throws IOException {
 
 		checkNotNull(directory, "Target directory");
 		checkNotNull(savepoint, "Savepoint");
 
 		final Path basePath = new Path(directory);
-		final Path metadataFilePath = new Path(basePath, META_DATA_FILE);
+		final Path metadataFilePath = new Path(basePath, filename);
 
 		final FileSystem fs = FileSystem.get(basePath.toUri());
 
@@ -219,7 +254,7 @@ public class SavepointStore {
 
 		// If this is a directory, we need to find the meta data file
 		if (status.isDir()) {
-			Path candidatePath = new Path(path, META_DATA_FILE);
+			Path candidatePath = new Path(path, SAVEPOINT_METADATA_FILE);
 			if (fs.exists(candidatePath)) {
 				path = candidatePath;
 				LOG.info("Using savepoint file in {}", path);

http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
new file mode 100644
index 0000000..9f94f2f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * CheckpointCoordinator tests for externalized checkpoints.
+ *
+ * <p>This is separate from {@link CheckpointCoordinatorTest}, because that
+ * test is already huge and covers many different configurations.
+ */
+public class CheckpointCoordinatorExternalizedCheckpointsTest {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	/**
+	 * Triggers three externalized checkpoints in sequence and verifies for each
+	 * that its metadata file was created and can be loaded from the external pointer.
+	 */
+	@Test
+	public void testTriggerAndConfirmSimpleExternalizedCheckpoint()
+		throws Exception {
+		final JobID jid = new JobID();
+
+		final ExternalizedCheckpointSettings externalizedCheckpointSettings =
+			ExternalizedCheckpointSettings.externalizeCheckpoints(false);
+
+		final File checkpointDir = tmp.newFolder();
+
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
+		ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
+
+		Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>();
+		jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex());
+		jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex());
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jid,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			externalizedCheckpointSettings,
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new ExecutionVertex[] { vertex1, vertex2 },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			checkpointDir.getAbsolutePath(),
+			Executors.directExecutor());
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+		// ---------------
+		// trigger checkpoint 1
+		// ---------------
+
+		{
+			final long timestamp1 = System.currentTimeMillis();
+
+			coord.triggerCheckpoint(timestamp1, false);
+
+			long checkpointId1 = coord.getPendingCheckpoints().entrySet().iterator().next()
+				.getKey();
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId1));
+
+			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+
+			verifyExternalizedCheckpoint(latest, jid, checkpointId1, timestamp1);
+			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+		}
+
+		// ---------------
+		// trigger checkpoint 2
+		// ---------------
+
+		{
+			final long timestamp2 = System.currentTimeMillis() + 7;
+			coord.triggerCheckpoint(timestamp2, false);
+
+			long checkpointId2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
+
+			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+			verifyExternalizedCheckpoint(latest, jid, checkpointId2, timestamp2);
+			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+		}
+
+		// ---------------
+		// trigger checkpoint 3
+		// ---------------
+
+		{
+			final long timestamp3 = System.currentTimeMillis() + 146;
+			coord.triggerCheckpoint(timestamp3, false);
+
+			long checkpointId3 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
+
+			CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+			verifyExternalizedCheckpoint(latest, jid, checkpointId3, timestamp3);
+			verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+		}
+
+		coord.shutdown(JobStatus.FINISHED);
+	}
+
+	/**
+	 * Verifies an externalized completed checkpoint instance.
+	 *
+	 * <p>The provided JobID, checkpoint ID, and timestamp must match. The external
+	 * pointer and externalized metadata must be non-null, and the metadata file must
+	 * exist (the metadata handle is currently assumed to be file system based).
+	 *
+	 * @param checkpoint Completed checkpoint to check.
+	 * @param jid JobID of the job the checkpoint belongs to.
+	 * @param checkpointId Checkpoint ID of the checkpoint to check.
+	 * @param timestamp Timestamp of the checkpoint to check.
+	 */
+	private static void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) {
+		assertEquals(jid, checkpoint.getJobId());
+		assertEquals(checkpointId, checkpoint.getCheckpointID());
+		assertEquals(timestamp, checkpoint.getTimestamp());
+		assertNotNull(checkpoint.getExternalPointer());
+		assertNotNull(checkpoint.getExternalizedMetadata());
+		FileStateHandle fsHandle = (FileStateHandle) checkpoint.getExternalizedMetadata();
+		assertTrue(new File(fsHandle.getFilePath().getPath()).exists());
+	}
+
+	private static void verifyExternalizedCheckpointRestore(
+			CompletedCheckpoint checkpoint,
+			Map<JobVertexID, ExecutionJobVertex> jobVertices,
+			ExecutionVertex... vertices) throws IOException {
+
+		CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(
+				checkpoint.getJobId(),
+				jobVertices,
+				checkpoint.getExternalPointer(),
+				Thread.currentThread().getContextClassLoader(),
+				false);
+
+		for (ExecutionVertex vertex : vertices) {
+			assertEquals(checkpoint.getTaskState(vertex.getJobvertexId()), loaded.getTaskState(vertex.getJobvertexId()));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index d8e46fa..1691370 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2586,7 +2586,7 @@ public class CheckpointCoordinatorTest {
 		return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
 	}
 
-	private static ExecutionJobVertex mockExecutionJobVertex(
+	static ExecutionJobVertex mockExecutionJobVertex(
 		JobVertexID jobVertexID,
 		int parallelism,
 		int maxParallelism) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index dc19e47..1eb8055 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import java.io.File;
-import java.util.Arrays;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -189,6 +189,27 @@ public class SavepointStoreTest {
 		assertEquals("Savepoint file not cleaned up on failure", 0, tmp.getRoot().listFiles().length);
 	}
 
+	/**
+	 * Tests that multiple externalized checkpoints can be stored to the same
+	 * directory: both metadata files must exist and must not share a path.
+	 */
+	@Test
+	public void testStoreExternalizedCheckpointsToSameDirectory() throws Exception {
+		String root = tmp.newFolder().getAbsolutePath();
+		FileSystem fs = FileSystem.get(new Path(root).toUri());
+
+		SavepointV1 savepoint = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
+
+		FileStateHandle store1 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
+		assertTrue(fs.exists(store1.getFilePath()));
+		assertTrue(store1.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
+
+		FileStateHandle store2 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
+		assertTrue(fs.exists(store2.getFilePath()));
+		assertTrue(store2.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
+		assertTrue("Second store must not reuse the first metadata file", !store1.getFilePath().equals(store2.getFilePath()));
+	}
+
 	private static class NewSavepointSerializer implements SavepointSerializer<TestSavepoint> {
 
 		private static final NewSavepointSerializer INSTANCE = new NewSavepointSerializer();