You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/23 08:25:39 UTC

flink git commit: [FLINK-6328] [chkPts] Don't add savepoints to CompletedCheckpointStore

Repository: flink
Updated Branches:
  refs/heads/master 12b4185c6 -> 6f570e7b6


[FLINK-6328] [chkPts] Don't add savepoints to CompletedCheckpointStore

The lifecycle of savepoints is not managed by the CheckpointCoordinator but lies fully
in the hands of the user. Therefore, the CheckpointCoordinator cannot rely on them
when trying to recover from failures. For example, a user moving a savepoint shortly
before a failure could completely break Flink's recovery mechanism because Flink cannot
skip failed checkpoints when recovering.

Therefore, until Flink is able to skip failed checkpoints when recovering, we should
not add savepoints to the CompletedCheckpointStore, which is used to retrieve checkpoints
for recovery. A savepoint is identified on the basis of its
CheckpointProperties (CheckpointProperties.STANDARD_SAVEPOINT).

This closes #3965.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f570e7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f570e7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f570e7b

Branch: refs/heads/master
Commit: 6f570e7b6810e1645a4f7094f17ab9e8559fa139
Parents: 12b4185
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 22 17:41:14 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 23 10:24:20 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  39 ++++---
 .../checkpoint/CheckpointProperties.java        |   1 -
 .../checkpoint/CheckpointCoordinatorTest.java   | 117 +++++++++++++++++--
 3 files changed, 126 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f570e7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e224780..5201d43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -864,22 +864,28 @@ public class CheckpointCoordinator {
 			// the pending checkpoint must be discarded after the finalization
 			Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
 
-			try {
-				completedCheckpointStore.addCheckpoint(completedCheckpoint);
-			} catch (Exception exception) {
-				// we failed to store the completed checkpoint. Let's clean up
-				executor.execute(new Runnable() {
-					@Override
-					public void run() {
-						try {
-							completedCheckpoint.discardOnFailedStoring();
-						} catch (Throwable t) {
-							LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+			// TODO: add savepoints to completed checkpoint store once FLINK-4815 has been completed
+			if (!completedCheckpoint.getProperties().isSavepoint()) {
+				try {
+					completedCheckpointStore.addCheckpoint(completedCheckpoint);
+				} catch (Exception exception) {
+					// we failed to store the completed checkpoint. Let's clean up
+					executor.execute(new Runnable() {
+						@Override
+						public void run() {
+							try {
+								completedCheckpoint.discardOnFailedStoring();
+							} catch (Throwable t) {
+								LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+							}
 						}
-					}
-				});
-				
-				throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
+					});
+
+					throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
+				}
+
+				// drop those pending checkpoints that are at prior to the completed one
+				dropSubsumedCheckpoints(checkpointId);
 			}
 		} finally {
 			pendingCheckpoints.remove(checkpointId);
@@ -888,9 +894,6 @@ public class CheckpointCoordinator {
 		}
 		
 		rememberRecentCheckpointId(checkpointId);
-		
-		// drop those pending checkpoints that are at prior to the completed one
-		dropSubsumedCheckpoints(checkpointId);
 
 		// record the time when this was completed, to calculate
 		// the 'min delay between checkpoints'

http://git-wip-us.apache.org/repos/asf/flink/blob/6f570e7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 45c8a1b..6df7e71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -314,5 +314,4 @@ public class CheckpointProperties implements Serializable {
 			return EXTERNALIZED_CHECKPOINT_RETAINED;
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f570e7b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3b44d9a..186a819 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -104,7 +105,7 @@ import static org.mockito.Mockito.withSettings;
 /**
  * Tests for the checkpoint coordinator.
  */
-public class CheckpointCoordinatorTest {
+public class CheckpointCoordinatorTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -1500,8 +1501,8 @@ public class CheckpointCoordinatorTest {
 		assertTrue(pending.isDiscarded());
 		assertTrue(savepointFuture.isDone());
 
-		// the now we should have a completed checkpoint
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		// now the savepoint should be completed but not added to the completed checkpoint store
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
 		// validate that the relevant tasks got a confirmation message
@@ -1516,7 +1517,7 @@ public class CheckpointCoordinatorTest {
 			verify(subtaskState2, times(1)).registerSharedStates(any(SharedStateRegistry.class));
 		}
 
-		CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
+		CompletedCheckpoint success = savepointFuture.get();
 		assertEquals(jid, success.getJobId());
 		assertEquals(timestamp, success.getTimestamp());
 		assertEquals(pending.getCheckpointId(), success.getCheckpointID());
@@ -1537,9 +1538,9 @@ public class CheckpointCoordinatorTest {
 		subtaskState2 = operatorStates.get(opID2).getState(vertex2.getParallelSubtaskIndex());
 
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-		CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
+		CompletedCheckpoint successNew = savepointFuture.get();
 		assertEquals(jid, successNew.getJobId());
 		assertEquals(timestampNew, successNew.getTimestamp());
 		assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -1566,7 +1567,7 @@ public class CheckpointCoordinatorTest {
 	 * Triggers a savepoint and two checkpoints. The second checkpoint completes
 	 * and subsumes the first checkpoint, but not the first savepoint. Then we
 	 * trigger another checkpoint and savepoint. The 2nd savepoint completes and
-	 * subsumes the last checkpoint, but not the first savepoint.
+	 * subsumes neither the last checkpoint nor the first savepoint.
 	 */
 	@Test
 	public void testSavepointsAreNotSubsumed() throws Exception {
@@ -1622,18 +1623,19 @@ public class CheckpointCoordinatorTest {
 		assertFalse(savepointFuture1.isDone());
 
 		assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
+		long checkpointId3 = counter.getLast();
 		assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
 		Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
 		long savepointId2 = counter.getLast();
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
-		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
+		// 2nd savepoint should subsume neither the last checkpoint nor the 1st savepoint
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
 
-		assertEquals(1, coord.getNumberOfPendingCheckpoints());
-		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(2, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
 		assertFalse(savepointFuture1.isDone());
@@ -1643,9 +1645,15 @@ public class CheckpointCoordinatorTest {
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
 
-		assertEquals(0, coord.getNumberOfPendingCheckpoints());
-		assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints());
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 		assertTrue(savepointFuture1.isDone());
+
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
+
+		assertEquals(0, coord.getNumberOfPendingCheckpoints());
+		assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
 	}
 
 	private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -3538,6 +3546,91 @@ public class CheckpointCoordinatorTest {
 			.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
 	}
 
+	/**
+	 * FLINK-6328
+	 *
+	 * Tests that savepoints are not added to the {@link CompletedCheckpointStore} and,
+	 * thus, are not subject to job recovery. The reason that we don't want that (until
+	 * FLINK-4815 has been finished) is that the lifecycle of savepoints is not controlled
+	 * by the {@link CheckpointCoordinator}.
+	 */
+	@Test
+	public void testSavepointsAreNotAddedToCompletedCheckpointStore() throws Exception {
+		final JobID jobId = new JobID();
+		final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex vertex1 = mockExecutionVertex(executionAttemptId);
+		final CompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+		final long checkpointTimestamp1 = 1L;
+		final long savepointTimestamp = 2L;
+		final long checkpointTimestamp2 = 3L;
+		final String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+
+		final StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
+
+		CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(
+			jobId,
+			600000L,
+			600000L,
+			0L,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			new ExecutionVertex[]{vertex1},
+			checkpointIDCounter,
+			completedCheckpointStore,
+			null,
+			Executors.directExecutor());
+
+		// trigger a first checkpoint
+		assertTrue(
+			"Triggering of a checkpoint should work.",
+			checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
+
+		assertTrue(0 == completedCheckpointStore.getNumberOfRetainedCheckpoints());
+
+		// complete the 1st checkpoint
+		checkpointCoordinator.receiveAcknowledgeMessage(
+			new AcknowledgeCheckpoint(
+				jobId,
+				executionAttemptId,
+				checkpointIDCounter.getLast()));
+
+		// check that the checkpoint has been completed
+		assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints());
+
+		// trigger a savepoint --> this should not have any effect on the CompletedCheckpointStore
+		Future<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
+
+		checkpointCoordinator.receiveAcknowledgeMessage(
+			new AcknowledgeCheckpoint(
+				jobId,
+				executionAttemptId,
+				checkpointIDCounter.getLast()));
+
+		// check that no errors occurred
+		final CompletedCheckpoint savepoint = savepointFuture.get();
+
+		assertFalse(
+			"The savepoint should not have been added to the completed checkpoint store",
+			savepoint.getCheckpointID() == completedCheckpointStore.getLatestCheckpoint().getCheckpointID());
+
+		assertTrue(
+			"Triggering of a checkpoint should work.",
+			checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
+
+		// complete the 2nd checkpoint
+		checkpointCoordinator.receiveAcknowledgeMessage(
+			new AcknowledgeCheckpoint(
+				jobId,
+				executionAttemptId,
+				checkpointIDCounter.getLast()));
+
+		assertTrue(
+			"The latest completed (proper) checkpoint should have been added to the completed checkpoint store.",
+			completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == checkpointIDCounter.getLast());
+	}
+
 	private static final class SpyInjectingOperatorState extends OperatorState {
 
 		private static final long serialVersionUID = -4004437428483663815L;