You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/22 20:30:04 UTC

[flink] 01/03: [hotfix][checkpointing] Extract CheckpointSubsumeHelper

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38971479d4e4b5383d5e705db9ab0236b7010419
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 17 18:40:30 2021 +0100

    [hotfix][checkpointing] Extract CheckpointSubsumeHelper
    
    This is a prerequisite refactoring for a subsequent bug fix.
---
 .../checkpoint/CheckpointSubsumeHelper.java        | 66 ++++++++++++++++++++++
 .../DefaultCompletedCheckpointStore.java           | 45 +++++++--------
 .../EmbeddedCompletedCheckpointStore.java          |  5 +-
 .../StandaloneCompletedCheckpointStore.java        | 10 +---
 4 files changed, 93 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
new file mode 100644
index 0000000..7d3e7aa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+
+/**
+ * Subsumes the oldest entries of a {@link CompletedCheckpointStore checkpoint store} once more
+ * than state.checkpoints.num-retained are held. Entries are removed strictly oldest-first.
+ *
+ * <p>Design considerations (not all of them are enforced by this helper yet):
+ *
+ * <ul>
+ *   <li>Savepoints share the queue with checkpoints to prevent duplicates (see <a
+ *       href="https://issues.apache.org/jira/browse/FLINK-10354">FLINK-10354</a>) and therefore
+ *       count towards num-retained, which also keeps the queue from growing without bound.
+ *   <li>A subsumed savepoint's state must NOT be discarded; that is up to the subsume action.
+ *   <li>Unless the job is stopped with a savepoint (no checkpoints follow), the latest checkpoint
+ *       (not savepoint) should be kept: incremental checkpoints may refer to its state (see <a
+ *       href="https://issues.apache.org/jira/browse/FLINK-21351">FLINK-21351</a>).
+ * </ul>
+ */
+final class CheckpointSubsumeHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
+
+    private CheckpointSubsumeHelper() {}
+
+    /**
+     * Removes entries from the head of {@code checkpoints} until at most {@code numRetain} remain
+     * (a negative value acts as 0), passing each to {@code subsumeAction}; failures are logged.
+     */
+    public static void subsume(
+            Deque<CompletedCheckpoint> checkpoints,
+            int numRetain,
+            ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+            throws Exception {
+        final int retain = Math.max(numRetain, 0);
+        while (checkpoints.size() > retain) {
+            final CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
+            try {
+                subsumeAction.accept(completedCheckpoint);
+            } catch (Exception e) {
+                LOG.warn("Fail to subsume the old checkpoint.", e);
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 0db2d07..0f419fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -230,15 +230,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
 
         completedCheckpoints.addLast(checkpoint);
 
-        // Everything worked, let's remove a previous checkpoint if necessary.
-        while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
-            final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
-            tryRemoveCompletedCheckpoint(
-                    completedCheckpoint,
-                    completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                    checkpointsCleaner,
-                    postCleanup);
-        }
+        CheckpointSubsumeHelper.subsume(
+                completedCheckpoints,
+                maxNumberOfCheckpointsToRetain,
+                completedCheckpoint ->
+                        tryRemoveCompletedCheckpoint(
+                                completedCheckpoint,
+                                completedCheckpoint.shouldBeDiscardedOnSubsume(),
+                                checkpointsCleaner,
+                                postCleanup));
 
         LOG.debug("Added {} to {}.", checkpoint, path);
     }
@@ -265,11 +265,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
             LOG.info("Shutting down");
 
             for (CompletedCheckpoint checkpoint : completedCheckpoints) {
-                tryRemoveCompletedCheckpoint(
-                        checkpoint,
-                        checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
-                        checkpointsCleaner,
-                        () -> {});
+                try {
+                    tryRemoveCompletedCheckpoint(
+                            checkpoint,
+                            checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
+                            checkpointsCleaner,
+                            () -> {});
+                } catch (Exception e) {
+                    LOG.warn("Fail to remove checkpoint during shutdown.", e);
+                }
             }
 
             completedCheckpoints.clear();
@@ -292,14 +296,11 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
             CompletedCheckpoint completedCheckpoint,
             boolean shouldDiscard,
             CheckpointsCleaner checkpointsCleaner,
-            Runnable postCleanup) {
-        try {
-            if (tryRemove(completedCheckpoint.getCheckpointID())) {
-                checkpointsCleaner.cleanCheckpoint(
-                        completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to subsume the old checkpoint", e);
+            Runnable postCleanup)
+            throws Exception {
+        if (tryRemove(completedCheckpoint.getCheckpointID())) {
+            checkpointsCleaner.cleanCheckpoint(
+                    completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 174f0ea..d55857c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -62,9 +62,8 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor
             throws Exception {
         checkpoints.addLast(checkpoint);
 
-        if (checkpoints.size() > maxRetainedCheckpoints) {
-            removeOldestCheckpoint();
-        }
+        CheckpointSubsumeHelper.subsume(
+                checkpoints, maxRetainedCheckpoints, CompletedCheckpoint::discardOnSubsume);
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 5863ec9..7723ff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -70,14 +70,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 
         checkpoints.addLast(checkpoint);
 
-        if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-            try {
-                CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-                checkpointToSubsume.discardOnSubsume();
-            } catch (Exception e) {
-                LOG.warn("Fail to subsume the old checkpoint.", e);
-            }
-        }
+        CheckpointSubsumeHelper.subsume(
+                checkpoints, maxNumberOfCheckpointsToRetain, CompletedCheckpoint::discardOnSubsume);
     }
 
     @Override