You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/01/28 12:34:10 UTC

[flink] branch master updated: [FLINK-25816][state] Remove checkpoint abortion notification of notify backend

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c4e9a5  [FLINK-25816][state] Remove checkpoint abortion notification of notify backend
8c4e9a5 is described below

commit 8c4e9a5540e468e92829be32de41545eab7a8cba
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jan 26 10:30:49 2022 +0100

    [FLINK-25816][state] Remove checkpoint abortion notification of notify backend
    
    The notification currently causes an exception and adds complexity.
    It's also not necessary, unlikely to be delivered (because of the
    difference in checkpoint/materialization intervals) and unlikely to be
    utilized (it will arrive only after the nested snapshot has completed
    and will most likely trigger the same GC as the completion notification).
---
 .../changelog/ChangelogKeyedStateBackend.java      | 51 ++--------------------
 1 file changed, 4 insertions(+), 47 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index c746b2b..6bb710d 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -78,13 +78,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NoSuchElementException;
 import java.util.Optional;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -209,19 +207,6 @@ public class ChangelogKeyedStateBackend<K>
 
     /** Checkpoint ID mapped to Materialization ID - used to notify nested backend of completion. */
     private final NavigableMap<Long, Long> materializationIdByCheckpointId = new TreeMap<>();
-    /**
-     * Materialization ID mapped to Checkpoint IDs - used to notify nested backend of abortion.
-     * Entry is removed when:
-     *
-     * <ol>
-     *   <li>some checkpoint of a Set completes (in which case {@link #keyedStateBackend} is {@link
-     *       CheckpointListener#notifyCheckpointComplete(long) notified of completion}.
-     *   <li>a newer checkpoint completes
-     *   <li>all checkpoints of a Set are aborted (in which case {@link #keyedStateBackend} is
-     *       {@link CheckpointListener#notifyCheckpointAborted(long) notified of abortion}.
-     * </ol>
-     */
-    private final Map<Long, Set<Long>> pendingMaterializationConfirmations = new HashMap<>();
 
     private long lastConfirmedMaterializationId = -1L;
 
@@ -400,15 +385,8 @@ public class ChangelogKeyedStateBackend<K>
 
         ChangelogSnapshotState changelogStateBackendStateCopy = changelogSnapshotState;
 
-        if (changelogStateBackendStateCopy.materializationID > lastConfirmedMaterializationId) {
-            materializationIdByCheckpointId.put(
-                    checkpointId, changelogStateBackendStateCopy.materializationID);
-            pendingMaterializationConfirmations
-                    .computeIfAbsent(
-                            changelogStateBackendStateCopy.materializationID,
-                            ign -> new HashSet<>())
-                    .add(checkpointId);
-        }
+        materializationIdByCheckpointId.put(
+                checkpointId, changelogStateBackendStateCopy.materializationID);
 
         return toRunnableFuture(
                 stateChangelogWriter
@@ -511,14 +489,8 @@ public class ChangelogKeyedStateBackend<K>
                 keyedStateBackend.notifyCheckpointComplete(materializationID);
                 lastConfirmedMaterializationId = materializationID;
             }
-            pendingMaterializationConfirmations.remove(materializationID);
         }
-        // there is a chance that nested backend will miss the abort notification
-        // but there is no other way to cleanup this map
-        Map<Long, Long> olderCheckpoints =
-                materializationIdByCheckpointId.headMap(checkpointId, true);
-        olderCheckpoints.values().forEach(pendingMaterializationConfirmations::remove);
-        olderCheckpoints.clear();
+        materializationIdByCheckpointId.headMap(checkpointId, true).clear();
     }
 
     @Override
@@ -530,22 +502,7 @@ public class ChangelogKeyedStateBackend<K>
             // This might change if the log ownership changes (the method won't likely be needed).
             stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
         }
-        Long materializationID = materializationIdByCheckpointId.remove(checkpointId);
-        if (materializationID != null) {
-            Set<Long> checkpoints = pendingMaterializationConfirmations.get(materializationID);
-            checkpoints.remove(checkpointId);
-            if (checkpoints.isEmpty()) {
-                if (materializationID < changelogSnapshotState.materializationID) {
-                    // Notification is not strictly required and will arrive only after the nested
-                    // snapshot has completed. It's also unlikely to be sent because of the
-                    // difference in checkpoint/materialization intervals. But it can still be
-                    // useful
-                    // for some backends.
-                    keyedStateBackend.notifyCheckpointAborted(materializationID);
-                }
-                pendingMaterializationConfirmations.remove(materializationID);
-            }
-        }
+        // TODO: Consider notifying nested state backend about checkpoint abortion (FLINK-25850)
     }
 
     // -------- Methods not simply delegating to wrapped state backend ---------