Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/02 17:02:40 UTC

[GitHub] [flink] pnowojski commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012016085


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   Hmmm, that's a bit confusing. Indeed, it looks like this might work. I had missed the importance of this `writer.isDone()` check. It's a bit strange, though, that to decide whether the writer is valid to use, we sometimes rely on internal fields like `writer != null` and `ongoingCheckpointId == request.getCheckpointId()`, but other times on the state of another class, like `writer.isDone()`.
   
   > BTW, the writer cannot be set to null when aborting a checkpoint. There may be some writeInput/writeOutput requests after the abort. If it were set to null, `req.onWriterMissing()` would throw an exception.
   
   Wouldn't it be more explicit if we:
   1. kept only the `writer != null` check here,
   2. maybe, in `ChannelStateWriteRequest#abort`, replaced `CheckpointInProgressRequest` (which is confusing on its own) with a dedicated `CheckpointAbortRequest`,
   3. when handling/dispatching `CheckpointAbortRequest` here, set `this.writer = null` and something like `this.abortedCheckpointId = this.ongoingCheckpointId`,
   4. when accessing the writer, checked
   ```
   ChannelStateCheckpointWriter writer =
           ongoingCheckpointId == request.getCheckpointId()
                           && abortedCheckpointId != request.getCheckpointId()
                   ? this.writer
                   : null;
   ```
   ?
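
   A minimal model of the bookkeeping proposed above may help when comparing it with the `writer.isDone()` approach. It is only a sketch: `Writer`, `DispatcherState`, `start`, `abort` and `lookup` are hypothetical stand-ins, not Flink's `ChannelStateCheckpointWriter` or `ChannelStateWriteRequestDispatcherImpl`, and it assumes that an abort for any checkpoint other than the ongoing one is simply ignored.

   ```java
   // Hypothetical stand-in for ChannelStateCheckpointWriter: it only remembers its checkpoint id.
   final class Writer {
       final long checkpointId;

       Writer(long checkpointId) {
           this.checkpointId = checkpointId;
       }
   }

   // Hypothetical model of the dispatcher fields discussed above (not Flink's actual class).
   final class DispatcherState {
       private long ongoingCheckpointId = -1;
       private long abortedCheckpointId = -1;
       private Writer writer;

       // Start request: checkpoint ids must strictly increase, then a fresh writer is created.
       void start(long checkpointId) {
           if (checkpointId <= ongoingCheckpointId) {
               throw new IllegalStateException(
                       "Checkpoint must be incremented, ongoingCheckpointId is "
                               + ongoingCheckpointId
                               + ", but the request is "
                               + checkpointId);
           }
           ongoingCheckpointId = checkpointId;
           writer = new Writer(checkpointId);
       }

       // Proposed dedicated abort request (step 3).
       // Assumption: aborts for any other checkpoint id are ignored in this sketch.
       void abort(long checkpointId) {
           if (checkpointId == ongoingCheckpointId) {
               writer = null;
               abortedCheckpointId = ongoingCheckpointId;
           }
       }

       // Writer lookup from step 4: only the ongoing, non-aborted checkpoint gets the writer.
       Writer lookup(long checkpointId) {
           return ongoingCheckpointId == checkpointId && abortedCheckpointId != checkpointId
                   ? writer
                   : null;
       }
   }
   ```

   In this model, nulling the writer on abort is already enough for `lookup` to return `null`; the `abortedCheckpointId` comparison mirrors the suggested check and could additionally let the dispatcher tell an aborted checkpoint apart from one it never saw.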



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,26 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> {
+                    checkState(
+                            request.getCheckpointId() == ongoingCheckpointId,
+                            "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.",
+                            ongoingCheckpointId,
+                            request.getCheckpointId());
+                    this.writer = null;
+                });

Review Comment:
   > I don't think so. Actually, all abort, start, writeInput, finishInput, writeOutput and finishOutput requests are executed in the channel state writer executor thread. It is a single thread, so the race condition doesn't exist.
   
   I was thinking about something else, but I now see that I was wrong anyway. The abort call would not be executed, because the writer returned from
   ```
   ChannelStateCheckpointWriter writer =
           ongoingCheckpointId == request.getCheckpointId() ? this.writer : null;
   ```
   would be `null`, and the `abort` request itself has `ignoreMissingWriter` set to `true`.
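
   The reasoning above can be modelled in a few lines. In this sketch, `Request`, `SingleWriterDispatcher` and their methods are simplified, hypothetical stand-ins loosely named after the classes in this PR; in particular, the assumption that a missing writer raises `IllegalStateException` unless `ignoreMissingWriter` is set is ours, not taken from Flink's code.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified request: the checkpoint it targets and whether a missing writer is fine.
   final class Request {
       final String name;
       final long checkpointId;
       final boolean ignoreMissingWriter;

       Request(String name, long checkpointId, boolean ignoreMissingWriter) {
           this.name = name;
           this.checkpointId = checkpointId;
           this.ignoreMissingWriter = ignoreMissingWriter;
       }

       // Assumption: modelled on onWriterMissing(), failing unless the request opts out.
       void onWriterMissing() {
           if (!ignoreMissingWriter) {
               throw new IllegalStateException("writer not found for request " + name);
           }
       }
   }

   // Hypothetical dispatcher holding a single writer for the ongoing checkpoint.
   final class SingleWriterDispatcher {
       long ongoingCheckpointId = -1;
       // Stand-in for the writer: records the names of the requests it processed.
       List<String> writer;

       void start(long checkpointId) {
           ongoingCheckpointId = checkpointId;
           writer = new ArrayList<>();
       }

       void dispatch(Request request) {
           // The lookup quoted above: only requests for the ongoing checkpoint see the writer.
           List<String> w = ongoingCheckpointId == request.checkpointId ? writer : null;
           if (w != null) {
               w.add(request.name);
           } else {
               request.onWriterMissing();
           }
       }
   }
   ```

   In this model a stale abort (which sets `ignoreMissingWriter`) is silently dropped, while a stale write fails loudly.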
   
   If checkpoint 42 is completed because a `CheckpointBarrier` was received, `ChannelStateWriter#finishInput` would work fine, and it would successfully pass this `checkState`.
   
   Long story short: as long as `ongoingCheckpointId` is set correctly in the newly proposed version, all writes, input/output completions and aborts will be handled correctly, either ignored if they don't refer to the ongoing checkpoint, or processed correctly?
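
   The single-threaded execution mentioned earlier is what makes this safe without extra locking. The sketch below pushes requests through a standard `Executors.newSingleThreadExecutor()`, so each request runs to completion, in submission order, before the next one starts. `SequentialChannelStateModel`, its methods and the ignore-if-stale rule are hypothetical simplifications of the behaviour discussed here, not Flink's implementation.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Hypothetical model: the state below is only ever touched from the single executor thread.
   final class SequentialChannelStateModel {
       private final ExecutorService executor = Executors.newSingleThreadExecutor();
       private final List<String> log = new ArrayList<>();
       private long ongoingCheckpointId = -1;

       void start(long checkpointId) {
           executor.execute(() -> {
               ongoingCheckpointId = checkpointId;
               log.add("start " + checkpointId);
           });
       }

       void write(long checkpointId) {
           executor.execute(() -> {
               // Simplification: requests for any checkpoint other than the ongoing one are ignored.
               if (checkpointId == ongoingCheckpointId) {
                   log.add("write " + checkpointId);
               } else {
                   log.add("ignored write " + checkpointId);
               }
           });
       }

       // Runs after all previously queued requests; Future.get() makes their effects visible here.
       List<String> snapshotLog() {
           Callable<List<String>> snapshot = () -> new ArrayList<>(log);
           try {
               return executor.submit(snapshot).get();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           } catch (ExecutionException e) {
               throw new IllegalStateException(e);
           }
       }

       void shutdown() {
           executor.shutdown();
       }
   }
   ```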
   
   > `Map<Long, ChannelStateCheckpointWriter> writers;` cannot be cleaned up until the end of the Task.
   
   👍 Yeah, I've noticed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {

Review Comment:
   > I have added `ChannelStateWriterImplTest.testAbortOldAndStartNewCheckpoint()`. It is:
   
   I'm not sure that's the right level to test this, because what actually matters is how `SubtaskCheckpointCoordinatorImpl` uses the `ChannelStateWriter`. Can we implement this test using `SubtaskCheckpointCoordinatorImpl` (in `SubtaskCheckpointCoordinatorTest`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org