Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/16 07:42:10 UTC

[GitHub] [flink] gaoyunhaii commented on a change in pull request #18157: [FLINK-17808] Rename checkpoint meta file to "_metadata" until it has…

gaoyunhaii commented on a change in pull request #18157:
URL: https://github.com/apache/flink/pull/18157#discussion_r785398471



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -44,7 +46,7 @@
 
     // ------------------------------------------------------------------------
 
-    private final FSDataOutputStream out;
+    private FSDataOutputStream out;

Review comment:
       Although this relates to the following comments, we should find a way to keep the field `final` here, for example by introducing a local variable when creating the stream or by narrowing the scope of the try...catch in the constructor.
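
   A minimal sketch of the local-variable approach, using hypothetical stand-ins rather than the actual Flink classes (`FinalFieldSketch`, `openRecoverable`, and `openPlain` are invented for illustration). The stream is chosen in a local variable inside the try...catch, and the field is assigned exactly once afterwards, so it can stay `final`:

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.OutputStream;

   // Hypothetical stand-in for the stream class under review; not Flink code.
   final class FinalFieldSketch {

       private final OutputStream out; // assigned exactly once, so it stays final
       private final String kind;      // records which path was taken (illustration only)

       FinalFieldSketch(boolean recoverableSupported) {
           // Pick the stream in locals; the try...catch never touches the fields.
           OutputStream stream;
           String chosen;
           try {
               stream = openRecoverable(recoverableSupported);
               chosen = "recoverable";
           } catch (UnsupportedOperationException e) {
               stream = openPlain();
               chosen = "plain";
           }
           this.out = stream;
           this.kind = chosen;
       }

       // Assumed helper: simulates a file system that may not support recoverable writers.
       private static OutputStream openRecoverable(boolean supported) {
           if (!supported) {
               throw new UnsupportedOperationException("recoverable writer not supported");
           }
           return new ByteArrayOutputStream();
       }

       // Assumed helper: simulates creating the normal output stream.
       private static OutputStream openPlain() {
           return new ByteArrayOutputStream();
       }

       OutputStream getOutput() {
           return out;
       }

       String kind() {
           return kind;
       }
   }
   ```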

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -62,7 +64,15 @@ public FsCheckpointMetadataOutputStream(
         this.metadataFilePath = checkNotNull(metadataFilePath);
         this.exclusiveCheckpointDir = checkNotNull(exclusiveCheckpointDir);
 
-        this.out = fileSystem.create(metadataFilePath, WriteMode.NO_OVERWRITE);
+        try {
+            RecoverableWriter recoverableWriter = fileSystem.createRecoverableWriter();

Review comment:
       I am a bit concerned about compatibility when the `RecoverableWriter` cannot be created: currently the code only falls back to the normal output stream when the FileSystem throws `UnsupportedOperationException`. However, users may have customized FileSystem implementations that behave differently in unsupported cases (for example, by throwing a different exception).
   
   Could we fall back to the normal output stream, with a warning, whenever creating the `RecoverableWriter` fails? For example:
   
   ```
   RecoverableWriter recoverableWriter = null;
   try {
       recoverableWriter = ...
   } catch (Throwable e) {
       LOG.warn(...)
   }
   
   if (recoverableWriter != null) {
       // use recoverable writer
   } else {
       // use normal output stream
   }
   ```
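
   A runnable sketch of this fallback under stated assumptions: `FallbackSketch`, `WriterFactory`, and `chooseStream` are hypothetical names, and the factory stands in for the call that creates the `RecoverableWriter`. It catches `Throwable` as in the snippet above; catching `Exception` instead may be preferable if errors such as `OutOfMemoryError` should propagate.

   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;

   // Hypothetical stand-in for the call that creates the recoverable writer.
   interface WriterFactory {
       Object create() throws Exception;
   }

   final class FallbackSketch {

       private static final Logger LOG = Logger.getLogger(FallbackSketch.class.getName());

       // Returns "recoverable" if the writer could be created, otherwise "plain".
       static String chooseStream(WriterFactory factory) {
           Object recoverableWriter = null;
           try {
               recoverableWriter = factory.create();
           } catch (Throwable t) {
               // Any failure, not only UnsupportedOperationException, triggers the fallback.
               LOG.log(Level.WARNING, "Could not create recoverable writer, using normal stream.", t);
           }

           if (recoverableWriter != null) {
               return "recoverable"; // use recoverable writer
           } else {
               return "plain"; // use normal output stream
           }
       }
   }
   ```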

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -109,13 +119,19 @@ public void close() {
 
             try {
                 out.close();
-                fileSystem.delete(metadataFilePath, false);
+                if (!isRecoverableStream(out)) {
+                    fileSystem.delete(metadataFilePath, false);
+                }
             } catch (Throwable t) {
                 LOG.warn("Could not close the state stream for {}.", metadataFilePath, t);
             }
         }
     }
 
+    private boolean isRecoverableStream(FSDataOutputStream out) {

Review comment:
       It seems that relying on conditions and branches in different places might complicate the implementation. Could you introduce an interface for the two kinds of implementation? For example:
   
   ```
   interface MetadataOutputStreamBackend {
   
       FSDataOutputStream getOutput();
   
       void commit();
   
       void close();
   }
   ```
   
   and have two implementations?
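
   To illustrate how two implementations could remove the scattered branches, here is a self-contained sketch. Everything in it is an assumption made for illustration: an in-memory `Map` plays the role of the file system, `java.io.OutputStream` replaces `FSDataOutputStream`, and the names `BackendSketch`, `DirectBackendSketch`, `RenamingBackendSketch`, and the `.inprogress` suffix are invented. Each backend owns its own cleanup rule, so the caller never has to ask which kind of stream it holds.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.OutputStream;
   import java.util.Map;

   // Hypothetical variant of the proposed interface, using plain JDK types.
   interface BackendSketch {
       OutputStream getOutput();
       void commit();
       void close();
   }

   // Writes under the final name right away; an uncommitted close must delete the file.
   final class DirectBackendSketch implements BackendSketch {
       private final Map<String, byte[]> files;
       private final String path;
       private final ByteArrayOutputStream out = new ByteArrayOutputStream();
       private boolean committed;

       DirectBackendSketch(Map<String, byte[]> files, String path) {
           this.files = files;
           this.path = path;
           files.put(path, new byte[0]); // the file is visible as soon as it is created
       }

       public OutputStream getOutput() { return out; }

       public void commit() {
           files.put(path, out.toByteArray());
           committed = true;
       }

       public void close() {
           if (!committed) {
               files.remove(path); // remove the partial file
           }
       }
   }

   // Writes under a temporary name; the final name appears only on commit.
   final class RenamingBackendSketch implements BackendSketch {
       private final Map<String, byte[]> files;
       private final String path;
       private final String tempPath;
       private final ByteArrayOutputStream out = new ByteArrayOutputStream();
       private boolean committed;

       RenamingBackendSketch(Map<String, byte[]> files, String path) {
           this.files = files;
           this.path = path;
           this.tempPath = path + ".inprogress"; // assumed naming, for illustration only
           files.put(tempPath, new byte[0]);
       }

       public OutputStream getOutput() { return out; }

       public void commit() {
           files.remove(tempPath);
           files.put(path, out.toByteArray()); // publish under the final name
           committed = true;
       }

       public void close() {
           if (!committed) {
               files.remove(tempPath); // the final name was never created
           }
       }
   }
   ```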



