Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/20 13:19:54 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #20306: [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression

rkhachatryan commented on code in PR #20306:
URL: https://github.com/apache/flink/pull/20306#discussion_r925596963


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java:
##########
@@ -168,7 +168,7 @@ private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws
                 wrappedStreamClosed = true;
             }
         } finally {
-            if (!wrappedStreamClosed) {
+            if (!wrappedStreamClosed || compression) {
                 fsStream.close();
             }

Review Comment:
   The logic is getting a bit tricky, IMO.
   I think we can simplify it by:
   1. wrapping the stream we want to close,
   2. making sure, in the wrapper, that the underlying stream is closed at most once, and
   3. closing the wrapper with `try`-with-resources.
   
   That way we can also get rid of `wrappedStreamClosed`.
   
   Something like this:
   
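   ```
   // fsStream is closed at most once here, whether or not closing
   // the inner stream also closes it.
   try (FSDataOutputStream fsStream =
           wrapWithCloseOnce(fileSystem.create(path, NO_OVERWRITE))) {
       try (OutputStreamWithPos stream = wrap(fsStream)) {
           ...
       }
   }
   ```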
   ```
   private static FSDataOutputStream wrapWithCloseOnce(FSDataOutputStream stream) {
       return new FSDataOutputStream() {
           private boolean closed;
   
           @Override
           public void close() throws IOException {
               if (!closed) {
                   closed = true;
                   stream.close();
               }
           }
   
           ...
       };
   }
   ```
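   
   To check the idea in isolation, here is a minimal, self-contained sketch that uses plain `java.io` streams rather than Flink's `FSDataOutputStream`. The names `CloseOnceDemo`, `CountingStream` and `upload` are hypothetical and exist only for this illustration, and `GZIPOutputStream` merely stands in for whatever compressing wrapper is actually used. It shows that the underlying stream sees exactly one `close()` call, whether or not the inner wrapper propagates `close()`.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.FilterOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.charset.StandardCharsets;
   import java.util.zip.GZIPOutputStream;

   public class CloseOnceDemo {

       /** Hypothetical stand-in for the file system stream; counts close() calls. */
       static final class CountingStream extends ByteArrayOutputStream {
           int closeCount;

           @Override
           public void close() throws IOException {
               closeCount++;
               super.close();
           }
       }

       /** Forwards writes to the delegate and closes it at most once. */
       static OutputStream wrapWithCloseOnce(OutputStream stream) {
           return new FilterOutputStream(stream) {
               private boolean closed;

               @Override
               public void write(byte[] b, int off, int len) throws IOException {
                   // Delegate in bulk instead of FilterOutputStream's byte-by-byte default.
                   out.write(b, off, len);
               }

               @Override
               public void close() throws IOException {
                   if (!closed) {
                       closed = true;
                       out.close();
                   }
               }
           };
       }

       /** Writes the payload and returns how often the raw stream was closed. */
       static int upload(CountingStream raw, byte[] payload, boolean compression)
               throws IOException {
           try (OutputStream fsStream = wrapWithCloseOnce(raw)) {
               // GZIPOutputStream.close() also closes fsStream; the guard absorbs
               // the second close() issued by the outer try-with-resources.
               try (OutputStream stream =
                       compression ? new GZIPOutputStream(fsStream) : fsStream) {
                   stream.write(payload);
               }
           }
           return raw.closeCount;
       }

       public static void main(String[] args) throws IOException {
           byte[] payload = "state change".getBytes(StandardCharsets.UTF_8);
           System.out.println(upload(new CountingStream(), payload, true));  // prints 1
           System.out.println(upload(new CountingStream(), payload, false)); // prints 1
       }
   }
   ```
   
   With this shape the caller needs neither a `wrappedStreamClosed` flag nor a `compression` check in a `finally` block: the outer `try` always issues the close, and the wrapper makes any repeat a no-op.
   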
   WDYT?


