You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/22 20:30:04 UTC
[flink] 01/03: [hotfix][checkpointing] Extract
CheckpointSubsumeHelper
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 38971479d4e4b5383d5e705db9ab0236b7010419
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 17 18:40:30 2021 +0100
[hotfix][checkpointing] Extract CheckpointSubsumeHelper
This is a pre-requisite refactoring for a subsequent bug fix.
---
.../checkpoint/CheckpointSubsumeHelper.java | 66 ++++++++++++++++++++++
.../DefaultCompletedCheckpointStore.java | 45 +++++++--------
.../EmbeddedCompletedCheckpointStore.java | 5 +-
.../StandaloneCompletedCheckpointStore.java | 10 +---
4 files changed, 93 insertions(+), 33 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
new file mode 100644
index 0000000..7d3e7aa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+
+/**
+ * Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint
+ * stores}. In general, checkpoints should be subsumed whenever state.checkpoints.num-retained is
+ * exceeded.
+ *
+ * <p>Additional considerations:
+ *
+ * <ul>
+ * <li>Savepoints must be stored in the same queue to prevent duplicates (@see <a
+ * href="https://issues.apache.org/jira/browse/FLINK-10354">FLINK-10354</a>).
+ * <li>To prevent unlimited queue growth, savepoints are also counted in num-retained together
+ * with checkpoints
+ * <li>The actual state of savepoints should NOT be discarded when they are subsumed.
+ * <li>At least one (most recent) checkpoint (not savepoint) should be kept. Otherwise, subsequent
+ * incremental checkpoints may refer to a discarded state (@see <a
+ * href="https://issues.apache.org/jira/browse/FLINK-21351">FLINK-21351</a>).
+ * <li>The previous rule does not apply when the job is stopped with a savepoint, because no further checkpoints will be made afterwards.
+ * </ul>
+ */
+class CheckpointSubsumeHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
+
+    /** Holder for static helpers only; not meant to be instantiated. */
+    private CheckpointSubsumeHelper() {}
+
+    /**
+     * Removes entries from the head of {@code checkpoints} until at most {@code numRetain} remain
+     * and passes every removed entry to {@code subsumeAction}.
+     *
+     * <p>An entry is taken out of the queue before {@code subsumeAction} runs. If the action fails,
+     * the failure is logged as a warning and the loop continues with the next entry; the failed
+     * entry is not put back.
+     *
+     * @param checkpoints queue of completed checkpoints; modified in place, entries are removed
+     *     from its head
+     * @param numRetain maximum number of entries to leave in the queue
+     * @param subsumeAction action invoked for every entry removed from the queue
+     * @throws Exception kept in the signature for callers; exceptions raised by {@code
+     *     subsumeAction} are caught and logged inside this method
+     */
+    public static void subsume(
+            Deque<CompletedCheckpoint> checkpoints,
+            int numRetain,
+            ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+            throws Exception {
+        // The emptiness check is part of the loop condition (not only a one-time guard) so that a
+        // negative numRetain drains the queue and stops, instead of calling removeFirst() on an
+        // empty deque and failing with NoSuchElementException.
+        while (!checkpoints.isEmpty() && checkpoints.size() > numRetain) {
+            CompletedCheckpoint subsumed = checkpoints.removeFirst();
+            try {
+                subsumeAction.accept(subsumed);
+            } catch (Exception e) {
+                // Best effort: one failed clean-up must not prevent subsuming the remaining ones.
+                LOG.warn("Fail to subsume the old checkpoint.", e);
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 0db2d07..0f419fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -230,15 +230,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
completedCheckpoints.addLast(checkpoint);
- // Everything worked, let's remove a previous checkpoint if necessary.
- while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
- final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
- tryRemoveCompletedCheckpoint(
- completedCheckpoint,
- completedCheckpoint.shouldBeDiscardedOnSubsume(),
- checkpointsCleaner,
- postCleanup);
- }
+ CheckpointSubsumeHelper.subsume(
+ completedCheckpoints,
+ maxNumberOfCheckpointsToRetain,
+ completedCheckpoint ->
+ tryRemoveCompletedCheckpoint(
+ completedCheckpoint,
+ completedCheckpoint.shouldBeDiscardedOnSubsume(),
+ checkpointsCleaner,
+ postCleanup));
LOG.debug("Added {} to {}.", checkpoint, path);
}
@@ -265,11 +265,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
LOG.info("Shutting down");
for (CompletedCheckpoint checkpoint : completedCheckpoints) {
- tryRemoveCompletedCheckpoint(
- checkpoint,
- checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
- checkpointsCleaner,
- () -> {});
+ try {
+ tryRemoveCompletedCheckpoint(
+ checkpoint,
+ checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
+ checkpointsCleaner,
+ () -> {});
+ } catch (Exception e) {
+ LOG.warn("Fail to remove checkpoint during shutdown.", e);
+ }
}
completedCheckpoints.clear();
@@ -292,14 +296,11 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
CompletedCheckpoint completedCheckpoint,
boolean shouldDiscard,
CheckpointsCleaner checkpointsCleaner,
- Runnable postCleanup) {
- try {
- if (tryRemove(completedCheckpoint.getCheckpointID())) {
- checkpointsCleaner.cleanCheckpoint(
- completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
- }
- } catch (Exception e) {
- LOG.warn("Failed to subsume the old checkpoint", e);
+ Runnable postCleanup)
+ throws Exception {
+ if (tryRemove(completedCheckpoint.getCheckpointID())) {
+ checkpointsCleaner.cleanCheckpoint(
+ completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 174f0ea..d55857c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -62,9 +62,8 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor
throws Exception {
checkpoints.addLast(checkpoint);
- if (checkpoints.size() > maxRetainedCheckpoints) {
- removeOldestCheckpoint();
- }
+ CheckpointSubsumeHelper.subsume(
+ checkpoints, maxRetainedCheckpoints, CompletedCheckpoint::discardOnSubsume);
}
@VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 5863ec9..7723ff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -70,14 +70,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
checkpoints.addLast(checkpoint);
- if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
- try {
- CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
- checkpointToSubsume.discardOnSubsume();
- } catch (Exception e) {
- LOG.warn("Fail to subsume the old checkpoint.", e);
- }
- }
+ CheckpointSubsumeHelper.subsume(
+ checkpoints, maxNumberOfCheckpointsToRetain, CompletedCheckpoint::discardOnSubsume);
}
@Override