You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/09 14:44:18 UTC

[GitHub] [flink] 1996fanrui opened a new pull request, #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

1996fanrui opened a new pull request, #20233:
URL: https://github.com/apache/flink/pull/20233

   ## What is the purpose of the change
   
   Fix the bug ChannelStateWriteResult might not fail after checkpoint abort.
   
   ## Brief change log
   
   Call the ChannelStateWriter.abort(checkpointId, exception, true) when checkpointId is aborted within checkpointState.  
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *SubtaskCheckpointCoordinatorTest#testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1269885153

   Thanks @1996fanrui for the update LGTM % last final minor issue that I think came back after reverting to the earlier version:
   https://github.com/apache/flink/pull/20233#discussion_r988921400


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1183819033

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1269936458

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r920121363


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -316,6 +316,10 @@ public void checkpointState(
             // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
             // checkpoint barrier align.
             operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
+            channelStateWriter.abort(
+                    metadata.getCheckpointId(),
+                    new CancellationException("checkpoint aborted via notification"),
+                    true);

Review Comment:
   Hi @pnowojski , thanks for your review. 
   
   I don't know why don't execute it here. 
   
   `checkAndClearAbortedStatus` is called twice:
   - The first is `if (lastCheckpointId >= metadata.getCheckpointId())`, and execute abort and checkAndClearAbortedStatus there.
   - The second is here, I guess we should abort here, because `checkAndClearAbortedStatus(metadata.getCheckpointId()) == true` means the checkpointId is aborted, we can call `channelStateWriter.abort` too.
   
   And I think the new unit test can reproduce this leak, that is, ChannelStateWriteResult might not fail after checkpoint abort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r923185089


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)

Review Comment:
   Hi @pnowojski 
   
   From the comment, we can get that the `ConcurrentMap<Long, ChannelStateWriteResult> results;` saves ongoingCheckpoints and abortedCheckpoints. 
   
   But the checkpoints that were aborted before start were not saved. We want aborted checkpoints to ignore start. So I introduced the ` NavigableSet<Long> abortedCheckpointIds;` to save all aborted checkpointIds. 
   
   - If checkpointId is aborted before start, it will be saved into this set. 
   - When the start is called, if checkpointId in this set, the start should be ignored.
   
   And introduced the `long ongoingCheckpointId;` to save the ongoing checkpointId. Because the concurrent unaligned checkpoint isn't supported, so the `long` is enough. And checkpoints with checkpointId less than `ongoingCheckpointId` should be aborted. So I remove these checkpointIds when the ongoingCheckpointId is updated. 
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r923351868


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)

Review Comment:
   Hi @pnowojski , thanks for your review.
   
   The `checkpointId < nextExpectedCheckpointId` may happen. For example, `AlternatingWaitingForFirstBarrierUnaligned#barrierReceived` don't check whether checkpointId is aborted, if received a barrier, it will call the ChannelStateWriter.start().
   
   - If `checkpointId < ongoingCheckpointId`, it should throw exception.
   - If `checkpointId >= ongoingCheckpointId && checkpointId < nextExpectedCheckpointId` that is `checkpointId >= ongoingCheckpointId && abortedCheckpointIds.contains(checkpointId)`,  it should ignore the checkpoint.
   - If `checkpointId >= ongoingCheckpointId && !abortedCheckpointIds.contains(checkpointId)`, the ongoingCheckpointId should  be updated, and start this checkpoint.
   
   `checkpointId >= ongoingCheckpointId && checkpointId < nextExpectedCheckpointId` and `checkpointId < ongoingCheckpointId` are processed differently. So, we should use `ongoingCheckpointId`, and just update it within `ChannelStateWriterImpl#start`
   
   We should ignore checkpoint when the checkpointId is aborted, so we  need the `NavigableSet<Long> abortedCheckpointIds;`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1179643941

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski merged pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski merged PR #20233:
URL: https://github.com/apache/flink/pull/20233


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r988603332


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -209,6 +213,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         this.prepareInputSnapshot = prepareInputSnapshot;
         this.abortedCheckpointIds =
                 createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
+        this.maxRecordAbortedCheckpoints = maxRecordAbortedCheckpoints;

Review Comment:
   If so, I can fix it. I should fix it in this pr and using a new commit, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r930705894


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -120,15 +146,55 @@ public void testAbortIgnoresMissing() throws Exception {
         runWithSyncWorker(this::callAbort);
     }
 
+    @Test
+    public void testCheckpointAbortBeforeStart() throws Exception {
+        SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor();
+        ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, worker);
+        writer.open();
+        callAbort(writer);
+
+        // It should be started.

Review Comment:
   nit: this comment is a bit miss leading. `it should be started` -> `start after abort should be ignored`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1183481165

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r920072629


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java:
##########
@@ -568,6 +572,53 @@ public void snapshotState(
         }
     }
 
+    @Test
+    public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
+        MockEnvironment mockEnvironment = MockEnvironment.builder().build();

Review Comment:
   wrap with `try-with-resource`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -316,6 +316,10 @@ public void checkpointState(
             // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
             // checkpoint barrier align.
             operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
+            channelStateWriter.abort(
+                    metadata.getCheckpointId(),
+                    new CancellationException("checkpoint aborted via notification"),
+                    true);

Review Comment:
   I think you can not relay on this code to be executed. `checkAndClearAbortedStatus` is using best effort `abortedCheckpointIds` set, which can be for example pruned if it grows too large. So if FLINK-26803 doesn't work without this fix, we have to deal with it differently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r923351868


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)

Review Comment:
   Hi @pnowojski , thanks for your review.
   
   The `checkpointId < nextExpectedCheckpointId` may happen. For example, `AlternatingWaitingForFirstBarrierUnaligned#barrierReceived` don't check whether checkpointId is aborted, if received a barrier, it will call the ChannelStateWriter.start().
   
   So, we should use `ongoingCheckpointId`, and just update it within `ChannelStateWriterImpl#start`
   
   We should ignore checkpoint when the checkpointId is aborted, so we  need the `NavigableSet<Long> abortedCheckpointIds;`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)
 
     private final String taskName;
     private final ChannelStateWriteRequestExecutor executor;
-    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
-    private final int maxCheckpoints;
 
