Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/22 14:14:40 UTC

[GitHub] [flink] 1996fanrui opened a new pull request, #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

1996fanrui opened a new pull request, #21131:
URL: https://github.com/apache/flink/pull/21131

   ## What is the purpose of the change
   
    Simplify the `ChannelStateWriteRequestDispatcherImpl`, because concurrent unaligned checkpoints are not supported.
   
   ## Brief change log
   
   The `Map<Long, ChannelStateCheckpointWriter> writers;` can be simplified to `long ongoingCheckpointId` and `ChannelStateCheckpointWriter writer`.
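
   A minimal sketch of this simplification, using hypothetical stand-in classes (`SketchWriter`, `SingleWriterState`) rather than the real Flink types. The initial value `-1` and the method names are assumptions made for illustration:

   ```java
   // Hypothetical stand-in for ChannelStateCheckpointWriter.
   final class SketchWriter {
       final long checkpointId;

       SketchWriter(long checkpointId) {
           this.checkpointId = checkpointId;
       }
   }

   // Replaces a Map<Long, SketchWriter>: only one checkpoint can be ongoing.
   final class SingleWriterState {
       long ongoingCheckpointId = -1; // assumed initial value
       SketchWriter writer;

       void start(long checkpointId) {
           ongoingCheckpointId = checkpointId;
           writer = new SketchWriter(checkpointId);
       }

       // Requests for any other checkpoint id see no writer.
       SketchWriter lookup(long checkpointId) {
           return checkpointId == ongoingCheckpointId ? writer : null;
       }
   }
   ```

   With a single field there is nothing to look up by key: a request either matches `ongoingCheckpointId` or gets no writer.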
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *ChannelStateWriteRequestDispatcherImplTest.testConcurrentUnalignedCheckpoint()*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented?  not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012016085


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   Hmmm, that's a bit confusing. Indeed, it looks like this might be working. I missed the importance of this `writer.isDone()` check. It's a bit strange that, to detect whether the writer is actually valid to use, we sometimes check internal fields like `writer != null` and `ongoingCheckpointId == request.getCheckpointId()`, but other times the decision is made by checking the state of another class, via `writer.isDone()`.
   
   > BTW, writer cannot be set to null during abort checkpoint. There may be some writeInput/writeOutput after abort. If set to null, req.onWriterMissing(); will throw exception.
   
   Wouldn't it be more explicit, if we:
   1. kept here only the `writer != null` check
   2. Maybe in `ChannelStateWriteRequest#abort` replace `CheckpointInProgressRequest` (this is confusing on its own) with a dedicated `CheckpointAbortRequest`
   3. When handling/dispatching `CheckpointAbortRequest` here, we would set the `this.writer = null` and set something like `this.abortedCheckpointId = this.ongoingCheckpointId` 
   4. when trying to access the writer, you could then check
   ```
   ChannelStateCheckpointWriter writer =
                       ongoingCheckpointId == request.getCheckpointId() && abortedCheckpointId != request.getCheckpointId() ? this.writer : null;
   ```
   ?
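
   The four steps above could be sketched roughly as follows. This is a simplified, hypothetical class (`AbortAwareDispatcherSketch`), not the Flink dispatcher: the writer is a plain `Object`, the initial values are `-1`, and the rule that only an abort of the ongoing checkpoint clears the writer is an assumption:

   ```java
   // Hypothetical, simplified dispatcher state; not the Flink class.
   final class AbortAwareDispatcherSketch {
       private long ongoingCheckpointId = -1; // assumed initial value
       private long abortedCheckpointId = -1; // assumed initial value
       private Object writer; // stands in for ChannelStateCheckpointWriter

       void onStart(long checkpointId) {
           // Step 1: only the null check decides whether a writer is still active.
           if (writer != null) {
               throw new IllegalStateException(
                       "uncompleted writer for checkpoint " + ongoingCheckpointId);
           }
           ongoingCheckpointId = checkpointId;
           writer = new Object();
       }

       // Steps 2 and 3: a dedicated abort request clears the writer and records the id.
       void onAbort(long checkpointId) {
           if (checkpointId == ongoingCheckpointId) { // assumption: other ids are ignored
               writer = null;
               abortedCheckpointId = ongoingCheckpointId;
           }
       }

       // Step 4: the writer is only visible for the ongoing, non-aborted checkpoint.
       Object writerFor(long checkpointId) {
           return ongoingCheckpointId == checkpointId && abortedCheckpointId != checkpointId
                   ? writer
                   : null;
       }
   }
   ```

   In this sketch, `onStart(42)`, `onAbort(42)`, `onStart(43)` runs without an exception, because the abort already reset `writer` to `null`.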



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,26 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> {
+                    checkState(
+                            request.getCheckpointId() == ongoingCheckpointId,
+                            "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.",
+                            ongoingCheckpointId,
+                            request.getCheckpointId());
+                    this.writer = null;
+                });

Review Comment:
   > I don't think so. Actually, all abort, start, writeInput, finishInput, writeOutput or finishOutput are executed in the Channel state writer Executor Thread. It is a single thread, so the race condition does not exist.
   
   I was thinking about something else, but I see now that I was wrong anyway. The abort call would not be executed, because the writer returned from
   ```
   ChannelStateCheckpointWriter writer =
                       ongoingCheckpointId == request.getCheckpointId() ? this.writer : null;
   ```
   would be `null`, and the `abort` request itself has `ignoreMissingWriter` set to `true`.
   
   If checkpoint 42 is completed because a `CheckpointBarrier` was received, `ChannelStateWriter#finishInput` would work fine and would successfully pass this `checkState`.
   
   Long story short: in the newly proposed version, as long as `ongoingCheckpointId` is set correctly, all of the writes, input/output completions and aborts will be handled correctly, either ignored if they do not refer to the ongoing checkpoint, or actually processed?
   
   > Map<Long, ChannelStateCheckpointWriter> writers; cannot be cleaned up until the end of the Task.
   
   👍 Yeah, I've noticed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   > I have added the ChannelStateWriterImplTest.testAbortOldAndStartNewCheckpoint(). It is:
   
   I'm not sure if that's the correct level to test it. Because it's actually important how `SubtaskCheckpointCoordinatorImpl` is using the `ChannelStateWriter`. Can we implement this test using `SubtaskCheckpointCoordinatorImpl` (in `SubtaskCheckpointCoordinatorTest`)?





[GitHub] [flink] 1996fanrui commented on pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1288068334

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012524837


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   Hi @pnowojski , thanks for your suggestions.
   
   I updated the PR: I added `CheckpointAbortRequest` and `maxAbortedCheckpointId`. I didn't use `abortedCheckpointId`, because the abort request may arrive before the corresponding checkpoint has started.





[GitHub] [flink] 1996fanrui commented on pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1295684218

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1009238931


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,26 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> {
+                    checkState(
+                            request.getCheckpointId() == ongoingCheckpointId,
+                            "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.",
+                            ongoingCheckpointId,
+                            request.getCheckpointId());
+                    this.writer = null;
+                });

Review Comment:
   Hi @pnowojski 
   
   As I mentioned in the last comment, `writer` cannot be set to `null` while aborting a checkpoint.
   
   > If the 2. happens after 1., this checkState would fail, right?
   
   I don't think so. All of `abort`, `start`, `writeInput`, `finishInput`, `writeOutput` and `finishOutput` are executed in the `Channel state writer Executor` thread. It is a single thread, so the race condition does not exist.
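
   To illustrate the ordering argument, here is a small sketch that uses a plain `java.util.concurrent` single-thread executor as a stand-in. The class `SingleThreadOrderingSketch` and the request strings are hypothetical, and the real channel state writer executor may be implemented differently; the sketch only shows that tasks handed to one worker thread run one at a time, in submission order:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   final class SingleThreadOrderingSketch {
       // Handles each request on one worker thread and returns the handling order.
       static List<String> handleInOrder(String... requests) {
           // Only the worker thread touches this list until the executor has terminated.
           List<String> handled = new ArrayList<>();
           ExecutorService executor = Executors.newSingleThreadExecutor();
           for (String request : requests) {
               executor.execute(() -> handled.add(request));
           }
           executor.shutdown(); // already submitted tasks still run
           try {
               if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                   throw new IllegalStateException("executor did not terminate in time");
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return handled;
       }
   }
   ```

   Calling `handleInOrder("start(42)", "finishInput(42)", "abort(43)")` returns the requests in exactly that order, so no two handlers can interleave.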
   
   Please correct me if anything is wrong.





[GitHub] [flink] pnowojski commented on pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1301962892

   Maybe we should rename the ticket/pr/commit. This doesn't simplify the class anymore 😓 Maybe 
   > Do not support concurrent unaligned checkpoints in the ChannelStateWriteRequestDispatcherImpl
   
   ?




[GitHub] [flink] flinkbot commented on pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1287808423

   ## CI report:
   
   * 347b50aa5e162df1e244a4ecd4a9a35be7078879 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] 1996fanrui commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1009243043


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,26 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> {
+                    checkState(
+                            request.getCheckpointId() == ongoingCheckpointId,
+                            "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.",
+                            ongoingCheckpointId,
+                            request.getCheckpointId());
+                    this.writer = null;
+                });

Review Comment:
   BTW, according to our analysis, the code on the master branch has a memory leak: when a checkpoint fails, `Map<Long, ChannelStateCheckpointWriter> writers;` cannot be cleaned up until the end of the Task.
   
   This PR also fixes that.
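
   A rough sketch of how such a leak can arise. It assumes that the map entry is removed only by the writer's completion callback (as the `() -> writers.remove(request.getCheckpointId())` lambda in the diff suggests) and that the callback is not invoked when the writer fails. `MapLeakSketch` and its nested `Writer` are hypothetical names, not Flink classes:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class MapLeakSketch {
       // Hypothetical stand-in for ChannelStateCheckpointWriter.
       static final class Writer {
           private final Runnable onComplete;

           Writer(Runnable onComplete) {
               this.onComplete = onComplete;
           }

           void complete() {
               onComplete.run(); // removes the map entry
           }

           void fail() {
               // Assumption: the completion callback is not invoked on failure.
           }
       }

       final Map<Long, Writer> writers = new HashMap<>();

       Writer start(long checkpointId) {
           Writer writer = new Writer(() -> writers.remove(checkpointId));
           writers.put(checkpointId, writer);
           return writer;
       }
   }
   ```

   Under these assumptions every failed checkpoint leaves one entry behind, whereas a single `writer` field is simply overwritten by the next start request.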





[GitHub] [flink] pnowojski commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1009147524


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   I think this might fail currently. I don't see `writer` being set to `null` if its checkpoint is aborted. Thus, I think the following sequence will trigger this `IllegalStateException`.
   
   1. start checkpoint 42
   2. abort checkpoint 42
   3. start checkpoint 43
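
   Whether this sequence actually reaches the `writer.fail(...)` branch depends on whether aborting a checkpoint leaves its writer in a done state. A minimal sketch for experimenting with that, assuming that an abort fails the ongoing writer and that a failed writer reports `isDone() == true`. `AbortThenStartSketch`, its nested `Writer`, and the `failedUncompletedWriter` flag are hypothetical:

   ```java
   final class AbortThenStartSketch {
       // Hypothetical stand-in for ChannelStateCheckpointWriter.
       static final class Writer {
           private boolean done;

           void fail() {
               done = true;
           }

           boolean isDone() {
               return done;
           }
       }

       private long ongoingCheckpointId = -1; // assumed initial value
       private Writer writer;
       boolean failedUncompletedWriter; // set when start() hits the "uncompleted writer" branch

       void start(long checkpointId) {
           if (checkpointId <= ongoingCheckpointId) {
               throw new IllegalStateException("Checkpoint must be incremented");
           }
           if (writer != null && !writer.isDone()) {
               writer.fail();
               failedUncompletedWriter = true;
           }
           ongoingCheckpointId = checkpointId;
           writer = new Writer();
       }

       // Assumption: aborting the ongoing checkpoint fails its writer, which marks it done.
       void abort(long checkpointId) {
           if (checkpointId == ongoingCheckpointId && writer != null) {
               writer.fail();
           }
       }
   }
   ```

   In this sketch, start 42, abort 42, start 43 leaves `failedUncompletedWriter` as `false`, while start 42 followed directly by start 43 sets it to `true`.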



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,26 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> {
+                    checkState(
+                            request.getCheckpointId() == ongoingCheckpointId,
+                            "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.",
+                            ongoingCheckpointId,
+                            request.getCheckpointId());
+                    this.writer = null;
+                });

Review Comment:
   I think this `checkState` might be working now only because we do not fail/reset the writer if a checkpoint is aborted. Otherwise, this would fail when there is a race condition between
   1. receiving last of the checkpoint barriers from the input `ChannelStateWriter#finishInput` for a checkpoint `42` in `CheckpointBarrierHandler`
   2. aborting checkpoint `43` for example.
   
   
   Maybe the best equivalent of the old code would be something like:
   ```
   if (request.getCheckpointId() < ongoingCheckpointId) {
     // ignore obsolete request, do nothing
   } else if (request.getCheckpointId() == ongoingCheckpointId) {
     this.writer = null;
   } else { 
     throw new IllegalStateException("This should never happen, trying to complete checkpoint[%s] while the ongoingCheckpointId[%s] is older", ...);
   }
   ```
   
   Since the code on the master also doesn't check if `writers.remove(42)` removed anything or not?





[GitHub] [flink] pnowojski commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012587547


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   I was suggesting `abortedCheckpointId` (or maybe `boolean isAborted`?) instead of `maxAbortedCheckpointId` to simplify the code a bit. `SubtaskCheckpointCoordinator` should already take care of aborted checkpoints in advance. For example if the current checkpoint is `42`, and that's the checkpoint for which `ChannelStateWriter` is persisting data and `notifyCheckpointAborted(44)` arrives, `SubtaskCheckpointCoordinator` will ensure that `ChannelStateWriter#start(43)` and `ChannelStateWriter#start(44)` will never be called. So there is no need to duplicate this logic in `ChannelStateWriter`.
   
   But that's probably a small difference for the simplicity of the code. So I would be fine either way.





[GitHub] [flink] 1996fanrui commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012728556


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   If checkpoint 42 is completed, then `ongoingCheckpointId=42` and `writer == null`.
   
   - `start(43)` has been added to the queue, but has not been executed yet.
   - `abort(44)` is added to both the head and the tail of the queue.
   
   When the dispatcher receives `abort(44)`, is `abortedCheckpointId` 42? Or will `isAborted` be `true`?
   If so, I prefer to use `maxAbortedCheckpointId=44`, so that we can discard `start(43)` directly.
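
   The scenario can be sketched as follows. `MaxAbortedSketch` is a hypothetical, simplified class, and the rule "discard every start request whose id is not greater than `maxAbortedCheckpointId`" is an assumption about how the field would be used, not the code in the PR:

   ```java
   final class MaxAbortedSketch {
       private long ongoingCheckpointId = -1; // assumed initial value
       private long maxAbortedCheckpointId = -1; // assumed initial value
       private boolean writerActive;

       // Returns true if a writer was created, false if the start request was discarded.
       boolean start(long checkpointId) {
           if (checkpointId <= maxAbortedCheckpointId) {
               return false; // a newer or equal checkpoint was already aborted
           }
           ongoingCheckpointId = checkpointId;
           writerActive = true;
           return true;
       }

       void complete(long checkpointId) {
           if (checkpointId == ongoingCheckpointId) {
               writerActive = false;
           }
       }

       // The abort may arrive before the start request of the same or an older checkpoint.
       void abort(long checkpointId) {
           maxAbortedCheckpointId = Math.max(maxAbortedCheckpointId, checkpointId);
           if (ongoingCheckpointId <= maxAbortedCheckpointId) {
               writerActive = false;
           }
       }

       boolean hasActiveWriter() {
           return writerActive;
       }
   }
   ```

   Here `abort(44)` is remembered even though neither 43 nor 44 has started, so a later `start(43)` or `start(44)` returns `false` while `start(45)` proceeds.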
   
   
   Also, `maxAbortedCheckpointId` is friendlier to channel state file merging. In the future, multiple subtasks will share the same `ChannelStateWriteRequestDispatcherImpl`, and the order of requests is not guaranteed across subtasks. If one subtask's checkpoint 44 is aborted, the other subtasks should also be aborted here, but other subtasks may still call `start(42)`.
   
   What do you think?





[GitHub] [flink] 1996fanrui commented on pull request #21131: [FLINK-29730][checkpoint] Do not support concurrent unaligned checkpoints in the ChannelStateWriteRequestDispatcherImpl

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1302018254

   > Maybe we should rename the ticket/pr/commit. This doesn't simplify the class anymore 😓 Maybe
   > 
   > > Do not support concurrent unaligned checkpoints in the ChannelStateWriteRequestDispatcherImpl
   > 
   > ?
   
   done




[GitHub] [flink] 1996fanrui commented on pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1295151769

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on pull request #21131: [FLINK-29730][checkpoint] Do not support concurrent unaligned checkpoints in the ChannelStateWriteRequestDispatcherImpl

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1302384607

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on pull request #21131: [FLINK-29730][checkpoint] Do not support concurrent unaligned checkpoints in the ChannelStateWriteRequestDispatcherImpl

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1302323850

   @flinkbot run azure




[GitHub] [flink] pnowojski commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1008052894


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,19 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> this.writer = null);

Review Comment:
   Shouldn't you check here whether this is the correct writer? Could it happen that `this.writer` was reset for a newer checkpoint, but this callback runs later and resets the field back to `null`? If this cannot happen, it would be nice to at least have a `checkState` to make sure it is not happening.





[GitHub] [flink] pnowojski commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1009156965


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,26 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> {
+                    checkState(
+                            request.getCheckpointId() == ongoingCheckpointId,
+                            "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.",
+                            ongoingCheckpointId,
+                            request.getCheckpointId());
+                    this.writer = null;
+                });

Review Comment:
   I think this `checkState` may be working now only because we do not fail/reset the writer when a checkpoint is aborted. Otherwise it would fail when there is a race condition between:
   1. receiving the last of the checkpoint barriers from the input (`ChannelStateWriter#finishInput`) for checkpoint `42` in `CheckpointBarrierHandler`
   2. aborting checkpoint `43`, for example.
   
   If 2. happens after 1., this `checkState` would fail, right?
   
   Maybe the best equivalent of the old code would be something like:
   ```
   if (request.getCheckpointId() < ongoingCheckpointId) {
     // ignore obsolete request, do nothing
   } else if (request.getCheckpointId() == ongoingCheckpointId) {
     this.writer = null;
   } else { 
     throw new IllegalStateException(String.format("This should never happen: trying to complete checkpoint[%s] while the ongoingCheckpointId[%s] is older", request.getCheckpointId(), ongoingCheckpointId));
   }
   ```
   
   After all, the code on master also doesn't check whether `writers.remove(42)` removed anything.
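   
   A self-contained version of that three-way comparison might look like the sketch below. The class `WriterCleanupSketch`, the `Object` writer placeholder, and the method names are assumptions made for illustration; only the comparison logic mirrors the snippet above.
   
   ```java
   /** Hypothetical, simplified stand-in for the dispatcher; not the Flink class. */
   final class WriterCleanupSketch {
       private long ongoingCheckpointId = -1;
       // Placeholder for the ChannelStateCheckpointWriter field.
       private Object writer;

       /** Starts a checkpoint and installs a fresh writer for it. */
       void start(long checkpointId) {
           ongoingCheckpointId = checkpointId;
           writer = new Object();
       }

       /** Completion callback of the writer that was built for checkpointId. */
       void onWriterComplete(long checkpointId) {
           if (checkpointId < ongoingCheckpointId) {
               // Obsolete callback: a newer checkpoint owns the field, leave it alone.
           } else if (checkpointId == ongoingCheckpointId) {
               writer = null;
           } else {
               throw new IllegalStateException(
                       String.format(
                               "Completing checkpoint[%s] while ongoingCheckpointId[%s] is older",
                               checkpointId, ongoingCheckpointId));
           }
       }

       boolean hasWriter() {
           return writer != null;
       }
   }
   ```
   
   In this model a late callback for checkpoint `42` that arrives after `start(43)` is ignored, so the writer of `43` is not cleared by mistake.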





[GitHub] [flink] 1996fanrui commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1008188410


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,19 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> this.writer = null);

Review Comment:
   Thanks @pnowojski for the review.
   
   Good catch, I have added the `checkState` here. After thinking about it, I don't think this can happen.
   
   The call stack of this callback: `ChannelStateWriteRequestDispatcherImpl#dispatchInternal` -> `ChannelStateCheckpointWriter(writer)#completeInput/completeOutput` -> `complete` -> `doComplete.run()` -> `callback` -> `ChannelStateWriteRequestDispatcherImpl.writer = null`
   
   First of all, this code runs in a single thread. `writer#completeInput/completeOutput` will clean up the writer itself.
   
   If there is a new writer in `ChannelStateWriteRequestDispatcherImpl`, the old writer's callback will not be called. In other words: only the current writer will call the callback and clean itself up.
   
   Please correct me if anything is wrong, thanks~





[GitHub] [flink] 1996fanrui commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1009276172


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   Hi @pnowojski , thanks for your review.
   
   I have added `ChannelStateWriterImplTest.testAbortOldAndStartNewCheckpoint()`. It does the following:
   
   1. start checkpoint 42
   2. abort checkpoint 42
   3. start checkpoint 43
   
   It then checks that result42 fails with the abort exception rather than with the `new IllegalStateException()`.





[GitHub] [flink] pnowojski merged pull request #21131: [FLINK-29730][checkpoint] Do not support concurrent unaligned checkpoints in the ChannelStateWriteRequestDispatcherImpl

Posted by GitBox <gi...@apache.org>.
pnowojski merged PR #21131:
URL: https://github.com/apache/flink/pull/21131




[GitHub] [flink] 1996fanrui commented on pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1288125009

   Hi @pnowojski , please take a look at this PR when you have time.
   
   This PR simplifies the `ChannelStateWriteRequestDispatcherImpl`, and it is useful for FLINK-29730.
   
   After your proposal in #20151 , `ChannelStateWriteRequestDispatcherImpl` is responsible for writing the channel state file for multiple subtasks. If the handling of concurrent checkpoints isn't simplified first, that code will be complex.




[GitHub] [flink] 1996fanrui commented on pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1297323237

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1009230664


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   After step 2 (`abort checkpoint 42`), `writer.isDone()` will be true, so no exception is thrown here during step 3.
   
   BTW, `writer` cannot be set to `null` when a checkpoint is aborted. There may still be some `writeInput/writeOutput` requests after the abort; if `writer` were `null`, `req.onWriterMissing();` would throw an exception.
   
   `writer` is only set to `null` when the checkpoint is completed or when `ChannelStateWriteRequestDispatcherImpl` fails.
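   
   The behavior described above can be modeled with the rough sketch below. All names (`AbortKeepsWriterSketch`, `Writer`, `write`) are hypothetical, and silently dropping a late write is an assumption of this model, not a statement about what the real `ChannelStateCheckpointWriter` does. It only illustrates why keeping the aborted writer makes step 3 safe and keeps late writes from hitting a missing writer.
   
   ```java
   /** Hypothetical, simplified model of the dispatcher; not the Flink class. */
   final class AbortKeepsWriterSketch {
       /** Hypothetical stand-in for ChannelStateCheckpointWriter. */
       static final class Writer {
           private Throwable failure;

           boolean isDone() {
               return failure != null;
           }

           /** Only the first failure is kept; later ones are ignored. */
           void fail(Throwable cause) {
               if (failure == null) {
                   failure = cause;
               }
           }

           Throwable failure() {
               return failure;
           }
       }

       private long ongoingCheckpointId = -1;
       private Writer writer;

       Writer start(long checkpointId) {
           if (checkpointId <= ongoingCheckpointId) {
               throw new IllegalStateException("Checkpoint must be incremented");
           }
           if (writer != null && !writer.isDone()) {
               writer.fail(new IllegalStateException("uncompleted writer of " + ongoingCheckpointId));
           }
           ongoingCheckpointId = checkpointId;
           writer = new Writer();
           return writer;
       }

       /** Fails the writer but deliberately keeps the reference. */
       void abort(long checkpointId, Throwable cause) {
           if (writer != null && checkpointId == ongoingCheckpointId) {
               writer.fail(cause);
           }
       }

       /** Returns false if the data is dropped; throws if no writer exists. */
       boolean write() {
           if (writer == null) {
               throw new IllegalStateException("writer missing");
           }
           return !writer.isDone();
       }
   }
   ```
   
   In this model, the writer of checkpoint 42 ends up holding the abort cause after steps 1 to 3, and a write that arrives between the abort and the next start is dropped instead of throwing.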





[GitHub] [flink] pnowojski commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012777571


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   I was thinking about a contract under which `ChannelStateWriter` is allowed to ignore any request that is not addressed to the `ongoingCheckpointId`. So in your scenario `abort(44)` could be safely ignored. Once `SubtaskCheckpointCoordinator` decides to abort/complete checkpoint `43` (for example due to a `notifyCheckpointAborted(43)` RPC, checkpoint `43` completing, or checkpoint `45` starting and superseding `43`), it would take care of calling `ChannelStateWriter#abort(43)`.
   
   > Also, maxAbortedCheckpointId is more friendly to channel state file merging.
   
   Ok, that might be a good argument to keep it as it is.
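   
   For comparison, the alternative contract mentioned at the top of this comment could be sketched as below. This is a hypothetical model (`IgnoreForeignRequestsSketch` is not a Flink class) of a writer that ignores any abort not addressed to the `ongoingCheckpointId`, leaving it to the caller to abort the ongoing checkpoint explicitly.
   
   ```java
   /** Hypothetical model of the "ignore foreign requests" contract; not the Flink class. */
   final class IgnoreForeignRequestsSketch {
       private long ongoingCheckpointId = -1;
       private boolean aborted;

       void start(long checkpointId) {
           ongoingCheckpointId = checkpointId;
           aborted = false;
       }

       /** Returns true if the abort was applied, false if it was ignored. */
       boolean abort(long checkpointId) {
           if (checkpointId != ongoingCheckpointId) {
               // Not addressed to the ongoing checkpoint: safe to ignore under this contract.
               return false;
           }
           aborted = true;
           return true;
       }

       boolean isAborted() {
           return aborted;
       }
   }
   ```
   
   Under this contract, `abort(44)` while `43` is ongoing changes nothing, and only an explicit `abort(43)` marks the ongoing checkpoint as aborted.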





[GitHub] [flink] pnowojski commented on pull request #21131: [FLINK-29730][checkpoint] Do not support concurrent unaligned checkpoints in the ChannelStateWriteRequestDispatcherImpl

Posted by GitBox <gi...@apache.org>.
pnowojski commented on PR #21131:
URL: https://github.com/apache/flink/pull/21131#issuecomment-1303100251

   Thanks, LGTM

