Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/17 08:04:38 UTC

[GitHub] [flink] zentol commented on a diff in pull request #19993: [FLINK-28077][checkpoint] Fix the bug that tasks get stuck during cancellation in ChannelStateWriteRequestExecutorImpl

zentol commented on code in PR #19993:
URL: https://github.com/apache/flink/pull/19993#discussion_r899866865


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -98,6 +98,9 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
                     List<Buffer> buffers;
                     try {
                         buffers = dataFuture.get();
+                    } catch (InterruptedException e) {
+                        writer.fail(e);
+                        throw e;

Review Comment:
   Does this perhaps belong in FLINK-27792 instead? AFAICT this isn't required for the issue at hand.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -109,6 +112,9 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
                     }
                 },
                 throwable -> {
+                    if (!dataFuture.isDone()) {
+                        return;
+                    }

Review Comment:
   While this will likely solve the issue, I'm not sure it is the correct solution.
   
   The logs showed that this future was eventually completed and contained several buffers. Admittedly, this happened after the hosting TaskManager (TM) had shut down, so I'm not sure it can happen in production, where the JVM would go down with it. Still, I wonder whether returning early here could cause a buffer leak: if the future completes after this handler has returned, its buffers may never be recycled.
   
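   Would there be any downside to doing the clean-up like this instead?
   ```java
   // Recycle the buffers whenever the future completes normally, even if that
   // happens after this failure handler has already run.
   dataFuture.thenAccept(
           buffers -> {
               try {
                   // Closing the unconsumed iterator recycles every element
                   // via Buffer::recycleBuffer.
                   CloseableIterator.fromList(buffers, Buffer::recycleBuffer)
                           .close();
               } catch (Exception e) {
                   // Best-effort clean-up: failures while recycling are ignored here.
               }
           });
   ```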
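   To make the leak concern concrete, here is a minimal, dependency-free sketch. It is not the actual `ChannelStateWriteRequest` code: `FakeBuffer` is a hypothetical stand-in for Flink's `Buffer`, the early-return handler is a simplified, hypothetical model of the `isDone()` check, and the `CloseableIterator.fromList(...).close()` call is approximated with a plain loop. The failure handler runs before `dataFuture` completes, and the future is then completed with three buffers. The early-return variant recycles none of them, while the `thenAccept` variant recycles all three.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Consumer;

   public class LateBufferCleanupSketch {

       /** Hypothetical stand-in for Flink's Buffer: it only counts recycle calls. */
       static final class FakeBuffer {
           private final AtomicInteger recycled;

           FakeBuffer(AtomicInteger recycled) {
               this.recycled = recycled;
           }

           void recycleBuffer() {
               recycled.incrementAndGet();
           }
       }

       /** Hypothetical model of returning early when the future is not yet done. */
       static void cleanupWithEarlyReturn(CompletableFuture<List<FakeBuffer>> dataFuture) {
           if (!dataFuture.isDone()) {
               return; // buffers delivered later are never recycled by this handler
           }
           dataFuture.thenAccept(buffers -> buffers.forEach(FakeBuffer::recycleBuffer));
       }

       /** Model of the suggested approach: recycle whenever the future completes normally. */
       static void cleanupDeferred(CompletableFuture<List<FakeBuffer>> dataFuture) {
           dataFuture.thenAccept(buffers -> buffers.forEach(FakeBuffer::recycleBuffer));
       }

       /**
        * Runs the failure handler first, then completes the future with {@code count}
        * buffers (a "late completion") and returns how many buffers were recycled.
        */
       static int recycledAfterLateCompletion(
               Consumer<CompletableFuture<List<FakeBuffer>>> failureHandler, int count) {
           AtomicInteger recycled = new AtomicInteger();
           CompletableFuture<List<FakeBuffer>> dataFuture = new CompletableFuture<>();
           failureHandler.accept(dataFuture);

           List<FakeBuffer> buffers = new ArrayList<>();
           for (int i = 0; i < count; i++) {
               buffers.add(new FakeBuffer(recycled));
           }
           // Non-async dependent actions registered via thenAccept run here, in this thread.
           dataFuture.complete(buffers);
           return recycled.get();
       }

       public static void main(String[] args) {
           System.out.println("early return: "
                   + recycledAfterLateCompletion(LateBufferCleanupSketch::cleanupWithEarlyReturn, 3));
           System.out.println("deferred: "
                   + recycledAfterLateCompletion(LateBufferCleanupSketch::cleanupDeferred, 3));
       }
   }
   ```

   Because the non-async `thenAccept` runs its action immediately in the calling thread when the future is already complete, the deferred variant in this sketch also covers the already-done case, so no separate `isDone()` branch is needed.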



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.