Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/22 04:38:49 UTC

[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

yunfengzhou-hub commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r926407667


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -245,7 +262,13 @@ public void notifyCheckpointAborted(long checkpointId) {
         // checkpoint coordinator time thread.
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
-        mainThreadExecutor.execute(() -> coordinator.notifyCheckpointAborted(checkpointId));
+        mainThreadExecutor.execute(
+                () -> {
+                    subtaskGatewayMap
+                            .values()
+                            .forEach(x -> x.openGatewayAndUnmarkCheckpoint(checkpointId));
+                    coordinator.notifyCheckpointAborted(checkpointId);

Review Comment:
   Since we are removing `afterSourceBarrierInjection()`, and with it the call to `openGatewayAndUnmarkCheckpoint()` that it contained, we need to invoke `openGatewayAndUnmarkCheckpoint()` from the methods that may run afterward, including `handleEventFromOperator` and `notifyCheckpointAborted`.
   
   During our offline discussion, we questioned whether invoking this method in `abortCurrentTriggering` alone would be enough. I found that `notifyCheckpointAborted` and `abortCurrentTriggering` may be invoked independently of each other, so it is still necessary to invoke this method in `notifyCheckpointAborted` as well. I have added test cases to `CoordinatorEventsExactlyOnceTest` to verify this situation.
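
To illustrate the reasoning above, here is a minimal sketch using a toy model, not Flink's actual classes. `ToyGateway`, `ToyHolder`, `markForCheckpoint`, `closeGateways`, and the `checkpointId` parameter on `abortCurrentTriggering` are hypothetical names and signatures introduced only for this example; only the method names `openGatewayAndUnmarkCheckpoint`, `notifyCheckpointAborted`, `abortCurrentTriggering`, and the field name `subtaskGatewayMap` mirror the PR. The sketch assumes that reopening a gateway is idempotent, so calling it from both abort paths is harmless.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the gateway reopening logic; not Flink code. */
public class GatewayReopenSketch {

    /** Toy stand-in for a subtask gateway that buffers events while closed. */
    static final class ToyGateway {
        private static final long NONE = -1L;
        private long markedCheckpointId = NONE;
        private final List<String> buffered = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();

        /** Assumed helper: closes the gateway for the given checkpoint. */
        void markForCheckpoint(long checkpointId) {
            markedCheckpointId = checkpointId;
        }

        void sendEvent(String event) {
            if (isClosed()) {
                buffered.add(event); // held back until the gateway reopens
            } else {
                delivered.add(event);
            }
        }

        /** No-op unless the gateway is currently marked for this checkpoint. */
        void openGatewayAndUnmarkCheckpoint(long checkpointId) {
            if (markedCheckpointId != checkpointId) {
                return;
            }
            markedCheckpointId = NONE;
            delivered.addAll(buffered);
            buffered.clear();
        }

        boolean isClosed() {
            return markedCheckpointId != NONE;
        }
    }

    /** Toy stand-in for the coordinator holder with two independent abort paths. */
    static final class ToyHolder {
        final Map<Integer, ToyGateway> subtaskGatewayMap = new LinkedHashMap<>();
        final List<Long> abortNotifications = new ArrayList<>();

        ToyHolder(int parallelism) {
            for (int i = 0; i < parallelism; i++) {
                subtaskGatewayMap.put(i, new ToyGateway());
            }
        }

        void closeGateways(long checkpointId) {
            subtaskGatewayMap.values().forEach(g -> g.markForCheckpoint(checkpointId));
        }

        // Assumption: takes a checkpoint id here purely to keep the toy model small.
        void abortCurrentTriggering(long checkpointId) {
            subtaskGatewayMap.values().forEach(g -> g.openGatewayAndUnmarkCheckpoint(checkpointId));
        }

        void notifyCheckpointAborted(long checkpointId) {
            // Reopen first, because abortCurrentTriggering may never have been called.
            subtaskGatewayMap.values().forEach(g -> g.openGatewayAndUnmarkCheckpoint(checkpointId));
            abortNotifications.add(checkpointId); // stands in for notifying the coordinator
        }
    }

    public static void main(String[] args) {
        ToyHolder holder = new ToyHolder(2);
        holder.closeGateways(1L);
        holder.subtaskGatewayMap.get(0).sendEvent("event-a");

        // Only notifyCheckpointAborted runs; the buffered event is still released.
        holder.notifyCheckpointAborted(1L);
        System.out.println(holder.subtaskGatewayMap.get(0).delivered); // prints [event-a]

        // A later abortCurrentTriggering for the same checkpoint changes nothing.
        holder.abortCurrentTriggering(1L);
        System.out.println(holder.subtaskGatewayMap.get(0).isClosed()); // prints false
    }
}
```

In this model, dropping the reopen call from `notifyCheckpointAborted` would leave `event-a` buffered indefinitely whenever that path runs without `abortCurrentTriggering`, which is the situation the review comment describes.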


