Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/11 19:45:13 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #19907: [FLINK-27692][state] Support local recovery for materialized part of changelog

rkhachatryan commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r895055680


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java:
##########
@@ -76,7 +74,7 @@ public interface TaskLocalStateStore {
     /**
      * Remove all checkpoints from the store that match the given predicate.
      *
-     * @param matcher the predicate that selects the checkpoints for pruning.
+     * @param checkpointID the checkpoints for pruning.
      */
-    void pruneMatchingCheckpoints(LongPredicate matcher);
+    void pruneNotMatchingCheckpoints(long checkpointID);

Review Comment:
   This change (and the corresponding call sites) seems like a pure refactoring.
   If so, it's preferable to put it into a separate commit according to [Flink guidelines](https://flink.apache.org/contributing/code-style-and-quality-pull-requests.html#separate-refactoring-cleanup-and-independent-changes).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -307,14 +340,23 @@ public void abortCheckpoint(long abortedCheckpointId) {
                 jobVertexID,
                 subtaskIndex);
 
+        // delete the referenced checkpoint meantime
         pruneCheckpoints(
-                snapshotCheckpointId -> snapshotCheckpointId == abortedCheckpointId, false);
+                snapshotCheckpointId ->
+                        (snapshotCheckpointId == abortedCheckpointId
+                                || referredByCheckpointID.getOrDefault(snapshotCheckpointId, -1L)
+                                        == abortedCheckpointId),

Review Comment:
   Do I understand correctly that, in the changelog case:
   - discard does nothing for the JM part, because of the empty `discardState()` in `ChangelogStateBackendHandleImpl`
   - the local part is discarded, although it could be re-used; this won't hurt correctness (on recovery, non-local state will be used)
   
   ?
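
A minimal sketch of which stored checkpoints the new predicate selects, assuming the `(cp1, cp3)` convention from the PR's field comment (cp3 refers to cp1's state). `AbortPruneSketch` and its methods are hypothetical names used only for illustration; only the lambda body mirrors the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

/** Illustrative stand-in, not Flink code. */
final class AbortPruneSketch {

    /** Same shape as the lambda in the diff: the aborted checkpoint, plus any checkpoint it refers to. */
    static LongPredicate abortMatcher(
            Map<Long, Long> referredByCheckpointID, long abortedCheckpointId) {
        return snapshotCheckpointId ->
                snapshotCheckpointId == abortedCheckpointId
                        || referredByCheckpointID.getOrDefault(snapshotCheckpointId, -1L)
                                == abortedCheckpointId;
    }

    /** Returns the stored checkpoint IDs the matcher would prune, in input order. */
    static List<Long> selectedForPruning(
            List<Long> storedCheckpointIds,
            Map<Long, Long> referredByCheckpointID,
            long abortedCheckpointId) {
        LongPredicate matcher = abortMatcher(referredByCheckpointID, abortedCheckpointId);
        List<Long> selected = new ArrayList<>();
        for (long id : storedCheckpointIds) {
            if (matcher.test(id)) {
                selected.add(id);
            }
        }
        return selected;
    }
}
```

With stored checkpoints 1, 2, 3 and the single mapping 1 -> 3, aborting checkpoint 3 selects both 1 and 3 in this sketch, which is the "local part is discarded although it could be re-used" situation described above.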



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -141,6 +158,29 @@ public void storeLocalState(
                     subtaskIndex);
         }
 
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = changelogStateBackendHandle.getMaterializationID();
+                    if (currentMaterializationID == null

Review Comment:
   This assumes that the snapshot contains at most one materialization ID.
   Can (and should) we rely on it? 
   I'm not sure whether it holds for the down-scaling case.
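
One way to make the assumption explicit rather than implicit is to check it. The sketch below is hypothetical (the class and method names are not in the PR) and takes the IDs as plain `Long` values, as they might be collected from `getMaterializationID()` on each `ChangelogStateBackendHandle` in a snapshot.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper, not part of Flink: checks the single-materialization assumption. */
final class MaterializationIdCheckSketch {

    /** Returns the distinct materialization IDs found among one snapshot's changelog handles. */
    static Set<Long> distinctMaterializationIds(Collection<Long> materializationIdsOfHandles) {
        return new TreeSet<>(materializationIdsOfHandles);
    }

    /** True if the snapshot carries zero or one distinct materialization ID. */
    static boolean hasAtMostOneMaterializationId(Collection<Long> materializationIdsOfHandles) {
        return distinctMaterializationIds(materializationIdsOfHandles).size() <= 1;
    }
}
```

If the check can fail (for example after down-scaling, should that case produce several handles with different IDs), the tracking logic would need to handle more than one ID per snapshot.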



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -141,6 +158,29 @@ public void storeLocalState(
                     subtaskIndex);
         }
 
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = changelogStateBackendHandle.getMaterializationID();
+                    if (currentMaterializationID == null
+                            || materializationID != currentMaterializationID.f0) {
+                        currentMaterializationID = Tuple2.of(materializationID, checkpointId);
+                        referredByCheckpointID.clear();

Review Comment:
   Could you clarify what happens if two snapshots with different materialization IDs complete out of order?
   Isn't it possible that an older checkpoint (say, CP1 with mID=1) will "override" the newer one (say, CP2 with mID=2)?
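
The scenario can be replayed against a simplified copy of the branch in the diff. This is a sketch only: `OutOfOrderStoreSketch` is a hypothetical name, the `long[]` pair stands in for the `Tuple2`, and it says nothing about whether Flink can actually invoke `storeLocalState` in this order.

```java
/** Simplified replay of the tracking branch in the diff; not Flink code. */
final class OutOfOrderStoreSketch {

    /**
     * Replays store calls given as {checkpointId, materializationId} pairs in arrival order.
     * Returns the tracked {materializationId, checkpointId} pair, or null if nothing was stored.
     */
    static long[] replay(long[][] storesInArrivalOrder) {
        long[] current = null; // stand-in for currentMaterializationID (f0 = mID, f1 = checkpoint)
        for (long[] store : storesInArrivalOrder) {
            long checkpointId = store[0];
            long materializationId = store[1];
            // Same condition as the diff: any differing ID replaces the tracked one.
            if (current == null || materializationId != current[0]) {
                current = new long[] {materializationId, checkpointId};
            }
        }
        return current;
    }
}
```

Replaying `{{2, 2}, {1, 1}}` (CP2 with mID=2 arrives first, then CP1 with mID=1) leaves mID=1 tracked, because the condition compares for inequality only and never checks which ID is newer.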



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -141,6 +158,29 @@ public void storeLocalState(
                     subtaskIndex);
         }
 
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = changelogStateBackendHandle.getMaterializationID();
+                    if (currentMaterializationID == null
+                            || materializationID != currentMaterializationID.f0) {
+                        currentMaterializationID = Tuple2.of(materializationID, checkpointId);
+                        referredByCheckpointID.clear();
+                    } else {
+                        referredByCheckpointID.put(currentMaterializationID.f1, checkpointId);

Review Comment:
   Shouldn't this (and the other accesses) be synchronized, similar to `storedTaskStateByCheckpointID`?
   
   The method is called from `AsyncCheckpointRunnable`, which can be executed by different threads (plus other methods).
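
A minimal sketch of what guarding the new map could look like, assuming a dedicated lock object in the same style as the existing `@GuardedBy("lock")` field. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: every access to the map goes through the same lock. */
final class GuardedReferenceMapSketch {
    private final Object lock = new Object();

    // Guarded by lock.
    private final Map<Long, Long> referredByCheckpointID = new HashMap<>();

    /** Records that referringCheckpointId refers to the state of ownerCheckpointId. */
    void recordReference(long ownerCheckpointId, long referringCheckpointId) {
        synchronized (lock) {
            referredByCheckpointID.put(ownerCheckpointId, referringCheckpointId);
        }
    }

    /** Returns the checkpoint referring to ownerCheckpointId, or -1 if there is none. */
    long referringCheckpoint(long ownerCheckpointId) {
        synchronized (lock) {
            return referredByCheckpointID.getOrDefault(ownerCheckpointId, -1L);
        }
    }

    /** Drops all recorded references. */
    void clear() {
        synchronized (lock) {
            referredByCheckpointID.clear();
        }
    }
}
```

In the real class, the read-modify-write on the materialization pair and the map would presumably need to sit inside one synchronized block so that the two stay consistent with each other.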



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -98,6 +104,15 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
     @GuardedBy("lock")
     private final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID;
 
+    /** The relationship between checkpoints. (cp1,cp3) means cp3 refer to cp1's state. */
+    private final Map<Long, Long> referredByCheckpointID;
+
+    /**
+     * (MaterializationID, created by checkpointID). Because local store only keeps one checkpoint,
+     * at most one materialization can exist at the same time.
+     */
+    private Tuple2<Long, Long> currentMaterializationID;

Review Comment:
   `lastChangelogMaterializationIdAndCheckpoint`?
   
   Or maybe these two new fields should form a new helper class encapsulating the changelog-specific logic?
   Then the `Tuple2` could probably be replaced with just two fields.
   
   I think that would be clearer and easier to test. WDYT?
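
A rough sketch of such a helper, under stated assumptions: the class name, method names, and the use of `-1` as a "nothing tracked yet" sentinel are all hypothetical, and the sketch is not thread-safe on its own.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; the name and methods are illustrative and not part of the PR. */
final class ChangelogLocalStateRegistrySketch {
    // Assumption: -1 is never a valid checkpoint or materialization ID.
    private static final long NONE = -1L;

    // Two plain fields instead of a Tuple2.
    private long lastMaterializationId = NONE;
    private long lastMaterializationCheckpointId = NONE;

    // (cp1, cp3) means cp3 refers to cp1's state.
    private final Map<Long, Long> referredByCheckpointId = new HashMap<>();

    /** Records that checkpointId was stored with the given materialization ID. */
    void onStore(long checkpointId, long materializationId) {
        if (lastMaterializationId == NONE || materializationId != lastMaterializationId) {
            lastMaterializationId = materializationId;
            lastMaterializationCheckpointId = checkpointId;
            referredByCheckpointId.clear();
        } else {
            referredByCheckpointId.put(lastMaterializationCheckpointId, checkpointId);
        }
    }

    /** True if storedCheckpointId is the aborted checkpoint or is referred to by it. */
    boolean shouldPruneOnAbort(long storedCheckpointId, long abortedCheckpointId) {
        return storedCheckpointId == abortedCheckpointId
                || referredByCheckpointId.getOrDefault(storedCheckpointId, NONE)
                        == abortedCheckpointId;
    }

    long lastMaterializationId() {
        return lastMaterializationId;
    }

    long lastMaterializationCheckpointId() {
        return lastMaterializationCheckpointId;
    }
}
```

A unit test can then drive `onStore` and `shouldPruneOnAbort` directly, without constructing a full `TaskLocalStateStoreImpl`.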



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -804,6 +833,9 @@ private static class ChangelogSnapshotState {
          */
         private final List<KeyedStateHandle> materializedSnapshot;
 
+        /** Materialized snapshot from the underlying delegated state backend for local recovery. */
+        private final List<KeyedStateHandle> localMaterializedSnapshot;

Review Comment:
   There is already `SnapshotResult`, which is responsible for holding and distinguishing local and JM-owned snapshots. With this change, `ChangelogSnapshotState` overlaps with it.
   
   Maybe it makes sense to refactor `ChangelogSnapshotState` so that it holds a single `SnapshotResult`?
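
To illustrate the shape of that refactoring without depending on Flink, the sketch below uses a hypothetical `SnapshotPairSketch` that only mimics the idea of pairing a JM-owned snapshot with an optional local one. It does not reproduce the real `SnapshotResult` API, and handles are modeled as strings.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in: pairs a JM-owned snapshot with an optional task-local one. */
final class SnapshotPairSketch<T> {
    private final T jobManagerOwned;
    private final T taskLocal; // null when there is no local copy

    SnapshotPairSketch(T jobManagerOwned, T taskLocal) {
        this.jobManagerOwned = jobManagerOwned;
        this.taskLocal = taskLocal;
    }

    T getJobManagerOwned() {
        return jobManagerOwned;
    }

    T getTaskLocal() {
        return taskLocal;
    }

    boolean hasTaskLocal() {
        return taskLocal != null;
    }
}

/** Hypothetical shape of the snapshot state: one pair instead of two parallel lists. */
final class ChangelogSnapshotStateSketch {
    // Strings stand in for KeyedStateHandle instances.
    private final SnapshotPairSketch<List<String>> materializedSnapshot;

    ChangelogSnapshotStateSketch(SnapshotPairSketch<List<String>> materializedSnapshot) {
        this.materializedSnapshot = materializedSnapshot;
    }

    List<String> getMaterializedSnapshot() {
        return materializedSnapshot.getJobManagerOwned();
    }

    /** Returns the local handles, or an empty list if no local copy was taken. */
    List<String> getLocalMaterializedSnapshot() {
        if (materializedSnapshot.hasTaskLocal()) {
            return materializedSnapshot.getTaskLocal();
        }
        return Collections.<String>emptyList();
    }
}
```

Keeping both parts in one holder means the two lists cannot drift apart, and the distinction between local and JM-owned state lives in one place.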


