You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2019/09/25 07:21:18 UTC

[flink] branch release-1.9 updated: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 3aeeb5f  [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint
3aeeb5f is described below

commit 3aeeb5fede885f9efeb66fe271828b25ffd6571e
Author: Gyula Fora <gy...@apache.org>
AuthorDate: Mon Sep 23 15:26:42 2019 +0200

    [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint
    
    Closes #9756
---
 .../checkpoint/CompletedCheckpointStore.java       | 32 ++++------
 .../StandaloneCompletedCheckpointStoreTest.java    | 70 ++++++++++++++++++++--
 2 files changed, 78 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 1cda131..9ed3151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,31 +54,26 @@ public interface CompletedCheckpointStore {
 	 * added.
 	 */
 	default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception {
-		if (getAllCheckpoints().isEmpty()) {
+		List<CompletedCheckpoint> allCheckpoints = getAllCheckpoints();
+		if (allCheckpoints.isEmpty()) {
 			return null;
 		}
 
-		CompletedCheckpoint candidate = getAllCheckpoints().get(getAllCheckpoints().size() - 1);
-		if (isPreferCheckpointForRecovery && getAllCheckpoints().size() > 1) {
-			List<CompletedCheckpoint> allCheckpoints;
-			try {
-				allCheckpoints = getAllCheckpoints();
-				ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1);
-				while (listIterator.hasPrevious()) {
-					CompletedCheckpoint prev = listIterator.previous();
-					if (!prev.getProperties().isSavepoint()) {
-						candidate = prev;
-						LOG.info("Found a completed checkpoint before the latest savepoint, will use it to recover!");
-						break;
-					}
+		CompletedCheckpoint lastCompleted = allCheckpoints.get(allCheckpoints.size() - 1);
+
+		if (isPreferCheckpointForRecovery && allCheckpoints.size() > 1 && lastCompleted.getProperties().isSavepoint()) {
+			ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1);
+			while (listIterator.hasPrevious()) {
+				CompletedCheckpoint prev = listIterator.previous();
+				if (!prev.getProperties().isSavepoint()) {
+					LOG.info("Found a completed checkpoint ({}) before the latest savepoint, will use it to recover!", prev);
+					return prev;
 				}
-			} catch (Exception e) {
-				LOG.error("Method getAllCheckpoints caused exception : ", e);
-				throw new FlinkRuntimeException(e);
 			}
+			LOG.info("Did not find earlier checkpoint, using latest savepoint to recover.");
 		}
 
-		return candidate;
+		return lastCompleted;
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 6f3c60b..4ed1050 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -27,6 +29,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -41,7 +45,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 
 	@Override
 	protected CompletedCheckpointStore createCompletedCheckpoints(
-			int maxNumberOfCheckpointsToRetain) throws Exception {
+		int maxNumberOfCheckpointsToRetain) throws Exception {
 
 		return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
 	}
@@ -86,7 +90,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 		assertTrue(checkpoint.isDiscarded());
 		verifyCheckpointDiscarded(taskStates);
 	}
-	
+
 	/**
 	 * Tests that the checkpoint does not exist in the store when we fail to add
 	 * it into the store (i.e., there exists an exception thrown by the method).
@@ -96,16 +100,16 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 
 		final int numCheckpointsToRetain = 1;
 		CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain);
-		
+
 		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
 			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
 			doReturn(i).when(checkpointToAdd).getCheckpointID();
 			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
 			doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();
-			
+
 			try {
 				store.addCheckpoint(checkpointToAdd);
-				
+
 				// The checkpoint should be in the store if we successfully add it into the store.
 				List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
 				assertTrue(addedCheckpoints.contains(checkpointToAdd));
@@ -116,4 +120,60 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 			}
 		}
 	}
+
+	@Test
+	public void testPreferCheckpointWithoutSavepoint() throws Exception {
+		StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5);
+		JobID jobId = new JobID();
+		store.addCheckpoint(checkpoint(jobId, 1L));
+		store.addCheckpoint(checkpoint(jobId, 2L));
+		store.addCheckpoint(checkpoint(jobId, 3L));
+
+		CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true);
+
+		assertThat(latestCheckpoint.getCheckpointID(), equalTo(3L));
+	}
+
+	@Test
+	public void testPreferCheckpointWithSavepoint() throws Exception {
+		StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5);
+		JobID jobId = new JobID();
+		store.addCheckpoint(checkpoint(jobId, 1L));
+		store.addCheckpoint(savepoint(jobId, 2L));
+		store.addCheckpoint(savepoint(jobId, 3L));
+
+		CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true);
+
+		assertThat(latestCheckpoint.getCheckpointID(), equalTo(1L));
+	}
+
+	@Test
+	public void testPreferCheckpointWithOnlySavepoint() throws Exception {
+		StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5);
+		JobID jobId = new JobID();
+		store.addCheckpoint(savepoint(jobId, 1L));
+		store.addCheckpoint(savepoint(jobId, 2L));
+
+		CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true);
+
+		assertThat(latestCheckpoint.getCheckpointID(), equalTo(2L));
+	}
+
+	private static CompletedCheckpoint checkpoint(JobID jobId, long checkpointId) {
+		return new TestCompletedCheckpoint(
+			jobId,
+			checkpointId,
+			checkpointId,
+			Collections.emptyMap(),
+			CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE));
+	}
+
+	private static CompletedCheckpoint savepoint(JobID jobId, long checkpointId) {
+		return new TestCompletedCheckpoint(
+			jobId,
+			checkpointId,
+			checkpointId,
+			Collections.emptyMap(),
+			CheckpointProperties.forSavepoint());
+	}
 }