-    /**
-     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link
-     * #maxCheckpoints}.
-     */
-    public ChannelStateWriterImpl(
-            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
-        this(taskName, subtaskIndex, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
-    }
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private long ongoingCheckpointId;
+
+    @GuardedBy("lock")
+    private final NavigableSet<Long> abortedCheckpointIds;
+
+    // The result of ongoingCheckpointId, the checkpoint that CheckpointId is less than
+    // ongoingCheckpointId should be aborted due to concurrent unaligned checkpoint is currently not
+    // supported.
+    @GuardedBy("lock")
+    private ChannelStateWriteResult result;
 
     /**
      * Creates a {@link ChannelStateWriterImpl} with {@link ChannelStateSerializerImpl default}
      * {@link ChannelStateSerializer}, and a {@link ChannelStateWriteRequestExecutorImpl}.
      *
      * @param taskName
      * @param streamFactoryResolver a factory to obtain output stream factory for a given checkpoint
-     * @param maxCheckpoints maximum number of checkpoints to be written currently or finished but
-     *     not taken yet.
      */
-    ChannelStateWriterImpl(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStorageWorkerView streamFactoryResolver,
-            int maxCheckpoints) {
+    public ChannelStateWriterImpl(
+            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
         this(
                 taskName,
-                new ConcurrentHashMap<>(maxCheckpoints),
                 new ChannelStateWriteRequestExecutorImpl(
                         taskName,
                         new ChannelStateWriteRequestDispatcherImpl(
                                 taskName,
                                 subtaskIndex,
                                 streamFactoryResolver,
-                                new ChannelStateSerializerImpl())),
-                maxCheckpoints);
+                                new ChannelStateSerializerImpl())));
     }
 
