Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/03 06:00:05 UTC

[GitHub] [flink] 1996fanrui commented on a diff in pull request #21131: [FLINK-29730][checkpoint] Simplify the ChannelStateWriteRequestDispatcherImpl due to not supported concurrent unaligned checkpoint

1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012524837


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   Hi @pnowojski, thanks for your suggestions.
   
   I've updated the PR: I added `CheckpointAbortRequest` and `maxAbortedCheckpointId`. I didn't use `abortedCheckpointId`, because the abort request may be received before the corresponding checkpoint start request.
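   The following is a minimal single-threaded sketch of the ordering problem. It is not Flink's implementation. It assumes, as the diff above does, that checkpoint IDs strictly increase and that at most one writer exists at a time. It also assumes that any start request whose ID is at most `maxAbortedCheckpointId` belongs to a checkpoint that has already been aborted. `SingleWriterDispatcherSketch`, `Writer`, `onStart`, and `onAbort` are hypothetical names. Only `ongoingCheckpointId`, `maxAbortedCheckpointId`, the monotonic-ID check, and the "fail the uncompleted writer" behavior come from this thread.

```java
/**
 * Hypothetical sketch (not Flink's ChannelStateWriteRequestDispatcherImpl): a dispatcher that
 * keeps a single writer and remembers the highest aborted checkpoint ID, so that an abort
 * arriving before its start request is still honored.
 */
public class SingleWriterDispatcherSketch {

    /** Hypothetical stand-in for a channel-state writer: running until completed or failed. */
    static final class Writer {
        final long checkpointId;
        private boolean done;
        private Throwable failure;

        Writer(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        boolean isDone() {
            return done;
        }

        void complete() {
            done = true;
        }

        void fail(Throwable cause) {
            failure = cause;
            done = true;
        }

        Throwable getFailure() {
            return failure;
        }
    }

    private long ongoingCheckpointId = -1L;
    private long maxAbortedCheckpointId = -1L;
    // At most one writer, because concurrent unaligned checkpoints are not supported.
    private Writer writer;

    /** Returns the writer for the new checkpoint, or null if it was aborted before it started. */
    Writer onStart(long checkpointId) {
        if (checkpointId <= ongoingCheckpointId) {
            throw new IllegalStateException(
                    String.format(
                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
                            ongoingCheckpointId, checkpointId));
        }
        if (writer != null && !writer.isDone()) {
            // A newer checkpoint started while the old writer is unfinished: fail the old one.
            writer.fail(
                    new IllegalStateException(
                            "Uncompleted writer of checkpoint " + writer.checkpointId
                                    + " superseded by checkpoint " + checkpointId));
        }
        ongoingCheckpointId = checkpointId;
        if (checkpointId <= maxAbortedCheckpointId) {
            // The abort request was received in advance, so no writer is created.
            writer = null;
            return null;
        }
        writer = new Writer(checkpointId);
        return writer;
    }

    /** Records the abort, whether it arrives before or after the matching start request. */
    void onAbort(long checkpointId) {
        maxAbortedCheckpointId = Math.max(maxAbortedCheckpointId, checkpointId);
        if (writer != null && writer.checkpointId == checkpointId && !writer.isDone()) {
            writer.fail(new IllegalStateException("Checkpoint " + checkpointId + " was aborted"));
        }
    }

    public static void main(String[] args) {
        SingleWriterDispatcherSketch d = new SingleWriterDispatcherSketch();

        // Abort received in advance: the later start request must not create a writer.
        d.onAbort(1L);
        System.out.println("start 1 after early abort -> writer? " + (d.onStart(1L) != null));

        // Starting checkpoint 3 while writer 2 is still running fails writer 2.
        Writer w2 = d.onStart(2L);
        Writer w3 = d.onStart(3L);
        System.out.println("writer 2 failed? " + (w2.getFailure() != null));

        // An abort that arrives after the start fails the running writer.
        d.onAbort(3L);
        System.out.println("writer 3 failed? " + (w3.getFailure() != null));
    }
}
```

   Running `main` prints `false`, `true`, `true` for the three checks. A field that only matched the ongoing checkpoint would miss the first case, because at that point no writer exists yet. Remembering the highest aborted ID lets the later start request detect the abort.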



-- 
This is an automated message from the Apache Git Service.