You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2019/09/25 07:21:18 UTC
[flink] branch release-1.9 updated: [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 3aeeb5f [FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint
3aeeb5f is described below
commit 3aeeb5fede885f9efeb66fe271828b25ffd6571e
Author: Gyula Fora <gy...@apache.org>
AuthorDate: Mon Sep 23 15:26:42 2019 +0200
[FLINK-14145] Fix getLatestCheckpoint(true) returns wrong checkpoint
Closes #9756
---
.../checkpoint/CompletedCheckpointStore.java | 32 ++++------
.../StandaloneCompletedCheckpointStoreTest.java | 70 ++++++++++++++++++++--
2 files changed, 78 insertions(+), 24 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 1cda131..9ed3151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,31 +54,26 @@ public interface CompletedCheckpointStore {
* added.
*/
default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception {
- if (getAllCheckpoints().isEmpty()) {
+ List<CompletedCheckpoint> allCheckpoints = getAllCheckpoints();
+ if (allCheckpoints.isEmpty()) {
return null;
}
- CompletedCheckpoint candidate = getAllCheckpoints().get(getAllCheckpoints().size() - 1);
- if (isPreferCheckpointForRecovery && getAllCheckpoints().size() > 1) {
- List<CompletedCheckpoint> allCheckpoints;
- try {
- allCheckpoints = getAllCheckpoints();
- ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1);
- while (listIterator.hasPrevious()) {
- CompletedCheckpoint prev = listIterator.previous();
- if (!prev.getProperties().isSavepoint()) {
- candidate = prev;
- LOG.info("Found a completed checkpoint before the latest savepoint, will use it to recover!");
- break;
- }
+ CompletedCheckpoint lastCompleted = allCheckpoints.get(allCheckpoints.size() - 1);
+
+ if (isPreferCheckpointForRecovery && allCheckpoints.size() > 1 && lastCompleted.getProperties().isSavepoint()) {
+ ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1);
+ while (listIterator.hasPrevious()) {
+ CompletedCheckpoint prev = listIterator.previous();
+ if (!prev.getProperties().isSavepoint()) {
+ LOG.info("Found a completed checkpoint ({}) before the latest savepoint, will use it to recover!", prev);
+ return prev;
}
- } catch (Exception e) {
- LOG.error("Method getAllCheckpoints caused exception : ", e);
- throw new FlinkRuntimeException(e);
}
+ LOG.info("Did not find earlier checkpoint, using latest savepoint to recover.");
}
- return candidate;
+ return lastCompleted;
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 6f3c60b..4ed1050 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.SharedStateRegistry;
+
import org.junit.Test;
import java.io.IOException;
@@ -27,6 +29,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -41,7 +45,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
@Override
protected CompletedCheckpointStore createCompletedCheckpoints(
- int maxNumberOfCheckpointsToRetain) throws Exception {
+ int maxNumberOfCheckpointsToRetain) throws Exception {
return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
}
@@ -86,7 +90,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
assertTrue(checkpoint.isDiscarded());
verifyCheckpointDiscarded(taskStates);
}
-
+
/**
* Tests that the checkpoint does not exist in the store when we fail to add
* it into the store (i.e., there exists an exception thrown by the method).
@@ -96,16 +100,16 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
final int numCheckpointsToRetain = 1;
CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain);
-
+
for (long i = 0; i <= numCheckpointsToRetain; ++i) {
CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
doReturn(i).when(checkpointToAdd).getCheckpointID();
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();
-
+
try {
store.addCheckpoint(checkpointToAdd);
-
+
// The checkpoint should be in the store if we successfully add it into the store.
List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
assertTrue(addedCheckpoints.contains(checkpointToAdd));
@@ -116,4 +120,60 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
}
}
}
+
+ @Test
+ public void testPreferCheckpointWithoutSavepoint() throws Exception {
+ StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5);
+ JobID jobId = new JobID();
+ store.addCheckpoint(checkpoint(jobId, 1L));
+ store.addCheckpoint(checkpoint(jobId, 2L));
+ store.addCheckpoint(checkpoint(jobId, 3L));
+
+ CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true);
+
+ assertThat(latestCheckpoint.getCheckpointID(), equalTo(3L));
+ }
+
+ @Test
+ public void testPreferCheckpointWithSavepoint() throws Exception {
+ StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5);
+ JobID jobId = new JobID();
+ store.addCheckpoint(checkpoint(jobId, 1L));
+ store.addCheckpoint(savepoint(jobId, 2L));
+ store.addCheckpoint(savepoint(jobId, 3L));
+
+ CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true);
+
+ assertThat(latestCheckpoint.getCheckpointID(), equalTo(1L));
+ }
+
+ @Test
+ public void testPreferCheckpointWithOnlySavepoint() throws Exception {
+ StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5);
+ JobID jobId = new JobID();
+ store.addCheckpoint(savepoint(jobId, 1L));
+ store.addCheckpoint(savepoint(jobId, 2L));
+
+ CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true);
+
+ assertThat(latestCheckpoint.getCheckpointID(), equalTo(2L));
+ }
+
+ private static CompletedCheckpoint checkpoint(JobID jobId, long checkpointId) {
+ return new TestCompletedCheckpoint(
+ jobId,
+ checkpointId,
+ checkpointId,
+ Collections.emptyMap(),
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE));
+ }
+
+ private static CompletedCheckpoint savepoint(JobID jobId, long checkpointId) {
+ return new TestCompletedCheckpoint(
+ jobId,
+ checkpointId,
+ checkpointId,
+ Collections.emptyMap(),
+ CheckpointProperties.forSavepoint());
+ }
}