You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:52 UTC
[09/11] flink git commit: [FLINK-5928] [checkpoints] Add
CheckpointCoordinatorExternalizedCheckpointsTest
[FLINK-5928] [checkpoints] Add CheckpointCoordinatorExternalizedCheckpointsTest
Problem: the only unit tests available covered the checkpoint instances;
they did not test the behaviour of the checkpoint coordinator with respect
to externalized checkpoints.
This closes #3424
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c477d87c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c477d87c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c477d87c
Branch: refs/heads/master
Commit: c477d87c68f2da4340c8d469e1b4331e6a660ef0
Parents: 3446e66
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Feb 27 16:12:37 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 19:02:13 2017 +0100
----------------------------------------------------------------------
.../runtime/checkpoint/PendingCheckpoint.java | 24 ++-
.../checkpoint/savepoint/SavepointStore.java | 47 ++++-
...tCoordinatorExternalizedCheckpointsTest.java | 197 +++++++++++++++++++
.../checkpoint/CheckpointCoordinatorTest.java | 2 +-
.../savepoint/SavepointStoreTest.java | 23 ++-
5 files changed, 282 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2c392b8..6c9dbaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -210,6 +210,7 @@ public class PendingCheckpoint {
}
public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
+
synchronized (lock) {
checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
@@ -222,10 +223,27 @@ public class PendingCheckpoint {
// but the checkpoints think more generic. we need to work with file handles
// here until the savepoint serializer accepts a generic stream factory
- final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
- final String externalPointer = metadataHandle.getFilePath().getParent().toString();
+ // We have this branch here, because savepoints and externalized checkpoints
+ // currently behave differently.
+ // Savepoints:
+ // - Metadata file in unique directory
+ // - External pointer can be the directory
+ // Externalized checkpoints:
+ // - Multiple metadata files per directory possible (need to be unique)
+ // - External pointer needs to be the file itself
+ //
+ // This should be unified as part of the JobManager metadata stream factories.
+ if (props.isSavepoint()) {
+ final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
+ final String externalPointer = metadataHandle.getFilePath().getParent().toString();
+
+ return finalizeInternal(metadataHandle, externalPointer);
+ } else {
+ final FileStateHandle metadataHandle = SavepointStore.storeExternalizedCheckpointToHandle(targetDirectory, savepoint);
+ final String externalPointer = metadataHandle.getFilePath().toString();
- return finalizeInternal(metadataHandle, externalPointer);
+ return finalizeInternal(metadataHandle, externalPointer);
+ }
}
catch (Throwable t) {
onCompletionPromise.completeExceptionally(t);
http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 5c8ac6b..7beb1b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -60,7 +60,13 @@ public class SavepointStore {
/** Magic number for sanity checks against stored savepoints. */
public static final int MAGIC_NUMBER = 0x4960672d;
- private static final String META_DATA_FILE = "_metadata";
+ private static final String SAVEPOINT_METADATA_FILE = "_metadata";
+
+ /**
+ * Metadata file for an externalized checkpoint, random suffix added
+ * during store, because the parent directory is not unique.
+ */
+ static final String EXTERNALIZED_CHECKPOINT_METADATA_FILE = "checkpoint_metadata-";
/**
* Creates a savepoint directory.
@@ -122,7 +128,8 @@ public class SavepointStore {
*/
public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
// write and create the file handle
- FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, savepoint);
+ FileStateHandle metadataFileHandle = storeSavepointToHandle(directory,
+ SAVEPOINT_METADATA_FILE, savepoint);
// we return the savepoint directory path here!
// The directory path also works to resume from and is more elegant than the direct
@@ -135,19 +142,47 @@ public class SavepointStore {
*
* @param directory Target directory to store savepoint in
* @param savepoint Savepoint to be stored
- *
+ *
+ * @return State handle to the checkpoint metadata
+ * @throws IOException Failures during store are forwarded
+ */
+ public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(String directory, T savepoint) throws IOException {
+ return storeSavepointToHandle(directory, SAVEPOINT_METADATA_FILE, savepoint);
+ }
+
+ /**
+ * Writes the metadata of an externalized checkpoint into the given directory
+ * and returns a handle to the written metadata file.
+ *
+ * <p>The file name is obtained by passing
+ * {@link #EXTERNALIZED_CHECKPOINT_METADATA_FILE} to
+ * {@code FileUtils.getRandomFilename}, because the target directory is not
+ * unique per checkpoint and may hold several metadata files.
+ *
+ * @param directory Target directory to store the checkpoint metadata in
+ * @param savepoint Savepoint (checkpoint metadata) to be stored
+ *
+ * @return State handle to the checkpoint metadata
+ * @throws IOException Failures during store are forwarded
+ */
+ public static <T extends Savepoint> FileStateHandle storeExternalizedCheckpointToHandle(String directory, T savepoint) throws IOException {
+ return storeSavepointToHandle(
+ directory,
+ FileUtils.getRandomFilename(EXTERNALIZED_CHECKPOINT_METADATA_FILE),
+ savepoint);
+ }
+
+ /**
+ * Stores the savepoint metadata file to a state handle.
+ *
+ * @param directory Target directory to store savepoint in
+ * @param savepoint Savepoint to be stored
+ *
* @return State handle to the checkpoint metadata
* @throws IOException Failures during store are forwarded
*/
- public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
+ static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
String directory,
+ String filename,
T savepoint) throws IOException {
checkNotNull(directory, "Target directory");
checkNotNull(savepoint, "Savepoint");
final Path basePath = new Path(directory);
- final Path metadataFilePath = new Path(basePath, META_DATA_FILE);
+ final Path metadataFilePath = new Path(basePath, filename);
final FileSystem fs = FileSystem.get(basePath.toUri());
@@ -219,7 +254,7 @@ public class SavepointStore {
// If this is a directory, we need to find the meta data file
if (status.isDir()) {
- Path candidatePath = new Path(path, META_DATA_FILE);
+ Path candidatePath = new Path(path, SAVEPOINT_METADATA_FILE);
if (fs.exists(candidatePath)) {
path = candidatePath;
LOG.info("Using savepoint file in {}", path);
http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
new file mode 100644
index 0000000..9f94f2f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests how the {@link CheckpointCoordinator} handles externalized checkpoints.
+ *
+ * <p>Kept apart from {@link CheckpointCoordinatorTest}, because that test is
+ * already huge and covers many different configurations.
+ */
+public class CheckpointCoordinatorExternalizedCheckpointsTest {
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Triggers three externalized checkpoints one after the other. After each
+ * one has been acknowledged by both tasks, the latest completed checkpoint
+ * is verified and loaded again through its external pointer.
+ */
+ @Test
+ public void testTriggerAndConfirmSimpleExternalizedCheckpoint() throws Exception {
+ final JobID jid = new JobID();
+
+ final ExternalizedCheckpointSettings externalizedCheckpointSettings =
+ ExternalizedCheckpointSettings.externalizeCheckpoints(false);
+
+ final File checkpointDir = tmp.newFolder();
+
+ // two mocked execution vertices act as the tasks receiving the trigger messages
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+ final ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
+ final ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
+
+ final Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>();
+ jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex());
+ jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex());
+
+ // set up the coordinator and validate the initial state
+ final CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid,
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+ externalizedCheckpointSettings,
+ new ExecutionVertex[] { vertex1, vertex2 },
+ new ExecutionVertex[] { vertex1, vertex2 },
+ new ExecutionVertex[] { vertex1, vertex2 },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ checkpointDir.getAbsolutePath(),
+ Executors.directExecutor());
+
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+ // ---------------
+ // trigger checkpoints 1, 2 and 3
+ // ---------------
+
+ // offsets added to the current time to form the trigger timestamps
+ final long[] timestampOffsets = { 0L, 7L, 146L };
+
+ for (long offset : timestampOffsets) {
+ final long timestamp = System.currentTimeMillis() + offset;
+ coord.triggerCheckpoint(timestamp, false);
+
+ // use the ID of the first pending checkpoint reported by the coordinator
+ final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
+
+ // acknowledge from both tasks
+ for (ExecutionAttemptID attemptId : new ExecutionAttemptID[] { attemptID1, attemptID2 }) {
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptId, checkpointId));
+ }
+
+ final CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
+
+ verifyExternalizedCheckpoint(latest, jid, checkpointId, timestamp);
+ verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
+ }
+
+ coord.shutdown(JobStatus.FINISHED);
+ }
+
+ /**
+ * Checks a completed externalized checkpoint.
+ *
+ * <p>Job ID, checkpoint ID and timestamp must equal the given values. The
+ * external pointer and the externalized metadata must be non-null, and the
+ * metadata file must exist (the metadata handle is assumed to be file
+ * system based).
+ *
+ * @param checkpoint Completed checkpoint to check.
+ * @param jid JobID of the job the checkpoint belongs to.
+ * @param checkpointId Checkpoint ID of the checkpoint to check.
+ * @param timestamp Timestamp of the checkpoint to check.
+ */
+ private static void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) {
+ assertEquals(jid, checkpoint.getJobId());
+ assertEquals(checkpointId, checkpoint.getCheckpointID());
+ assertEquals(timestamp, checkpoint.getTimestamp());
+ assertNotNull(checkpoint.getExternalPointer());
+ assertNotNull(checkpoint.getExternalizedMetadata());
+
+ final FileStateHandle metadataHandle = (FileStateHandle) checkpoint.getExternalizedMetadata();
+ final File metadataFile = new File(metadataHandle.getFilePath().getPath());
+ assertTrue(metadataFile.exists());
+ }
+
+ /**
+ * Loads the checkpoint again through its external pointer and compares the
+ * task state of every given vertex with the state held by the original
+ * checkpoint.
+ *
+ * @param checkpoint Completed checkpoint whose external pointer is loaded.
+ * @param jobVertices Job vertices handed to the savepoint loader.
+ * @param vertices Vertices whose task state is compared.
+ * @throws IOException Declared for failures while loading the checkpoint.
+ */
+ private static void verifyExternalizedCheckpointRestore(
+ CompletedCheckpoint checkpoint,
+ Map<JobVertexID, ExecutionJobVertex> jobVertices,
+ ExecutionVertex... vertices) throws IOException {
+
+ final CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(
+ checkpoint.getJobId(),
+ jobVertices,
+ checkpoint.getExternalPointer(),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+
+ for (ExecutionVertex vertex : vertices) {
+ final JobVertexID vertexId = vertex.getJobvertexId();
+ assertEquals(checkpoint.getTaskState(vertexId), loaded.getTaskState(vertexId));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index d8e46fa..1691370 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2586,7 +2586,7 @@ public class CheckpointCoordinatorTest {
return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
}
- private static ExecutionJobVertex mockExecutionJobVertex(
+ static ExecutionJobVertex mockExecutionJobVertex(
JobVertexID jobVertexID,
int parallelism,
int maxParallelism) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c477d87c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index dc19e47..1eb8055 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -19,12 +19,12 @@
package org.apache.flink.runtime.checkpoint.savepoint;
import java.io.File;
-import java.util.Arrays;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -189,6 +189,27 @@ public class SavepointStoreTest {
assertEquals("Savepoint file not cleaned up on failure", 0, tmp.getRoot().listFiles().length);
}
+ /**
+ * Tests that multiple externalized checkpoints can be stored to the same
+ * directory.
+ *
+ * <p>Each store must yield a metadata file that exists and whose path
+ * contains the externalized checkpoint metadata prefix. The two stores must
+ * produce different files, and the first file must still exist after the
+ * second store.
+ */
+ @Test
+ public void testStoreExternalizedCheckpointsToSameDirectory() throws Exception {
+ String root = tmp.newFolder().getAbsolutePath();
+ FileSystem fs = FileSystem.get(new Path(root).toUri());
+
+ // Store
+ SavepointV1 savepoint = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
+
+ FileStateHandle store1 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
+ // the result of exists() has to be asserted; calling it alone verifies nothing
+ assertTrue(fs.exists(store1.getFilePath()));
+ assertTrue(store1.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
+
+ FileStateHandle store2 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
+ assertTrue(fs.exists(store2.getFilePath()));
+ assertTrue(store2.getFilePath().getPath().contains(SavepointStore.EXTERNALIZED_CHECKPOINT_METADATA_FILE));
+
+ // both stores went to the same directory, so they must not share a file
+ assertTrue(!store1.getFilePath().equals(store2.getFilePath()));
+ assertTrue(fs.exists(store1.getFilePath()));
+ }
+
private static class NewSavepointSerializer implements SavepointSerializer<TestSavepoint> {
private static final NewSavepointSerializer INSTANCE = new NewSavepointSerializer();