You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/01/28 12:34:10 UTC
[flink] branch master updated: [FLINK-25816][state] Remove checkpoint abortion notification of notify backend
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8c4e9a5 [FLINK-25816][state] Remove checkpoint abortion notification of notify backend
8c4e9a5 is described below
commit 8c4e9a5540e468e92829be32de41545eab7a8cba
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jan 26 10:30:49 2022 +0100
[FLINK-25816][state] Remove checkpoint abortion notification of notify backend
The notification currently causes an exception and adds complexity.
It's also not necessary, unlikely to be delivered (because of the
difference in checkpoint/materialization intervals), and unlikely to be
utilized (it would arrive only after the nested snapshot has completed
and would most likely perform the same GC as the completion notification).
---
.../changelog/ChangelogKeyedStateBackend.java | 51 ++--------------------
1 file changed, 4 insertions(+), 47 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index c746b2b..6bb710d 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -78,13 +78,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Optional;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -209,19 +207,6 @@ public class ChangelogKeyedStateBackend<K>
/** Checkpoint ID mapped to Materialization ID - used to notify nested backend of completion. */
private final NavigableMap<Long, Long> materializationIdByCheckpointId = new TreeMap<>();
- /**
- * Materialization ID mapped to Checkpoint IDs - used to notify nested backend of abortion.
- * Entry is removed when:
- *
- * <ol>
- * <li>some checkpoint of a Set completes (in which case {@link #keyedStateBackend} is {@link
- * CheckpointListener#notifyCheckpointComplete(long) notified of completion}.
- * <li>a newer checkpoint completes
- * <li>all checkpoints of a Set are aborted (in which case {@link #keyedStateBackend} is
- * {@link CheckpointListener#notifyCheckpointAborted(long) notified of abortion}.
- * </ol>
- */
- private final Map<Long, Set<Long>> pendingMaterializationConfirmations = new HashMap<>();
private long lastConfirmedMaterializationId = -1L;
@@ -400,15 +385,8 @@ public class ChangelogKeyedStateBackend<K>
ChangelogSnapshotState changelogStateBackendStateCopy = changelogSnapshotState;
- if (changelogStateBackendStateCopy.materializationID > lastConfirmedMaterializationId) {
- materializationIdByCheckpointId.put(
- checkpointId, changelogStateBackendStateCopy.materializationID);
- pendingMaterializationConfirmations
- .computeIfAbsent(
- changelogStateBackendStateCopy.materializationID,
- ign -> new HashSet<>())
- .add(checkpointId);
- }
+ materializationIdByCheckpointId.put(
+ checkpointId, changelogStateBackendStateCopy.materializationID);
return toRunnableFuture(
stateChangelogWriter
@@ -511,14 +489,8 @@ public class ChangelogKeyedStateBackend<K>
keyedStateBackend.notifyCheckpointComplete(materializationID);
lastConfirmedMaterializationId = materializationID;
}
- pendingMaterializationConfirmations.remove(materializationID);
}
- // there is a chance that nested backend will miss the abort notification
- // but there is no other way to cleanup this map
- Map<Long, Long> olderCheckpoints =
- materializationIdByCheckpointId.headMap(checkpointId, true);
- olderCheckpoints.values().forEach(pendingMaterializationConfirmations::remove);
- olderCheckpoints.clear();
+ materializationIdByCheckpointId.headMap(checkpointId, true).clear();
}
@Override
@@ -530,22 +502,7 @@ public class ChangelogKeyedStateBackend<K>
// This might change if the log ownership changes (the method won't likely be needed).
stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
}
- Long materializationID = materializationIdByCheckpointId.remove(checkpointId);
- if (materializationID != null) {
- Set<Long> checkpoints = pendingMaterializationConfirmations.get(materializationID);
- checkpoints.remove(checkpointId);
- if (checkpoints.isEmpty()) {
- if (materializationID < changelogSnapshotState.materializationID) {
- // Notification is not strictly required and will arrive only after the nested
- // snapshot has completed. It's also unlikely to be sent because of the
- // difference in checkpoint/materialization intervals. But it can still be
- // useful
- // for some backends.
- keyedStateBackend.notifyCheckpointAborted(materializationID);
- }
- pendingMaterializationConfirmations.remove(materializationID);
- }
- }
+ // TODO: Consider notifying nested state backend about checkpoint abortion (FLINK-25850)
}
// -------- Methods not simply delegating to wrapped state backend ---------