Posted to issues@flink.apache.org by "1996fanrui (via GitHub)" <gi...@apache.org> on 2023/04/13 14:01:26 UTC

[GitHub] [flink] 1996fanrui opened a new pull request, #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

1996fanrui opened a new pull request, #22392:
URL: https://github.com/apache/flink/pull/22392

   ## What is the purpose of the change
   
   Correct the unaligned checkpoint type reported for a subtask when an aligned barrier in PipelinedSubpartition times out and is converted to an unaligned barrier.
   
   ## Brief change log
   
   Correct the unaligned checkpoint type reported for a subtask when an aligned barrier in PipelinedSubpartition times out and is converted to an unaligned barrier.
   
   ## Verifying this change
   
   This change updates existing tests and can be verified as follows:
   
     - PipelinedSubpartitionTest#testConsumeTimeoutableCheckpointBarrierQuickly
     - PipelinedSubpartitionTest#testTimeoutAlignedToUnalignedBarrier
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency):  no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1166290066


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   Hi @pnowojski, thanks for your review.
   
   It's a great suggestion, updated.





[GitHub] [flink] pnowojski commented on a diff in pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1168635822


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   Ehhh. [4] is indeed a bit complicated. I think I misspoke; I actually meant something a bit different ([5] below).
   [3] Technically it works, but I don't like how fragile the contract there is: the value of the `volatile boolean unalignedCheckpoint` is only valid if other methods/things happen in the correct order. To fix it, we would need something like:
   [5] Change `CheckpointMetricsBuilder#unalignedCheckpoint` into some kind of `CompletableFuture<Boolean>`. Using that, `AsyncCheckpointRunnable` could just call `CheckpointMetricsBuilder#unalignedCheckpoint.get()` without having to consider whether that's safe or not. I hoped it could be set exactly as in [3], but there is a problem: we know when to set it to `true`, but deciding when to set it to `false` would require quite a bit of logic :/
   [6] Another potential solution would be to move the completion of the `AsyncCheckpointRunnable` out of the async thread and into the mailbox thread, which would also remove some race conditions and simplify the logic. But that's probably not worth doing for the sake of this single flag...
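A minimal sketch of idea [5], assuming a hypothetical holder class (`UnalignedFlagSketch` and its methods are illustrative names, not Flink's `CheckpointMetricsBuilder` API; `join()` is used in place of `get()` to avoid checked exceptions). The reader blocks until the flag is decided, so it no longer depends on call ordering. The catch mentioned above shows up in `onCheckpointStayedAligned()`: some code path would have to call it reliably, or the reader would wait forever.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical holder for idea [5]; not Flink's CheckpointMetricsBuilder.
class UnalignedFlagSketch {
    private final CompletableFuture<Boolean> unalignedCheckpoint = new CompletableFuture<>();

    // Alignment timer: a barrier was converted from aligned to unaligned.
    void onBarrierTimedOutToUnaligned() {
        unalignedCheckpoint.complete(true);
    }

    // The hard part: some code path must decide that the checkpoint stayed aligned,
    // otherwise isUnalignedCheckpoint() blocks forever.
    void onCheckpointStayedAligned() {
        unalignedCheckpoint.complete(false);
    }

    // Async checkpoint thread: blocks until one of the calls above has happened.
    boolean isUnalignedCheckpoint() {
        return unalignedCheckpoint.join();
    }

    public static void main(String[] args) {
        UnalignedFlagSketch timedOut = new UnalignedFlagSketch();
        timedOut.onBarrierTimedOutToUnaligned();
        timedOut.onCheckpointStayedAligned(); // ignored: complete() is a no-op once completed
        System.out.println(timedOut.isUnalignedCheckpoint()); // prints true

        UnalignedFlagSketch aligned = new UnalignedFlagSketch();
        aligned.onCheckpointStayedAligned();
        System.out.println(aligned.isUnalignedCheckpoint()); // prints false
    }
}
```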
   
   All in all, I'm starting to think that maybe your original idea, to approximate the `true/false` flag based on `bytesPersistedDuringAlignment > 0`, might be the lesser evil. The case where `bytesPersistedDuringAlignment == 0` but the checkpoint barrier actually timed out in the output buffers is quite extreme/rare, shouldn't be that significant to the end user, and is probably not worth making the code so much more complicated.
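A sketch of the approximation, assuming it is applied only to checkpoints that started as aligned (`UnalignedApproximation.reportAsUnaligned` is a hypothetical helper, not Flink code). The second call shows the rare case described above: a barrier that timed out with nothing persisted is reported as aligned.

```java
// Hypothetical helper illustrating the approximation; not Flink code.
final class UnalignedApproximation {
    private UnalignedApproximation() {}

    // Assumes the checkpoint started as aligned: it is reported as unaligned only
    // if some in-flight data was persisted after a barrier timed out.
    static boolean reportAsUnaligned(long bytesPersistedDuringAlignment) {
        return bytesPersistedDuringAlignment > 0;
    }

    public static void main(String[] args) {
        // Barrier timed out and in-flight data was persisted: reported correctly.
        System.out.println(reportAsUnaligned(4096L)); // prints true
        // Barrier timed out but nothing was persisted: the rare misreported case.
        System.out.println(reportAsUnaligned(0L)); // prints false
    }
}
```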





[GitHub] [flink] flinkbot commented on pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22392:
URL: https://github.com/apache/flink/pull/22392#issuecomment-1507031958

   ## CI report:
   
   * c8d6b945ad2c8ff9e2d0c3328dba3d7f8cd59767 UNKNOWN
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] 1996fanrui commented on a diff in pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1166686196


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   > AsyncCheckpointRunnable and the alignment timer, are running in different threads, creating both problems with the actual memory visibility AND race conditions?
   
   
   Before this PR, `CheckpointMetricsBuilder#setUnalignedCheckpoint` was only called on the task thread [1], and IIUC, registerTimer should be executed by the task thread as well. `AsyncCheckpointRunnable` never calls `CheckpointMetricsBuilder#setUnalignedCheckpoint`; it only uses the builder to build the metrics. So the flag cannot be modified concurrently.
   
   After a detailed analysis, I think the change made for the first comment [2] should be reverted. It may lead to a wrong unaligned type depending on the order of execution, for example:
   
   1. registerTimer thread: aligned barrier timeout to unaligned
   2. registerTimer thread: channelStateFuture.complete(inflightBuffers)
   3. Channel state writer thread: write these buffers and complete the `resultSubpartitionStateFuture`
   4. AsyncCheckpointRunnable thread: all states are written, and build metrics
   5. registerTimer thread: call `CheckpointMetricsBuilder#setUnalignedCheckpoint(true)`
   
   If inflightBuffers is empty or very small, steps 3 and 4 may finish before step 5, and then the unaligned type will be wrong.
   
   Based on this case, I think the solution is:
   
   1. `CheckpointMetricsBuilder#setUnalignedCheckpoint(true)` should be executed before `channelStateFuture.complete(inflightBuffers)`; that is, `CheckpointMetricsBuilder` should be passed to `PipelinedSubpartition#alignedBarrierTimeout`.
   2. Make `CheckpointMetricsBuilder#unalignedCheckpoint` volatile to ensure that AsyncCheckpointRunnable reads it correctly.
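A minimal sketch of steps 1 and 2 above, using hypothetical stand-ins (`MetricsBuilderSketch`, `TimeoutOrderingSketch`, and `String` placeholders for buffers) rather than Flink's classes. Because the flag is written before `channelStateFuture` is completed, the dependent action that builds the metrics observes it, even when there are no in-flight buffers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for CheckpointMetricsBuilder; only the flag matters here.
class MetricsBuilderSketch {
    // volatile, as proposed in step 2, so the async thread sees the timer thread's write.
    private volatile boolean unalignedCheckpoint;

    void setUnalignedCheckpoint(boolean value) { unalignedCheckpoint = value; }

    boolean isUnalignedCheckpoint() { return unalignedCheckpoint; }
}

class TimeoutOrderingSketch {
    // Alignment-timer side (step 1): set the flag BEFORE completing the channel state
    // future. Completing first would let the async side build metrics before the flag
    // is set, which is the race described in the numbered steps earlier in this comment.
    static void alignedBarrierTimeout(
            MetricsBuilderSketch metrics,
            CompletableFuture<List<String>> channelStateFuture,
            List<String> inflightBuffers) {
        metrics.setUnalignedCheckpoint(true);
        channelStateFuture.complete(inflightBuffers);
    }

    public static void main(String[] args) throws Exception {
        MetricsBuilderSketch metrics = new MetricsBuilderSketch();
        CompletableFuture<List<String>> channelStateFuture = new CompletableFuture<>();

        // "AsyncCheckpointRunnable" side: build metrics only once channel state is available.
        CompletableFuture<Boolean> reported =
                channelStateFuture.thenApplyAsync(buffers -> metrics.isUnalignedCheckpoint());

        Thread timer = new Thread(
                () -> alignedBarrierTimeout(metrics, channelStateFuture, new ArrayList<>()));
        timer.start();
        timer.join();

        System.out.println(reported.get()); // prints true, even with no in-flight buffers
    }
}
```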
   
   I updated the solution here[3].
   
   > Shouldn't this be set in the AsyncCheckpointRunnable thread via a code path similar to org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.SnapshotsFinalizeResult#bytesPersistedDuringAlignment?
   
   This solution can work; in fact, I have tried it. But I found this code path too complex: it involves many exception cases (in ChannelStateCheckpointWriter), several futures to complete (dataFuture and resultFuture), and completing these futures after merging channel state.
   
   Nevertheless, I implemented a POC version [4] using this solution. Core process:
   - Add a `CompletableFuture<Boolean> timeoutToUnaligned` inside `PipelinedSubpartition`, and complete it when completing channelStateFuture.
   - `ChannelStateWriteResult` (it's at subtask level) gets a `CompletableFuture<Boolean> resultSubpartitionTimeoutToUnaligned`; this future is completed in the following cases: 
     - 1. true: any subpartition is switched from aligned to unaligned checkpoint.
     - 2. false: this result is completed and no subpartition switched to unaligned checkpoint.
     - 3. false: this result fails before any subpartition switched to unaligned checkpoint.
   - `ChannelStateWriteResult` will pass the result to `OperatorSnapshotFutures`, and then pass it to `AsyncCheckpointRunnable`
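A minimal sketch of how the subtask-level future could aggregate per-subpartition results under the three rules listed above. `TimeoutToUnalignedAggregator` and `aggregate` are hypothetical names; the per-subpartition futures play the role of `timeoutToUnaligned`, and `resultFuture` stands in for the channel state write result. It relies on `CompletableFuture#complete` being a no-op once the future is completed, so the first decision wins, and it assumes subpartition futures are completed before the result (as they would be if completed together with channelStateFuture).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical aggregation for solution 2; names are illustrative, not Flink's API.
final class TimeoutToUnalignedAggregator {
    private TimeoutToUnalignedAggregator() {}

    static CompletableFuture<Boolean> aggregate(
            List<CompletableFuture<Boolean>> subpartitionTimeoutToUnaligned,
            CompletableFuture<Void> resultFuture) {
        CompletableFuture<Boolean> aggregated = new CompletableFuture<>();
        // Case 1: any subpartition switched from aligned to unaligned -> true.
        for (CompletableFuture<Boolean> subpartition : subpartitionTimeoutToUnaligned) {
            subpartition.thenAccept(switched -> {
                if (switched) {
                    aggregated.complete(true);
                }
            });
        }
        // Cases 2 and 3: the result completes or fails before any switch -> false.
        // complete() is a no-op if case 1 has already completed the future.
        resultFuture.whenComplete((ignored, error) -> aggregated.complete(false));
        return aggregated;
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> sub0 = new CompletableFuture<>();
        CompletableFuture<Boolean> sub1 = new CompletableFuture<>();
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture<Boolean> timedOut = aggregate(Arrays.asList(sub0, sub1), result);

        sub1.complete(true);   // subpartition 1 timed out to unaligned
        result.complete(null); // channel state written afterwards
        System.out.println(timedOut.join()); // prints true

        CompletableFuture<Void> failed = new CompletableFuture<>();
        CompletableFuture<Boolean> noSwitch =
                aggregate(Arrays.asList(new CompletableFuture<Boolean>()), failed);
        failed.completeExceptionally(new RuntimeException("checkpoint failed"));
        System.out.println(noSwitch.join()); // prints false
    }
}
```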
   
   
   Solution 2 is more complex than solution 1, but it's more reasonable. Which one do you prefer?
   
   
   [1] https://github.com/apache/flink/blob/a81ffa6d019d9891bd3a54f50fb36ad847721daa/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L738
   [2] https://github.com/apache/flink/pull/22392#discussion_r1165593019
   [3] https://github.com/1996fanrui/flink/commit/c23c5dbf08567b84a8258b11539c1d4e06ab2dff
   [4] https://github.com/1996fanrui/flink/commit/d5f2537bb02ea0248b75c9cdc467a60dea541cf3
   
    





[GitHub] [flink] pnowojski commented on a diff in pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1172386310


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   Yes, let's do that. Apart from that, let's create a ticket explaining the problem with the approximation, linking to this conversation, and setting its priority to "not a priority".





[GitHub] [flink] 1996fanrui commented on a diff in pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1172355862


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   > All in all, I'm starting to think that maybe your original idea, to approximate the true/false flag based on bytesPersistedDuringAlignment > 0, might be the lesser evil. The case where bytesPersistedDuringAlignment == 0 but the checkpoint barrier actually timed out in the output buffers is quite extreme/rare, shouldn't be that significant to the end user, and is probably not worth making the code so much more complicated.
   
   I agree with you: these solutions are complex and probably not worth making the code so much more complicated, so I prefer to generate the unaligned checkpoint type based on the persisted data.
   
   Do you think it's ok? If yes, I can go ahead.





[GitHub] [flink] 1996fanrui commented on a diff in pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1172401958


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   Thanks for your quick response, I created FLINK-31864 to briefly explain it.





[GitHub] [flink] pnowojski commented on a diff in pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1165593019


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   Maybe instead of adding a dependency on `CheckpointMetrics` all the way down the call stack to the subpartition, can `alignedBarrierTimeout` return `true` or `false` depending on whether the barrier has timed out or not? 🤔 
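A minimal sketch of this suggestion, assuming a simplified subpartition interface (`TimeoutableSubpartition` and `OperatorChainSketch` are hypothetical names, not Flink's classes). Each level returns whether any barrier was converted, so the caller can record the flag without `CheckpointMetrics` being passed down. Elsewhere in this thread, an ordering problem is raised with setting the flag only after the call returns.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified subpartition; not Flink's ResultSubpartition API.
interface TimeoutableSubpartition {
    // Returns true if a pending aligned barrier for this checkpoint was converted to unaligned.
    boolean alignedBarrierTimeout(long checkpointId);
}

final class OperatorChainSketch {
    private final List<TimeoutableSubpartition> subpartitions;

    OperatorChainSketch(List<TimeoutableSubpartition> subpartitions) {
        this.subpartitions = subpartitions;
    }

    // Propagates the result up the call stack instead of passing metrics down.
    boolean alignedBarrierTimeout(long checkpointId) {
        boolean anyTimedOut = false;
        for (TimeoutableSubpartition subpartition : subpartitions) {
            // |= does not short-circuit, so every subpartition is still visited.
            anyTimedOut |= subpartition.alignedBarrierTimeout(checkpointId);
        }
        return anyTimedOut;
    }

    public static void main(String[] args) {
        TimeoutableSubpartition stillQueued = id -> true;  // barrier was still in the buffers
        TimeoutableSubpartition alreadySent = id -> false; // barrier had already been sent
        OperatorChainSketch chain =
                new OperatorChainSketch(Arrays.asList(alreadySent, stillQueued));
        System.out.println(chain.alignedBarrierTimeout(42L)); // prints true
    }
}
```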



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   Can we safely pass the `CheckpointMetricsBuilder` to the timer thread? Maybe I'm misremembering something, but the ownership of and full responsibility for `CheckpointMetricsBuilder` seem to be passed from the `SubtaskCheckpointCoordinatorImpl` to the `AsyncCheckpointRunnable`, which uses it to build the metrics. `AsyncCheckpointRunnable` and the alignment timer are running in different threads, creating both memory visibility problems AND race conditions?
   
   Shouldn't this be set in the `AsyncCheckpointRunnable` thread via a code path similar to `org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.SnapshotsFinalizeResult#bytesPersistedDuringAlignment`? 





[GitHub] [flink] 1996fanrui commented on pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on PR #22392:
URL: https://github.com/apache/flink/pull/22392#issuecomment-1517672547

   > Thanks for the fix!
   > 
   > Can you add a small unit test? Apart from that, LGTM. Feel free to merge after adding the unit test and with green azure :)
   
   Thanks for the quick feedback, updated.




[GitHub] [flink] 1996fanrui merged pull request #22392: [FLINK-31588][checkpoint] Correct the unaligned checkpoint type after aligned barrier timeout to unaligned barrier on PipelinedSubpartition

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui merged PR #22392:
URL: https://github.com/apache/flink/pull/22392