-    ChannelStateWriterImpl(
-            String taskName,
-            ConcurrentMap<Long, ChannelStateWriteResult> results,
-            ChannelStateWriteRequestExecutor executor,
-            int maxCheckpoints) {
+    ChannelStateWriterImpl(String taskName, ChannelStateWriteRequestExecutor executor) {
         this.taskName = taskName;
-        this.results = results;
-        this.maxCheckpoints = maxCheckpoints;
         this.executor = executor;
+        this.abortedCheckpointIds = new TreeSet<>();
+        this.ongoingCheckpointId = 0;
     }
 
     @Override
     public void start(long checkpointId, CheckpointOptions checkpointOptions) {
         LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
-        ChannelStateWriteResult result = new ChannelStateWriteResult();
-        ChannelStateWriteResult put =
-                results.computeIfAbsent(
+        synchronized (lock) {
+            if (isUnavailableCheckpoint(checkpointId)) {
+                LOG.debug(
+                        "The checkpoint {} of task {} has been aborted, so don't start.",
                         checkpointId,
-                        id -> {
-                            Preconditions.checkState(
-                                    results.size() < maxCheckpoints,
-                                    String.format(
-                                            "%s can't start %d, results.size() > maxCheckpoints: %d > %d",
-                                            taskName,
-                                            checkpointId,
-                                            results.size(),
-                                            maxCheckpoints));
-                            enqueue(
-                                    new CheckpointStartRequest(
-                                            checkpointId,
-                                            result,
-                                            checkpointOptions.getTargetLocation()),
-                                    false);
-                            return result;
-                        });
-        Preconditions.checkArgument(
-                put == result,
-                taskName + " result future already present for checkpoint " + checkpointId);
+                        taskName);

Review Comment:
   It may happen. For example, `AlternatingWaitingForFirstBarrierUnaligned#barrierReceived` don't check whether checkpointId is aborted, if received a barrier, it will call the ChannelStateWriter.start().
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)
 
     private final String taskName;
     private final ChannelStateWriteRequestExecutor executor;
-    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
-    private final int maxCheckpoints;
 
-    /**
-     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link
-     * #maxCheckpoints}.
-     */
-    public ChannelStateWriterImpl(
-            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
-        this(taskName, subtaskIndex, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
-    }
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private long ongoingCheckpointId;
+
+    @GuardedBy("lock")
+    private final NavigableSet<Long> abortedCheckpointIds;
+
+    // The result of ongoingCheckpointId, the checkpoint that CheckpointId is less than
+    // ongoingCheckpointId should be aborted due to concurrent unaligned checkpoint is currently not
+    // supported.
+    @GuardedBy("lock")
+    private ChannelStateWriteResult result;

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1186551517

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r989031922


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java:
##########
@@ -569,6 +571,58 @@ public void snapshotState(
         }
     }
 
+    @Test
+    public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
+        String taskName = "test";
+        try (MockEnvironment mockEnvironment = MockEnvironment.builder().build();
+                ChannelStateWriterImpl writer =
+                        new ChannelStateWriterImpl(taskName, 0, getStreamFactoryFactory());
+                SubtaskCheckpointCoordinator coordinator =
+                        new SubtaskCheckpointCoordinatorImpl(
+                                new TestCheckpointStorageWorkerView(100),
+                                taskName,
+                                StreamTaskActionExecutor.IMMEDIATE,
+                                newDirectExecutorService(),
+                                new DummyEnvironment(),
+                                (unused1, unused2) -> {},
+                                (unused1, unused2) -> CompletableFuture.completedFuture(null),
+                                128,
+                                writer,
+                                true,
+                                (callable, duration) -> () -> {})) {
+            writer.open();
+            final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment);
+            int checkpointId = 1;
+            // Abort checkpoint 1
+            coordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
+
+            coordinator.initInputsCheckpoint(
+                    checkpointId,
+                    CheckpointOptions.unaligned(
+                            CheckpointType.CHECKPOINT,
+                            CheckpointStorageLocationReference.getDefault()));
+            ChannelStateWriter.ChannelStateWriteResult writeResult =
+                    writer.getWriteResult(checkpointId);
+            assertNotNull(writeResult);
+            assertFalse(writeResult.isDone());
+            assertFalse(writeResult.getInputChannelStateHandles().isCompletedExceptionally());
+            assertFalse(writeResult.getResultSubpartitionStateHandles().isCompletedExceptionally());
+
+            coordinator.checkpointState(
+                    new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
+                    CheckpointOptions.forCheckpointWithDefaultLocation(),
+                    new CheckpointMetricsBuilder(),
+                    operatorChain,
+                    false,
+                    () -> true);
+            assertNull(writer.getWriteResult(checkpointId));
+            TimeUnit.MILLISECONDS.sleep(10);

Review Comment:
   One more thing. Can we get rid of this sleep? It's very likely it will be failing from time to time in the azure CI.
   
   It looks like we can replace it with
   ```
   writeResult.get();
   ```
   where
   
   ```
   @VisibleForTesting
   public void ChannelStateWriter#get() {
     inputChannelStateHandles.get();
     resultSubpartitionStateHandles.get();
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r988637888


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -209,6 +213,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         this.prepareInputSnapshot = prepareInputSnapshot;
         this.abortedCheckpointIds =
                 createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
+        this.maxRecordAbortedCheckpoints = maxRecordAbortedCheckpoints;

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r989067790


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java:
##########
@@ -569,6 +571,58 @@ public void snapshotState(
         }
     }
 
+    @Test
+    public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
+        String taskName = "test";
+        try (MockEnvironment mockEnvironment = MockEnvironment.builder().build();
+                ChannelStateWriterImpl writer =
+                        new ChannelStateWriterImpl(taskName, 0, getStreamFactoryFactory());
+                SubtaskCheckpointCoordinator coordinator =
+                        new SubtaskCheckpointCoordinatorImpl(
+                                new TestCheckpointStorageWorkerView(100),
+                                taskName,
+                                StreamTaskActionExecutor.IMMEDIATE,
+                                newDirectExecutorService(),
+                                new DummyEnvironment(),
+                                (unused1, unused2) -> {},
+                                (unused1, unused2) -> CompletableFuture.completedFuture(null),
+                                128,
+                                writer,
+                                true,
+                                (callable, duration) -> () -> {})) {
+            writer.open();
+            final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment);
+            int checkpointId = 1;
+            // Abort checkpoint 1
+            coordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
+
+            coordinator.initInputsCheckpoint(
+                    checkpointId,
+                    CheckpointOptions.unaligned(
+                            CheckpointType.CHECKPOINT,
+                            CheckpointStorageLocationReference.getDefault()));
+            ChannelStateWriter.ChannelStateWriteResult writeResult =
+                    writer.getWriteResult(checkpointId);
+            assertNotNull(writeResult);
+            assertFalse(writeResult.isDone());
+            assertFalse(writeResult.getInputChannelStateHandles().isCompletedExceptionally());
+            assertFalse(writeResult.getResultSubpartitionStateHandles().isCompletedExceptionally());
+
+            coordinator.checkpointState(
+                    new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
+                    CheckpointOptions.forCheckpointWithDefaultLocation(),
+                    new CheckpointMetricsBuilder(),
+                    operatorChain,
+                    false,
+                    () -> true);
+            assertNull(writer.getWriteResult(checkpointId));
+            TimeUnit.MILLISECONDS.sleep(10);

Review Comment:
   Thanks for your reminder. The unit test will fail when using the `ChannelStateWriter#get()` directly.
   
   `writeResult.getInputChannelStateHandles()` and `writeResult.getResultSubpartitionStateHandles()` are completed exceptionally, so the get will throw exception.
   
   I replace it with `ChannelStateWriter#waitForDone()`, it just wait for `inputChannelStateHandles` and `resultSubpartitionStateHandles` are done, and ignore any exception.
   
   ```
   @VisibleForTesting
   public void waitForDone() {
       try {
           inputChannelStateHandles.get();
       } catch (Throwable ignored) {
       }
       try {
           resultSubpartitionStateHandles.get();
       } catch (Throwable ignored) {
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r920210091


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -316,6 +316,10 @@ public void checkpointState(
             // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
             // checkpoint barrier align.
             operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
+            channelStateWriter.abort(
+                    metadata.getCheckpointId(),
+                    new CancellationException("checkpoint aborted via notification"),
+                    true);

Review Comment:
   I think there is no harm in executing the abort call here, but it's still something like "best effort". Take a look at the `SubtaskCheckpointCoordinatorImpl#createAbortedCheckpointSetWithLimitSize` way `abortedCheckpointIds` set is constructed. If there are many checkpoints failures, the set will be pruned, and this code here won't be triggered: `channelStateWriter.abort` won't be called.
   
   What exact issue was this bug causing in FLINK-26803 and what were the symptoms? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r930700904


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -157,6 +151,14 @@ public void addInputData(
                 checkpointId,
                 info,
                 startSeqNum);
+        if (isCheckpointSubsumedOrAborted(checkpointId)) {
+            LOG.debug(
+                    "The checkpoint {} of task {} has been aborted, so don't addInputData.",
+                    checkpointId,
+                    taskName);
+            closeBuffers(iterator);
+            return;
+        }
         enqueue(write(checkpointId, info, iterator), false);

Review Comment:
   What will happen if there is a race condition between this `addInputData` call from the netty threads, and for example `abort` call?
   
   1. (Netty thread) calling `addInputData`
   2. (Netty thread) checking `isCheckpointSubsumedOrAborted`, method returns false
   3. (Netty thread) freezes for a 1ms
   4. (Task thread) calls `abort()`
   5. (Netty thread) wakes up and finishes `addInputData` method by calling `enqueue(write(checkpointId, info, iterator), false);`
   
   Will this lead to a resource leak? If not, then why do we need `if (isCheckpointSubsumedOrAborted(checkpointId))` check here in the first place? `isCheckpointSubsumedOrAborted` is quite costly due to lock acquisition after all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r930700904


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -157,6 +151,14 @@ public void addInputData(
                 checkpointId,
                 info,
                 startSeqNum);
+        if (isCheckpointSubsumedOrAborted(checkpointId)) {
+            LOG.debug(
+                    "The checkpoint {} of task {} has been aborted, so don't addInputData.",
+                    checkpointId,
+                    taskName);
+            closeBuffers(iterator);
+            return;
+        }
         enqueue(write(checkpointId, info, iterator), false);

Review Comment:
   What will happen if there is a race condition between this `addInputData` call from the netty threads, and for example `abort` call?
   
   1. (Netty thread) calling `addInputData`
   2. (Netty thread) checking `isCheckpointSubsumedOrAborted`, method returns false
   3. (Netty thread) freezes for a 1ms
   4. (Task thread) calls `abort()`
   5. (Netty thread) wakes up and finishes `addInputData` method by calling `enqueue(write(checkpointId, info, iterator), false);`
   
   Will this lead to a resource leak? If not, then why do we need `if (isCheckpointSubsumedOrAborted(checkpointId))` check here in the first place? `isCheckpointSubsumedOrAborted` might be quite costly due to lock acquisition after all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r988921400


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java:
##########
@@ -568,6 +572,53 @@ public void snapshotState(
         }
     }
 
+    @Test
+    public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
+        MockEnvironment mockEnvironment = MockEnvironment.builder().build();

Review Comment:
   I think this issue came back up again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r988587911


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -312,10 +317,15 @@ public void checkpointState(
         // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint
         // if necessary.
         lastCheckpointId = metadata.getCheckpointId();
-        if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
+        if (checkAndClearAbortedStatus(metadata.getCheckpointId())
+                || isAbortedAndRemovedCheckpointId(metadata.getCheckpointId())) {

Review Comment:
   My reasoning was that after inlining, we would have the logic of checking the set `abortedCheckpointIds` and `maxAbortedCheckpointId` very close together - as they should be, since they are very tightly coupled.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r920275634


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java:
##########
@@ -568,6 +572,53 @@ public void snapshotState(
         }
     }
 
+    @Test
+    public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
+        MockEnvironment mockEnvironment = MockEnvironment.builder().build();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r988586042


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -209,6 +213,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
         this.prepareInputSnapshot = prepareInputSnapshot;
         this.abortedCheckpointIds =
                 createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
+        this.maxRecordAbortedCheckpoints = maxRecordAbortedCheckpoints;

Review Comment:
   > Could we always use the max concurrent checkpoints and remove the SubtaskCheckpointCoordinatorImpl#DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS?
   
   I think so.
   
   > why need the SubtaskCheckpointCoordinatorImpl#DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS? Why don't use the max concurrent checkpoints here?
   
   Probably because when it was being implemented by Myasuka (Yun Tang) and reviewed by me 2 years ago, we haven't figured out this trick with using `max concurrent checkpoints` :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r925403222


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,73 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)
 
     private final String taskName;
     private final ChannelStateWriteRequestExecutor executor;
-    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
-    private final int maxCheckpoints;
 
-    /**
-     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link
-     * #maxCheckpoints}.
-     */
-    public ChannelStateWriterImpl(
-            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
-        this(taskName, subtaskIndex, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
-    }
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private long ongoingCheckpointId;
+
+    // The checkpoint that checkpointId is less than or equal to maxAbortedCheckpointId should be
+    // aborted.
+    @GuardedBy("lock")
+    private long maxAbortedCheckpointId;
+
+    // The channel state write result of ongoingCheckpointId.
+    @GuardedBy("lock")
+    private ChannelStateWriteResult result;
 
     /**
      * Creates a {@link ChannelStateWriterImpl} with {@link ChannelStateSerializerImpl default}
      * {@link ChannelStateSerializer}, and a {@link ChannelStateWriteRequestExecutorImpl}.
      *
      * @param taskName
      * @param streamFactoryResolver a factory to obtain output stream factory for a given checkpoint
-     * @param maxCheckpoints maximum number of checkpoints to be written currently or finished but
-     *     not taken yet.
      */
-    ChannelStateWriterImpl(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStorageWorkerView streamFactoryResolver,
-            int maxCheckpoints) {
+    public ChannelStateWriterImpl(
+            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
         this(
                 taskName,
-                new ConcurrentHashMap<>(maxCheckpoints),
                 new ChannelStateWriteRequestExecutorImpl(
                         taskName,
                         new ChannelStateWriteRequestDispatcherImpl(
                                 taskName,
                                 subtaskIndex,
                                 streamFactoryResolver,
-                                new ChannelStateSerializerImpl())),
-                maxCheckpoints);
+                                new ChannelStateSerializerImpl())));
     }
 
-    ChannelStateWriterImpl(
-            String taskName,
-            ConcurrentMap<Long, ChannelStateWriteResult> results,
-            ChannelStateWriteRequestExecutor executor,
-            int maxCheckpoints) {
+    ChannelStateWriterImpl(String taskName, ChannelStateWriteRequestExecutor executor) {
         this.taskName = taskName;
-        this.results = results;
-        this.maxCheckpoints = maxCheckpoints;
         this.executor = executor;
+        this.ongoingCheckpointId = 0;
+        this.maxAbortedCheckpointId = 0;
     }
 
     @Override
     public void start(long checkpointId, CheckpointOptions checkpointOptions) {
         LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
-        ChannelStateWriteResult result = new ChannelStateWriteResult();
-        ChannelStateWriteResult put =
-                results.computeIfAbsent(
+        synchronized (lock) {
+            Preconditions.checkState(checkpointId > ongoingCheckpointId);
+            if (isCheckpointSubsumedOrAborted(checkpointId)) {
+                LOG.debug(
+                        "The checkpoint {} of task {} has been aborted, so don't start.",
                         checkpointId,
-                        id -> {
-                            Preconditions.checkState(
-                                    results.size() < maxCheckpoints,
-                                    String.format(
-                                            "%s can't start %d, results.size() > maxCheckpoints: %d > %d",
-                                            taskName,
-                                            checkpointId,
-                                            results.size(),
-                                            maxCheckpoints));
-                            enqueue(
-                                    new CheckpointStartRequest(
-                                            checkpointId,
-                                            result,
-                                            checkpointOptions.getTargetLocation()),
-                                    false);
-                            return result;
-                        });
-        Preconditions.checkArgument(
-                put == result,
-                taskName + " result future already present for checkpoint " + checkpointId);
+                        taskName);
+                return;
+            }
+            if (result != null) {
+                result.fail(new CancellationException("Cancel old checkpoint."));

Review Comment:
   @pnowojski , thanks for your reminder.
   
   It's a good advice. The fail will close stream, it may be slow, so it should hold the lock.
   
   I prefer `enqueue(ChannelStateWriteRequest.abort(...));`, if there is any problem with fail it should not affect the task thread



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r923305444


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -208,19 +238,42 @@ public void abort(long checkpointId, Throwable cause, boolean cleanup) {
         enqueue(
                 ChannelStateWriteRequest.abort(checkpointId, cause),
                 false); // abort enqueued but not started
-        if (cleanup) {
-            results.remove(checkpointId);
+        synchronized (lock) {
+            if (checkpointId >= ongoingCheckpointId) {
+                abortedCheckpointIds.add(checkpointId);
+            }
+            if (cleanup && checkpointId == ongoingCheckpointId) {
+                result = null;
+            }
         }
     }
 
     @Override
     public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
         LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
-        ChannelStateWriteResult result = results.remove(checkpointId);
-        Preconditions.checkArgument(
-                result != null,
-                taskName + " channel state write result not found for checkpoint " + checkpointId);
-        return result;
+        ChannelStateWriteResult returnResult;
+        synchronized (lock) {
+            Preconditions.checkState(checkpointId == ongoingCheckpointId);
+            returnResult = result;
+            result = null;
+        }
+        if (returnResult == null) {
+            Preconditions.checkArgument(
+                    isUnavailableCheckpoint(checkpointId),
+                    taskName
+                            + " channel state write result not found for checkpoint "
+                            + checkpointId);
+            returnResult = new ChannelStateWriteResult();
+            returnResult.fail(new CancellationException("The checkpoint is aborted."));
+        }
+        return returnResult;
+    }
+
+    private boolean isUnavailableCheckpoint(long checkpointId) {

Review Comment:
   `isUnavailableCheckpoint` -> `isCheckpointSubsumedOrAborted` or `isValidCheckpoint`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -208,19 +238,42 @@ public void abort(long checkpointId, Throwable cause, boolean cleanup) {
         enqueue(
                 ChannelStateWriteRequest.abort(checkpointId, cause),
                 false); // abort enqueued but not started
-        if (cleanup) {
-            results.remove(checkpointId);
+        synchronized (lock) {
+            if (checkpointId >= ongoingCheckpointId) {
+                abortedCheckpointIds.add(checkpointId);
+            }
+            if (cleanup && checkpointId == ongoingCheckpointId) {
+                result = null;
+            }
         }
     }
 
     @Override
     public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
         LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
-        ChannelStateWriteResult result = results.remove(checkpointId);
-        Preconditions.checkArgument(
-                result != null,
-                taskName + " channel state write result not found for checkpoint " + checkpointId);
-        return result;
+        ChannelStateWriteResult returnResult;
+        synchronized (lock) {
+            Preconditions.checkState(checkpointId == ongoingCheckpointId);
+            returnResult = result;
+            result = null;
+        }
+        if (returnResult == null) {
+            Preconditions.checkArgument(
+                    isUnavailableCheckpoint(checkpointId),
+                    taskName
+                            + " channel state write result not found for checkpoint "
+                            + checkpointId);
+            returnResult = new ChannelStateWriteResult();
+            returnResult.fail(new CancellationException("The checkpoint is aborted."));
+        }

Review Comment:
   Why can this be `null`? And if it happens, why are you creating a new empty `ChannelStateWriteResult` just to fail it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)
 
     private final String taskName;
     private final ChannelStateWriteRequestExecutor executor;
-    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
-    private final int maxCheckpoints;
 
-    /**
-     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link
-     * #maxCheckpoints}.
-     */
-    public ChannelStateWriterImpl(
-            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
-        this(taskName, subtaskIndex, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
-    }
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private long ongoingCheckpointId;
+
+    @GuardedBy("lock")
+    private final NavigableSet<Long> abortedCheckpointIds;
+
+    // The result of ongoingCheckpointId, the checkpoint that CheckpointId is less than
+    // ongoingCheckpointId should be aborted due to concurrent unaligned checkpoint is currently not
+    // supported.
+    @GuardedBy("lock")
+    private ChannelStateWriteResult result;
 
     /**
      * Creates a {@link ChannelStateWriterImpl} with {@link ChannelStateSerializerImpl default}
      * {@link ChannelStateSerializer}, and a {@link ChannelStateWriteRequestExecutorImpl}.
      *
      * @param taskName
      * @param streamFactoryResolver a factory to obtain output stream factory for a given checkpoint
-     * @param maxCheckpoints maximum number of checkpoints to be written currently or finished but
-     *     not taken yet.
      */
-    ChannelStateWriterImpl(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStorageWorkerView streamFactoryResolver,
-            int maxCheckpoints) {
+    public ChannelStateWriterImpl(
+            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
         this(
                 taskName,
-                new ConcurrentHashMap<>(maxCheckpoints),
                 new ChannelStateWriteRequestExecutorImpl(
                         taskName,
                         new ChannelStateWriteRequestDispatcherImpl(
                                 taskName,
                                 subtaskIndex,
                                 streamFactoryResolver,
-                                new ChannelStateSerializerImpl())),
-                maxCheckpoints);
+                                new ChannelStateSerializerImpl())));
     }
 
-    ChannelStateWriterImpl(
-            String taskName,
-            ConcurrentMap<Long, ChannelStateWriteResult> results,
-            ChannelStateWriteRequestExecutor executor,
-            int maxCheckpoints) {
+    ChannelStateWriterImpl(String taskName, ChannelStateWriteRequestExecutor executor) {
         this.taskName = taskName;
-        this.results = results;
-        this.maxCheckpoints = maxCheckpoints;
         this.executor = executor;
+        this.abortedCheckpointIds = new TreeSet<>();
+        this.ongoingCheckpointId = 0;
     }
 
     @Override
     public void start(long checkpointId, CheckpointOptions checkpointOptions) {
         LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
-        ChannelStateWriteResult result = new ChannelStateWriteResult();
-        ChannelStateWriteResult put =
-                results.computeIfAbsent(
+        synchronized (lock) {
+            if (isUnavailableCheckpoint(checkpointId)) {
+                LOG.debug(
+                        "The checkpoint {} of task {} has been aborted, so don't start.",
                         checkpointId,
-                        id -> {
-                            Preconditions.checkState(
-                                    results.size() < maxCheckpoints,
-                                    String.format(
-                                            "%s can't start %d, results.size() > maxCheckpoints: %d > %d",
-                                            taskName,
-                                            checkpointId,
-                                            results.size(),
-                                            maxCheckpoints));
-                            enqueue(
-                                    new CheckpointStartRequest(
-                                            checkpointId,
-                                            result,
-                                            checkpointOptions.getTargetLocation()),
-                                    false);
-                            return result;
-                        });
-        Preconditions.checkArgument(
-                put == result,
-                taskName + " result future already present for checkpoint " + checkpointId);
+                        taskName);

Review Comment:
   Throw an exception instead?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)

Review Comment:
   Why do you need this `NavigableSet<Long> abortedCheckpointIds;` and `ongoingCheckpointId`? Wouldn't `long nextExpectedCheckpointId` be enough on it's own?
   
   ```
   abort(long checkpointId) {
     nextExpectedCheckpointId = max(nextExpectedCheckpointId, checkpointId + 1);
     ...
   }
   
   start(long checkpointId) {
     checkState(checkpointId >= nextExpectedCheckpointId, "Unexpected next checkpoint. Concurrent checkpoints are not supported");
   
     nextExpectedCheckpointId = max(nextExpectedCheckpointId, checkpointId + 1);
     ...
   }
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -68,6 +69,15 @@ public void testAddEventBuffer() throws Exception {
         }
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testStartMultipleTimes() throws Exception {
+        runWithSyncWorker(
+                writer -> {
+                    callStart(writer);
+                    callStart(writer);
+                });

Review Comment:
   Aren't we missing a unit test for:
   ```
   1.
   
   writer.start(44);
   writer.start(42); //expected exception
   ```
   ? Can you also squash the commit adding unit tests with the previous commit that was implementing the change?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -157,6 +149,14 @@ public void addInputData(
                 checkpointId,
                 info,
                 startSeqNum);
+        if (isUnavailableCheckpoint(checkpointId)) {
+            LOG.debug(
+                    "The checkpoint {} of task {} has been aborted, so don't addInputData.",
+                    checkpointId,
+                    taskName);
+            enqueue(closeBuffers(checkpointId, iterator), false);

Review Comment:
   If we are closing the buffers, why do we need to enqueue this as a request? Can not we just close the buffers directly here (and in other methods in this class)?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)
 
     private final String taskName;
     private final ChannelStateWriteRequestExecutor executor;
-    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
-    private final int maxCheckpoints;
 
-    /**
-     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link
-     * #maxCheckpoints}.
-     */
-    public ChannelStateWriterImpl(
-            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
-        this(taskName, subtaskIndex, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
-    }
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private long ongoingCheckpointId;
+
+    @GuardedBy("lock")
+    private final NavigableSet<Long> abortedCheckpointIds;
+
+    // The result of ongoingCheckpointId, the checkpoint that CheckpointId is less than
+    // ongoingCheckpointId should be aborted due to concurrent unaligned checkpoint is currently not
+    // supported.
+    @GuardedBy("lock")
+    private ChannelStateWriteResult result;

Review Comment:
   Why do you need this lock? (This class is supposed to be owned and used only by the task thread)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1179556655

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8b440b3ab0b604fe6184075ad704b76d0811a3d8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8b440b3ab0b604fe6184075ad704b76d0811a3d8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8b440b3ab0b604fe6184075ad704b76d0811a3d8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1183909068

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1183464593

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r923351868


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)

Review Comment:
   Hi @pnowojski , thanks for your review.
   
   The `checkpointId < nextExpectedCheckpointId` may happen. For example, `AlternatingWaitingForFirstBarrierUnaligned#barrierReceived` don't check whether checkpointId is aborted, if received a barrier, it will call the ChannelStateWriter.start().
   
   - If `checkpointId < ongoingCheckpointId`, it should throw exception.
   - If `checkpointId < nextExpectedCheckpointId` that is `checkpointId >= ongoingCheckpointId && abortedCheckpointIds.contains(checkpointId)`,  it should ignore the checkpoint.
   - If `checkpointId >= ongoingCheckpointId && !abortedCheckpointIds.contains(checkpointId)`, the ongoingCheckpointId should  be updated, and start this checkpoint.
   
   So, we should use `ongoingCheckpointId`, and just update it within `ChannelStateWriterImpl#start`
   
   We should ignore checkpoint when the checkpointId is aborted, so we  need the `NavigableSet<Long> abortedCheckpointIds;`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r920260335


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -316,6 +316,10 @@ public void checkpointState(
             // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
             // checkpoint barrier align.
             operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
+            channelStateWriter.abort(
+                    metadata.getCheckpointId(),
+                    new CancellationException("checkpoint aborted via notification"),
+                    true);

Review Comment:
   Thanks for your clarification and reminder, this is really a bug. 
   
   I have updated this PR that fixed two other places where remove aborted checkpoint.
   
   
   > What exact issue was this bug causing in [FLINK-26803](https://issues.apache.org/jira/browse/FLINK-26803) and what were the symptoms?
   
   The bug will affect the new feature [FLINK-26803](https://issues.apache.org/jira/browse/FLINK-26803), because the channel state file can be closed only after the Checkpoints of all tasks of the shared file are complete or abort. So when the checkpoint of some tasks fails, if abort is not called, the file cannot be closed and all tasks sharing the file cannot execute `inputChannelStateHandles.completeExceptionally(e);` and `resultSubpartitionStateHandles.completeExceptionally(e);` , these AsyncCheckpointRunnable of shared tasks will wait forever.
   
   If the channel state file is not shared between tasks, the current task will not create an AsyncCheckpointRunnable, so the current task and other tasks will not be affected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r924252841


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)
 
     private final String taskName;
     private final ChannelStateWriteRequestExecutor executor;
-    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
-    private final int maxCheckpoints;
 
-    /**
-     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link
-     * #maxCheckpoints}.
-     */
-    public ChannelStateWriterImpl(
-            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
-        this(taskName, subtaskIndex, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
-    }
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private long ongoingCheckpointId;
+
+    @GuardedBy("lock")
+    private final NavigableSet<Long> abortedCheckpointIds;
+
+    // The result of ongoingCheckpointId, the checkpoint that CheckpointId is less than
+    // ongoingCheckpointId should be aborted due to concurrent unaligned checkpoint is currently not
+    // supported.
+    @GuardedBy("lock")
+    private ChannelStateWriteResult result;

Review Comment:
   Ops, sorry. I've mislead you. Now I remember it was supposed to be task owned class, but as a result of some discussions it ended up being used from the Netty thread (`addInputData` called from `ChannelStatePersister#maybePersist`). That's also why we needed `ConcurrentMap` here before.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)

Review Comment:
   Thanks for the explanation. But in that case, why do we need `abortedCheckpointIds` set? Couldn't we have just `long ongoingCheckpointId` and `long nextExpectedCheckpointId`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r924367855


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)

Review Comment:
   Hi @pnowojski , great idea! 
   
   Updated, but I used maxAbortedCheckpointId instead of nextExpectedCheckpointId. It should be clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1187561410

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1189732263

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r925283630


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,73 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)
 
     private final String taskName;
     private final ChannelStateWriteRequestExecutor executor;
-    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
-    private final int maxCheckpoints;
 
-    /**
-     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link
-     * #maxCheckpoints}.
-     */
-    public ChannelStateWriterImpl(
-            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
-        this(taskName, subtaskIndex, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
-    }
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private long ongoingCheckpointId;
+
+    // The checkpoint that checkpointId is less than or equal to maxAbortedCheckpointId should be
+    // aborted.
+    @GuardedBy("lock")
+    private long maxAbortedCheckpointId;
+
+    // The channel state write result of ongoingCheckpointId.
+    @GuardedBy("lock")
+    private ChannelStateWriteResult result;
 
     /**
      * Creates a {@link ChannelStateWriterImpl} with {@link ChannelStateSerializerImpl default}
      * {@link ChannelStateSerializer}, and a {@link ChannelStateWriteRequestExecutorImpl}.
      *
      * @param taskName
      * @param streamFactoryResolver a factory to obtain output stream factory for a given checkpoint
-     * @param maxCheckpoints maximum number of checkpoints to be written currently or finished but
-     *     not taken yet.
      */
-    ChannelStateWriterImpl(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStorageWorkerView streamFactoryResolver,
-            int maxCheckpoints) {
+    public ChannelStateWriterImpl(
+            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
         this(
                 taskName,
-                new ConcurrentHashMap<>(maxCheckpoints),
                 new ChannelStateWriteRequestExecutorImpl(
                         taskName,
                         new ChannelStateWriteRequestDispatcherImpl(
                                 taskName,
                                 subtaskIndex,
                                 streamFactoryResolver,
-                                new ChannelStateSerializerImpl())),
-                maxCheckpoints);
+                                new ChannelStateSerializerImpl())));
     }
 
-    ChannelStateWriterImpl(
-            String taskName,
-            ConcurrentMap<Long, ChannelStateWriteResult> results,
-            ChannelStateWriteRequestExecutor executor,
-            int maxCheckpoints) {
+    ChannelStateWriterImpl(String taskName, ChannelStateWriteRequestExecutor executor) {
         this.taskName = taskName;
-        this.results = results;
-        this.maxCheckpoints = maxCheckpoints;
         this.executor = executor;
+        this.ongoingCheckpointId = 0;
+        this.maxAbortedCheckpointId = 0;
     }
 
     @Override
     public void start(long checkpointId, CheckpointOptions checkpointOptions) {
         LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
-        ChannelStateWriteResult result = new ChannelStateWriteResult();
-        ChannelStateWriteResult put =
-                results.computeIfAbsent(
+        synchronized (lock) {
+            Preconditions.checkState(checkpointId > ongoingCheckpointId);
+            if (isCheckpointSubsumedOrAborted(checkpointId)) {
+                LOG.debug(
+                        "The checkpoint {} of task {} has been aborted, so don't start.",
                         checkpointId,
-                        id -> {
-                            Preconditions.checkState(
-                                    results.size() < maxCheckpoints,
-                                    String.format(
-                                            "%s can't start %d, results.size() > maxCheckpoints: %d > %d",
-                                            taskName,
-                                            checkpointId,
-                                            results.size(),
-                                            maxCheckpoints));
-                            enqueue(
-                                    new CheckpointStartRequest(
-                                            checkpointId,
-                                            result,
-                                            checkpointOptions.getTargetLocation()),
-                                    false);
-                            return result;
-                        });
-        Preconditions.checkArgument(
-                put == result,
-                taskName + " result future already present for checkpoint " + checkpointId);
+                        taskName);
+                return;
+            }
+            if (result != null) {
+                result.fail(new CancellationException("Cancel old checkpoint."));

Review Comment:
   I think the exception should be `new CheckpointException(CheckpointFailureReason#CHECKPOINT_DECLINED_SUBSUMED)`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,73 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-    private static final int DEFAULT_MAX_CHECKPOINTS =
-            1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via
-    // mailbox)
 
     private final String taskName;
     private final ChannelStateWriteRequestExecutor executor;
-    private final ConcurrentMap<Long, ChannelStateWriteResult> results;
-    private final int maxCheckpoints;
 
-    /**
-     * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link
-     * #maxCheckpoints}.
-     */
-    public ChannelStateWriterImpl(
-            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
-        this(taskName, subtaskIndex, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
-    }
+    private final Object lock = new Object();
+
+    @GuardedBy("lock")
+    private long ongoingCheckpointId;
+
+    // The checkpoint that checkpointId is less than or equal to maxAbortedCheckpointId should be
+    // aborted.
+    @GuardedBy("lock")
+    private long maxAbortedCheckpointId;
+
+    // The channel state write result of ongoingCheckpointId.
+    @GuardedBy("lock")
+    private ChannelStateWriteResult result;
 
     /**
      * Creates a {@link ChannelStateWriterImpl} with {@link ChannelStateSerializerImpl default}
      * {@link ChannelStateSerializer}, and a {@link ChannelStateWriteRequestExecutorImpl}.
      *
      * @param taskName
      * @param streamFactoryResolver a factory to obtain output stream factory for a given checkpoint
-     * @param maxCheckpoints maximum number of checkpoints to be written currently or finished but
-     *     not taken yet.
      */
-    ChannelStateWriterImpl(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStorageWorkerView streamFactoryResolver,
-            int maxCheckpoints) {
+    public ChannelStateWriterImpl(
+            String taskName, int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver) {
         this(
                 taskName,
-                new ConcurrentHashMap<>(maxCheckpoints),
                 new ChannelStateWriteRequestExecutorImpl(
                         taskName,
                         new ChannelStateWriteRequestDispatcherImpl(
                                 taskName,
                                 subtaskIndex,
                                 streamFactoryResolver,
-                                new ChannelStateSerializerImpl())),
-                maxCheckpoints);
+                                new ChannelStateSerializerImpl())));
     }
 
-    ChannelStateWriterImpl(
-            String taskName,
-            ConcurrentMap<Long, ChannelStateWriteResult> results,
-            ChannelStateWriteRequestExecutor executor,
-            int maxCheckpoints) {
+    ChannelStateWriterImpl(String taskName, ChannelStateWriteRequestExecutor executor) {
         this.taskName = taskName;
-        this.results = results;
-        this.maxCheckpoints = maxCheckpoints;
         this.executor = executor;
+        this.ongoingCheckpointId = 0;
+        this.maxAbortedCheckpointId = 0;
     }
 
     @Override
     public void start(long checkpointId, CheckpointOptions checkpointOptions) {
         LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
-        ChannelStateWriteResult result = new ChannelStateWriteResult();
-        ChannelStateWriteResult put =
-                results.computeIfAbsent(
+        synchronized (lock) {
+            Preconditions.checkState(checkpointId > ongoingCheckpointId);
+            if (isCheckpointSubsumedOrAborted(checkpointId)) {
+                LOG.debug(
+                        "The checkpoint {} of task {} has been aborted, so don't start.",
                         checkpointId,
-                        id -> {
-                            Preconditions.checkState(
-                                    results.size() < maxCheckpoints,
-                                    String.format(
-                                            "%s can't start %d, results.size() > maxCheckpoints: %d > %d",
-                                            taskName,
-                                            checkpointId,
-                                            results.size(),
-                                            maxCheckpoints));
-                            enqueue(
-                                    new CheckpointStartRequest(
-                                            checkpointId,
-                                            result,
-                                            checkpointOptions.getTargetLocation()),
-                                    false);
-                            return result;
-                        });
-        Preconditions.checkArgument(
-                put == result,
-                taskName + " result future already present for checkpoint " + checkpointId);
+                        taskName);
+                return;
+            }
+            if (result != null) {
+                result.fail(new CancellationException("Cancel old checkpoint."));

Review Comment:
   I think it would be safer to fail the futures outside of this lock. We don't know what other locks will be acquired from the actions that are connected to those futures.
   
   So something like:
   
   ```
   @Nullable ChannelStatWriteResult toFail = null;
   
   synchronized (lock) { 
     ...
     toFail = result;
     ...
   }
   
   if (toFail != null) {
     toFail.fail();
   }
   ```
   ?
   
   Or maybe for the sake of being consistent use the same pattern as in other places?
   
   ```
    enqueue(ChannelStateWriteRequest.abort(...));
   ```
   
   as it _seems_ to achieve a similar thing?
   
   I'm not sure which of this option is better here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -120,15 +141,28 @@ public void testAbortIgnoresMissing() throws Exception {
         runWithSyncWorker(this::callAbort);
     }
 
+    @Test
+    public void testOldCheckpointIsAborted() {
+        ChannelStateWriterImpl writer = openWriter();
+        callStart(1, writer);
+        ChannelStateWriteResult result1 = writer.getWriteResult(1);
+        assertFalse(result1.isDone());
+
+        callStart(2, writer);
+        assertNull(writer.getWriteResult(1));
+        assertFalse(writer.getWriteResult(2).isDone());
+
+        assertWriteResultIsCompletedExceptionally(result1);

Review Comment:
   Can you add assertion that the exception is `CheckpointException(CheckpointFailureReason#CHECKPOINT_DECLINED_SUBSUMED)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
pnowojski commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1189971613

   I've started a benchmark request to check the impact of replacing `ConcurrentHashMap` with a lock.
   
   http://codespeed.dak8s.net:8080/job/flink-benchmark-request/171/
   
   Also FYI @rkhachatryan, I've already reviewed this code, but you might be interested in this issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1189094363

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1269854360

   > Thanks @1996fanrui for the update. I think this version looks much simpler compared to the previous. And sorry again for the detour.
   > 
   > I've left a couple of smaller comments.
   
   Hi @pnowojski , thanks for your review, I have addressed all comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r920121363


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -316,6 +316,10 @@ public void checkpointState(
             // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
             // checkpoint barrier align.
             operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
+            channelStateWriter.abort(
+                    metadata.getCheckpointId(),
+                    new CancellationException("checkpoint aborted via notification"),
+                    true);

Review Comment:
   Hi @pnowojski , thanks for your review. 
   
   I don't know why don't execute it here. 
   
   `checkAndClearAbortedStatus` is called twice:
   - The first is `if (lastCheckpointId >= metadata.getCheckpointId())`, and execute abort and checkAndClearAbortedStatus there.
   - The second is here, I guess we should abort here, because `checkAndClearAbortedStatus(metadata.getCheckpointId())` means the checkpointId is aborted, we can call `channelStateWriter.abort` too.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r920121363


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -316,6 +316,10 @@ public void checkpointState(
             // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
             // checkpoint barrier align.
             operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
+            channelStateWriter.abort(
+                    metadata.getCheckpointId(),
+                    new CancellationException("checkpoint aborted via notification"),
+                    true);

Review Comment:
   Hi @pnowojski , thanks for your review. 
   
   I don't know why don't execute it here. 
   
   `checkAndClearAbortedStatus` is called twice:
   - The first is `if (lastCheckpointId >= metadata.getCheckpointId())`, and execute abort and checkAndClearAbortedStatus there.
   - The second is here, I guess we should abort here, because `checkAndClearAbortedStatus(metadata.getCheckpointId()) == true` means the checkpointId is aborted, we can call `channelStateWriter.abort` too.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1187589584

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r925312733


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -121,15 +141,28 @@ public void testAbortIgnoresMissing() throws Exception {
         runWithSyncWorker(this::callAbort);
     }
 
+    @Test
+    public void testOldCheckpointIsAborted() {

Review Comment:
   Aren't we missing a test coverage for the main point of this bug? That the following sequence:
   ```
   callAbort(1, ...);
   callStart(1, ...);
   ```
   also leads to a completed `ChannelStateWriteResult`? Previously that would create an uncompleted `ChannelStateWriteResult`, that was causing a deadlocks in FLINK-26803 right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20233: [FLINK-28474][checkpoint] Simplify the ChannelStateWriterImpl due to concurrent unaligned checkpoint isn't supported

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20233:
URL: https://github.com/apache/flink/pull/20233#issuecomment-1190119143

   > Thanks @1996fanrui, I think the code looks mostly good. I've started a benchmark request to check the impact of replacing `ConcurrentHashMap` with a lock. I don't expect it to be an issue, but let's check that.
   > 
   > http://codespeed.dak8s.net:8080/job/flink-benchmark-request/171/
   > 
   > Also FYI @rkhachatryan, I've already reviewed this code, but you might be interested in this issue.
   
   Checkpoint is a relatively low-frequency behavior. My understanding is that this change should have little or no impact on data processing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r988922357


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -157,6 +151,14 @@ public void addInputData(
                 checkpointId,
                 info,
                 startSeqNum);
+        if (isCheckpointSubsumedOrAborted(checkpointId)) {
+            LOG.debug(
+                    "The checkpoint {} of task {} has been aborted, so don't addInputData.",
+                    checkpointId,
+                    taskName);
+            closeBuffers(iterator);
+            return;
+        }
         enqueue(write(checkpointId, info, iterator), false);

Review Comment:
   ~this issue is no longer relevant~ we decided to go with another approach



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r988935163


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java:
##########
@@ -568,6 +572,53 @@ public void snapshotState(
         }
     }
 
+    @Test
+    public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
+        MockEnvironment mockEnvironment = MockEnvironment.builder().build();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20233: [FLINK-28474][checkpoint] Fix the bug ChannelStateWriteResult might not fail after checkpoint abort

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r989031922


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java:
##########
@@ -569,6 +571,58 @@ public void snapshotState(
         }
     }
 
+    @Test
+    public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
+        String taskName = "test";
+        try (MockEnvironment mockEnvironment = MockEnvironment.builder().build();
+                ChannelStateWriterImpl writer =
+                        new ChannelStateWriterImpl(taskName, 0, getStreamFactoryFactory());
+                SubtaskCheckpointCoordinator coordinator =
+                        new SubtaskCheckpointCoordinatorImpl(
+                                new TestCheckpointStorageWorkerView(100),
+                                taskName,
+                                StreamTaskActionExecutor.IMMEDIATE,
+                                newDirectExecutorService(),
+                                new DummyEnvironment(),
+                                (unused1, unused2) -> {},
+                                (unused1, unused2) -> CompletableFuture.completedFuture(null),
+                                128,
+                                writer,
+                                true,
+                                (callable, duration) -> () -> {})) {
+            writer.open();
+            final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment);
+            int checkpointId = 1;
+            // Abort checkpoint 1
+            coordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
+
+            coordinator.initInputsCheckpoint(
+                    checkpointId,
+                    CheckpointOptions.unaligned(
+                            CheckpointType.CHECKPOINT,
+                            CheckpointStorageLocationReference.getDefault()));
+            ChannelStateWriter.ChannelStateWriteResult writeResult =
+                    writer.getWriteResult(checkpointId);
+            assertNotNull(writeResult);
+            assertFalse(writeResult.isDone());
+            assertFalse(writeResult.getInputChannelStateHandles().isCompletedExceptionally());
+            assertFalse(writeResult.getResultSubpartitionStateHandles().isCompletedExceptionally());
+
+            coordinator.checkpointState(
+                    new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
+                    CheckpointOptions.forCheckpointWithDefaultLocation(),
+                    new CheckpointMetricsBuilder(),
+                    operatorChain,
+                    false,
+                    () -> true);
+            assertNull(writer.getWriteResult(checkpointId));
+            TimeUnit.MILLISECONDS.sleep(10);

Review Comment:
   One more thing I've missed before. Can we get rid of this sleep? It's very likely it will be failing from time to time in the azure CI.
   
   It looks like we can replace it with
   ```
   writeResult.get();
   ```
   where
   
   ```
   @VisibleForTesting
   public void ChannelStateWriter#get() {
     inputChannelStateHandles.get();
     resultSubpartitionStateHandles.get();
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org