You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/22 20:30:05 UTC

[flink] 02/03: [FLINK-21351][checkpointing] Don't subsume last checkpoint

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd8e406e547973da55fa47e260a910293637d1d7
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 17 18:40:36 2021 +0100

    [FLINK-21351][checkpointing] Don't subsume last checkpoint
    
    When a savepoint is added to the CompletedCheckpointStore,
    all previous checkpoints are removed if the number of
    checkpoints to retain is 1.
    
    This makes future incremental checkpoints invalid, since
    they may refer to the discarded state.
---
 .../checkpoint/CheckpointSubsumeHelper.java        | 60 +++++++++++++---
 .../DefaultCompletedCheckpointStoreTest.java       | 83 +++++++++++++++++++++-
 2 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
index 7d3e7aa..da8e575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -17,11 +17,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.util.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Deque;
+import java.util.Iterator;
+import java.util.Optional;
 
 /**
  * Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint
@@ -46,21 +47,60 @@ class CheckpointSubsumeHelper {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
 
     public static void subsume(
-            Deque<CompletedCheckpoint> checkpoints,
-            int numRetain,
-            ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+            Deque<CompletedCheckpoint> checkpoints, int numRetain, SubsumeAction subsumeAction)
             throws Exception {
         if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
             return;
         }
+        CompletedCheckpoint latest = checkpoints.peekLast();
+        Optional<CompletedCheckpoint> latestNotSavepoint = getLatestNotSavepoint(checkpoints);
+        Iterator<CompletedCheckpoint> iterator = checkpoints.iterator();
+        while (checkpoints.size() > numRetain && iterator.hasNext()) {
+            CompletedCheckpoint next = iterator.next();
+            if (canSubsume(next, latest, latestNotSavepoint)) {
+                iterator.remove();
+                try {
+                    subsumeAction.subsume(next);
+                } catch (Exception e) {
+                    LOG.warn("Fail to subsume the old checkpoint.", e);
+                }
+            }
+            // Don't break out from the loop to subsume intermediate savepoints
+        }
+    }
 
-        while (checkpoints.size() > numRetain) {
-            CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
-            try {
-                subsumeAction.accept(completedCheckpoint);
-            } catch (Exception e) {
-                LOG.warn("Fail to subsume the old checkpoint.", e);
+    private static Optional<CompletedCheckpoint> getLatestNotSavepoint(
+            Deque<CompletedCheckpoint> completed) {
+        Iterator<CompletedCheckpoint> descendingIterator = completed.descendingIterator();
+        while (descendingIterator.hasNext()) {
+            CompletedCheckpoint next = descendingIterator.next();
+            if (!next.getProperties().isSavepoint()) {
+                return Optional.of(next);
             }
         }
+        return Optional.empty();
+    }
+
+    private static boolean canSubsume(
+            CompletedCheckpoint next,
+            CompletedCheckpoint latest,
+            Optional<CompletedCheckpoint> latestNonSavepoint) {
+        if (next == latest) {
+            return false;
+        } else if (next.getProperties().isSavepoint()) {
+            return true;
+        } else if (latest.getProperties().isSynchronous()) {
+            // If the job has stopped with a savepoint then it's safe to subsume because no future
+            // snapshots will be taken during this run
+            return true;
+        } else {
+            // Keep the latest non-savepoint: future incremental snapshots may depend on it
+            return latestNonSavepoint.filter(checkpoint -> checkpoint != next).isPresent();
+        }
+    }
+
+    @FunctionalInterface
+    interface SubsumeAction {
+        void subsume(CompletedCheckpoint checkpoint) throws Exception;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 0430085..bb0578c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.testutils.FlinkMatchers;
@@ -25,6 +26,7 @@ import org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper
 import org.apache.flink.runtime.persistence.TestingStateHandleStore;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -44,8 +47,11 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.asList;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -72,6 +78,56 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
         executorService.shutdownNow();
     }
 
+    @Test
+    public void testAtLeastOneCheckpointRetained() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 3L);
+        CompletedCheckpoint sp2 = getCheckpoint(true, 4L);
+        CompletedCheckpoint sp3 = getCheckpoint(true, 5L);
+        testCheckpointRetention(1, asList(cp1, cp2, sp1, sp2, sp3), asList(cp2, sp3));
+    }
+
+    @Test
+    public void testOlderSavepointSubsumed() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 3L);
+        testCheckpointRetention(1, asList(cp1, sp1, cp2), asList(cp2));
+    }
+
+    @Test
+    public void testSubsumeAfterStoppingWithSavepoint() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
+        CompletedCheckpoint stop = getCheckpoint(CheckpointProperties.forSyncSavepoint(false), 3L);
+        testCheckpointRetention(1, asList(cp1, sp1, stop), asList(stop));
+    }
+
+    @Test
+    public void testNotSubsumedIfNotNeeded() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
+        CompletedCheckpoint cp3 = getCheckpoint(false, 3L);
+        testCheckpointRetention(3, asList(cp1, cp2, cp3), asList(cp1, cp2, cp3));
+    }
+
+    private void testCheckpointRetention(
+            int numRetain,
+            List<CompletedCheckpoint> completed,
+            List<CompletedCheckpoint> expectedRetained)
+            throws Exception {
+        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
+                builder.setGetAllSupplier(() -> createStateHandles(3)).build();
+        final CompletedCheckpointStore completedCheckpointStore =
+                createCompletedCheckpointStore(stateHandleStore, numRetain);
+
+        for (CompletedCheckpoint c : completed) {
+            completedCheckpointStore.addCheckpoint(c, new CheckpointsCleaner(), () -> {});
+        }
+        assertEquals(expectedRetained, completedCheckpointStore.getAllCheckpoints());
+    }
+
     /**
      * We have three completed checkpoints(1, 2, 3) in the state handle store. We expect that {@link
      * DefaultCompletedCheckpointStore#recover()} should recover the sorted checkpoints by name.
@@ -298,8 +354,13 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
 
     private CompletedCheckpointStore createCompletedCheckpointStore(
             TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) {
+        return createCompletedCheckpointStore(stateHandleStore, 1);
+    }
+
+    private CompletedCheckpointStore createCompletedCheckpointStore(
+            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int toRetain) {
         return new DefaultCompletedCheckpointStore<>(
-                1,
+                toRetain,
                 stateHandleStore,
                 new CheckpointStoreUtil() {
                     @Override
@@ -314,4 +375,24 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
                 },
                 executorService);
     }
+
+    private CompletedCheckpoint getCheckpoint(boolean isSavepoint, long id) {
+        return getCheckpoint(
+                isSavepoint
+                        ? CheckpointProperties.forSavepoint(false)
+                        : CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+                id);
+    }
+
+    private CompletedCheckpoint getCheckpoint(CheckpointProperties props, long id) {
+        return new CompletedCheckpoint(
+                new JobID(),
+                id,
+                0L,
+                0L,
+                Collections.emptyMap(),
+                Collections.emptyList(),
+                props,
+                new TestCompletedCheckpointStorageLocation());
+    }
 }