Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/21 14:11:52 UTC

[GitHub] [flink] 1996fanrui opened a new pull request, #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

1996fanrui opened a new pull request, #21547:
URL: https://github.com/apache/flink/pull/21547

   ## What is the purpose of the change
   
   In RocksDB incremental checkpoint mode, a checkpoint may be canceled (for example, because of a checkpoint timeout) while its files are being uploaded, when some files have already been uploaded and others have not. In that case, the files that were already uploaded are never cleaned up and remain in the checkpoint storage.
   
   ## Brief change log
   
   Use a `CloseableRegistry` as the `tmpResourcesRegistry`. If the async phase fails, closing the `tmpResourcesRegistry` cleans up all temporary resources, including the files that were already uploaded.
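   The change above can be sketched as follows. This is a minimal illustration, not the PR's code: `TmpRegistry` is a hypothetical, simplified stand-in for Flink's `CloseableRegistry`, and the in-memory `remoteStore` stands in for checkpoint storage. Every successfully uploaded file registers a `Closeable` that discards it, and the failure path closes the registry.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical, simplified stand-in for Flink's CloseableRegistry.
   class TmpRegistry implements Closeable {
       private final List<Closeable> resources = new ArrayList<>();

       void registerCloseable(Closeable c) {
           resources.add(c);
       }

       @Override
       public void close() {
           // Discard every registered resource; individual failures are ignored.
           for (Closeable c : resources) {
               try {
                   c.close();
               } catch (IOException ignored) {
                   // best-effort cleanup
               }
           }
           resources.clear();
       }
   }

   class PartialUploadDemo {
       // Hypothetical in-memory "checkpoint storage": names of uploaded files.
       final Set<String> remoteStore = new HashSet<>();

       /** Uploads files in order and throws at index failAt (simulated cancellation). */
       void upload(List<String> files, int failAt, TmpRegistry registry) {
           for (int i = 0; i < files.size(); i++) {
               if (i == failAt) {
                   throw new IllegalStateException("checkpoint canceled during upload");
               }
               String name = files.get(i);
               remoteStore.add(name);
               // Closing the registry later removes this file from storage.
               registry.registerCloseable(() -> remoteStore.remove(name));
           }
       }

       /** Runs the async phase; on failure, closes the registry to drop partial uploads. */
       boolean runAsyncPhase(List<String> files, int failAt) {
           TmpRegistry registry = new TmpRegistry();
           try {
               upload(files, failAt, registry);
               return true; // success: the files now belong to the completed checkpoint
           } catch (RuntimeException e) {
               registry.close();
               return false;
           }
       }
   }
   ```

   Without the `registry.close()` call in the catch block, the files uploaded before the failure would stay in `remoteStore`, which is the leak described above.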
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added test: RocksDBStateUploaderTest#testUploadedSstCanBeCleanedUp*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21547:
URL: https://github.com/apache/flink/pull/21547#discussion_r1070247686


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -325,6 +329,7 @@ protected RocksDBSnapshotOperation(
             this.checkpointStreamFactory = checkpointStreamFactory;
             this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
             this.localBackupDirectory = localBackupDirectory;
+            this.tmpResourcesRegistry = new CloseableRegistry();

Review Comment:
   Thanks for your quick feedback!
   
   > will “tmpResourcesRegistry.registerCloseable() throw IOException after close()” cause the job to failover?
   
   It doesn't cause the job to fail over directly; it only causes the current checkpoint to fail.
   
   As the code shows, `outputStream.closeAndGetHandle()` and `tmpResourcesRegistry.registerCloseable()` are called in the same method, and both run in the uploader thread. So an exception thrown by `tmpResourcesRegistry.registerCloseable()` is handled the same way as one thrown by `outputStream.closeAndGetHandle()`.
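   To illustrate the point: both calls run inside the task submitted to the uploader thread, so either exception completes that task's `Future` exceptionally, and the caller sees a failed upload (and therefore a failed checkpoint) in the same way. This is a hypothetical sketch with plain `java.util.concurrent`; `UploaderThreadDemo` and `uploadOne` are illustrative names, not Flink APIs.

   ```java
   import java.io.IOException;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   class UploaderThreadDemo {
       /** Simulates one upload task; 'failingStep' selects which call throws. */
       static String uploadOne(String failingStep) throws IOException {
           if ("closeAndGetHandle".equals(failingStep)) {
               throw new IOException("writing checkpoint file failed");
           }
           if ("registerCloseable".equals(failingStep)) {
               throw new IOException("Cannot register Closeable, registry is already closed.");
           }
           return "handle";
       }

       /** Returns "ok", or the message of the exception that failed the upload task. */
       static String runOnUploaderThread(String failingStep) {
           ExecutorService uploader = Executors.newSingleThreadExecutor();
           try {
               Future<String> future = uploader.submit(() -> uploadOne(failingStep));
               future.get();
               return "ok";
           } catch (ExecutionException e) {
               // Both failure kinds arrive here identically: only this checkpoint fails.
               return e.getCause().getMessage();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return "interrupted";
           } finally {
               uploader.shutdown();
           }
       }
   }
   ```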
   
   As we know, if [execution.checkpointing.tolerable-failed-checkpoints](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints) is greater than 0, the job keeps running after writing a checkpoint file fails, as long as the number of consecutive checkpoint failures does not exceed that value. The same applies when `tmpResourcesRegistry.registerCloseable()` throws, so this case is handled as well.
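   A simplified, hypothetical model of how the tolerable-failures setting decides between "fail this checkpoint" and "fail over the job". It assumes the option counts consecutive checkpoint failures and that a completed checkpoint resets the count; `FailureTolerance` is an illustrative class, not Flink's actual failure manager.

   ```java
   // Hypothetical model of execution.checkpointing.tolerable-failed-checkpoints.
   class FailureTolerance {
       private final int tolerableFailures;
       private int consecutiveFailures = 0;

       FailureTolerance(int tolerableFailures) {
           this.tolerableFailures = tolerableFailures;
       }

       /** Records a failed checkpoint; returns true if the job should fail over. */
       boolean onCheckpointFailure() {
           consecutiveFailures++;
           return consecutiveFailures > tolerableFailures;
       }

       /** A completed checkpoint resets the consecutive failure count. */
       void onCheckpointSuccess() {
           consecutiveFailures = 0;
       }
   }
   ```

   With a value of 0, the first failed checkpoint triggers a failover; with a very large value (the "infinity" case asked about), a timed-out checkpoint never does.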
   
   <img width="998" alt="image" src="https://user-images.githubusercontent.com/38427477/212467318-27036f4f-9001-4cfa-abdc-e741f27dd8a6.png">
   





[GitHub] [flink] fredia commented on a diff in pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #21547:
URL: https://github.com/apache/flink/pull/21547#discussion_r1070245915


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -325,6 +329,7 @@ protected RocksDBSnapshotOperation(
             this.checkpointStreamFactory = checkpointStreamFactory;
             this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
             this.localBackupDirectory = localBackupDirectory;
+            this.tmpResourcesRegistry = new CloseableRegistry();

Review Comment:
   Thanks for the clarification, I forgot that `RocksDBSnapshotOperation` is created for a single checkpoint 😊.
   
   I have another question:
   will `tmpResourcesRegistry.registerCloseable()` throwing an `IOException` after `close()` cause the job to fail over? If we set `execution.checkpointing.tolerable-failed-checkpoints` to infinity and chk-1 times out, will the job continue to run?





[GitHub] [flink] flinkbot commented on pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21547:
URL: https://github.com/apache/flink/pull/21547#issuecomment-1361366338

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "36cbe3cc7b8acbbce02401f20ee0fa3b39251d4d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "36cbe3cc7b8acbbce02401f20ee0fa3b39251d4d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 36cbe3cc7b8acbbce02401f20ee0fa3b39251d4d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] 1996fanrui commented on a diff in pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21547:
URL: https://github.com/apache/flink/pull/21547#discussion_r1070226451


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -325,6 +329,7 @@ protected RocksDBSnapshotOperation(
             this.checkpointStreamFactory = checkpointStreamFactory;
             this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
             this.localBackupDirectory = localBackupDirectory;
+            this.tmpResourcesRegistry = new CloseableRegistry();

Review Comment:
   Hi @fredia, thanks a lot for your review.
   
   As I understand it, `tmpResourcesRegistry` cannot be shared by multiple checkpoints. If it were shared, it would be closed after chk-1 times out, and when chk-2 later calls `tmpResourcesRegistry.registerCloseable()` after writing a file, chk-2 would throw an `IOException` even though chk-2 itself is fine.
   
   This PR doesn't share `tmpResourcesRegistry` across checkpoints: a `RocksDBSnapshotOperation` is created for a single checkpoint, and its constructor takes the `checkpointId`.
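   A sketch of the difference, assuming a registry that, like the one discussed here, rejects `registerCloseable()` with an `IOException` once it has been closed. `ClosingRegistry` and `SnapshotOperation` are hypothetical stand-ins for `CloseableRegistry` and `RocksDBSnapshotOperation`; the registry is passed in only so both cases can be shown, whereas in the PR the constructor creates a new one.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical registry: registration after close() is rejected.
   class ClosingRegistry implements Closeable {
       private final List<Closeable> resources = new ArrayList<>();
       private boolean closed;

       synchronized void registerCloseable(Closeable c) throws IOException {
           if (closed) {
               throw new IOException("registry is already closed");
           }
           resources.add(c);
       }

       @Override
       public synchronized void close() {
           closed = true;
           resources.clear(); // a real registry would also discard each resource here
       }
   }

   // Hypothetical per-checkpoint operation holding its own registry.
   class SnapshotOperation {
       final long checkpointId;
       final ClosingRegistry tmpResourcesRegistry;

       SnapshotOperation(long checkpointId, ClosingRegistry registry) {
           this.checkpointId = checkpointId;
           this.tmpResourcesRegistry = registry;
       }

       /** "Writes" one file and registers it; returns false if registration is rejected. */
       boolean writeFile() {
           try {
               tmpResourcesRegistry.registerCloseable(() -> {});
               return true;
           } catch (IOException e) {
               return false;
           }
       }
   }
   ```

   Sharing one registry makes a healthy chk-2 fail after chk-1 timed out, while a fresh registry per operation does not.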
   
   `testUploadedSstCanBeCleanedUp` reuses `tmpResourcesRegistry` across multiple `uploader.uploadFilesToCheckpointFs` calls, not across multiple checkpoints. A single checkpoint may call `uploader.uploadFilesToCheckpointFs` multiple times, for example in `RocksIncrementalSnapshotStrategy#uploadSstFiles`.
   
   Please correct me if I'm wrong, thanks~ 🤔
   
   <img width="995" alt="image" src="https://user-images.githubusercontent.com/38427477/212459586-7c7327a4-1b8c-4dcb-a16a-aa5bb56b9443.png">
   





[GitHub] [flink] 1996fanrui commented on a diff in pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21547:
URL: https://github.com/apache/flink/pull/21547#discussion_r1070226451


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -325,6 +329,7 @@ protected RocksDBSnapshotOperation(
             this.checkpointStreamFactory = checkpointStreamFactory;
             this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
             this.localBackupDirectory = localBackupDirectory;
+            this.tmpResourcesRegistry = new CloseableRegistry();

Review Comment:
   Hi @fredia, thanks a lot for your review.
   
   As I understand it, `tmpResourcesRegistry` cannot be shared by multiple checkpoints. If it were shared, it would be closed after chk-1 times out, and when chk-2 later calls `tmpResourcesRegistry.registerCloseable()` after writing a file, chk-2 would throw an `IOException` even though chk-2 itself is fine.
   
   This PR doesn't share `tmpResourcesRegistry` across checkpoints: a `RocksDBSnapshotOperation` is created for a single checkpoint, and its constructor takes the `checkpointId`.
   
   `testUploadedSstCanBeCleanedUp` reuses `tmpResourcesRegistry` across multiple `uploader.uploadFilesToCheckpointFs` calls, not across multiple checkpoints. I want to check that all files can be cleaned up, whether they were written before or after `tmpResourcesRegistry.close()`.
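   The "before or after `close()`" case can be sketched as follows, assuming that registering on an already closed registry closes the argument immediately and then throws `IOException` (the behaviour this test relies on). That is what lets a file whose upload finishes after the registry was closed still be discarded. `DiscardingRegistry` and `FakeHandle` are hypothetical; `FakeHandle.close()` stands in for discarding the uploaded state.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical handle for an uploaded file; close() stands in for discarding it.
   class FakeHandle implements Closeable {
       final String name;
       boolean discarded;

       FakeHandle(String name) {
           this.name = name;
       }

       @Override
       public void close() {
           discarded = true;
       }
   }

   // Hypothetical registry: a late registration is closed right away, then rejected.
   class DiscardingRegistry implements Closeable {
       private final List<Closeable> resources = new ArrayList<>();
       private boolean closed;

       void registerCloseable(Closeable c) throws IOException {
           synchronized (this) {
               if (!closed) {
                   resources.add(c);
                   return;
               }
           }
           closeQuietly(c); // clean up the late resource instead of leaking it
           throw new IOException("Cannot register Closeable, registry is already closed.");
       }

       @Override
       public void close() {
           List<Closeable> toClose;
           synchronized (this) {
               closed = true;
               toClose = new ArrayList<>(resources);
               resources.clear();
           }
           for (Closeable c : toClose) {
               closeQuietly(c);
           }
       }

       private static void closeQuietly(Closeable c) {
           try {
               c.close();
           } catch (IOException ignored) {
               // best-effort cleanup
           }
       }
   }
   ```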
   
   Please correct me if I'm wrong, thanks~ 🤔





[GitHub] [flink] Zakelly commented on a diff in pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by "Zakelly (via GitHub)" <gi...@apache.org>.
Zakelly commented on code in PR #21547:
URL: https://github.com/apache/flink/pull/21547#discussion_r1092716236


##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java:
##########
@@ -67,50 +70,125 @@ public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException
                 new CheckpointStreamFactory() {
                     @Override
                     public CheckpointStateOutputStream createCheckpointStateOutputStream(
-                            CheckpointedStateScope scope) throws IOException {
+                            CheckpointedStateScope scope) {
                         return outputStream;
                     }
 
                     @Override
                     public boolean canFastDuplicate(
-                            StreamStateHandle stateHandle, CheckpointedStateScope scope)
-                            throws IOException {
+                            StreamStateHandle stateHandle, CheckpointedStateScope scope) {
                         return false;
                     }
 
                     @Override
                     public List<StreamStateHandle> duplicate(
-                            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
-                            throws IOException {
+                            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) {
                         return null;
                     }
                 };
 
-        File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+        File file = TempDirUtils.newFile(temporaryFolder, String.valueOf(UUID.randomUUID()));
         generateRandomFileContent(file.getPath(), 20);
 
         Map<StateHandleID, Path> filePaths = new HashMap<>(1);
         filePaths.put(new StateHandleID("mockHandleID"), file.toPath());
         try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
+            assertThatThrownBy(
+                            () ->
+                                    rocksDBStateUploader.uploadFilesToCheckpointFs(
+                                            filePaths,
+                                            checkpointStreamFactory,
+                                            CheckpointedStateScope.SHARED,
+                                            new CloseableRegistry(),
+                                            new CloseableRegistry()))
+                    .isEqualTo(expectedException);
+        }
+    }
+
+    @Test
+    void testUploadedSstCanBeCleanedUp() throws Exception {
+        SpecifiedException expectedException =
+                new SpecifiedException("throw exception while multi thread upload states.");
+
+        File checkpointPrivateFolder = TempDirUtils.newFolder(temporaryFolder, "private");
+        org.apache.flink.core.fs.Path checkpointPrivateDirectory =
+                org.apache.flink.core.fs.Path.fromLocalFile(checkpointPrivateFolder);
+
+        File checkpointSharedFolder = TempDirUtils.newFolder(temporaryFolder, "shared");
+        org.apache.flink.core.fs.Path checkpointSharedDirectory =
+                org.apache.flink.core.fs.Path.fromLocalFile(checkpointSharedFolder);
+
+        FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
+
+        int sstFileCount = 6;
+        int fileStateSizeThreshold = 1024;
+        int writeBufferSize = 4096;
+        CheckpointStreamFactory checkpointStreamFactory =
+                new FsCheckpointStreamFactory(
+                        fileSystem,
+                        checkpointPrivateDirectory,
+                        checkpointSharedDirectory,
+                        fileStateSizeThreshold,
+                        writeBufferSize);
+
+        String localFolder = "local";
+        TempDirUtils.newFolder(temporaryFolder, localFolder);
+
+        Map<StateHandleID, Path> filePaths =
+                generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
+        CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();
+        try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(sstFileCount)) {
             rocksDBStateUploader.uploadFilesToCheckpointFs(
                     filePaths,
                     checkpointStreamFactory,
                     CheckpointedStateScope.SHARED,
-                    new CloseableRegistry());
-            fail();
-        } catch (Exception e) {
-            assertEquals(expectedException, e);
+                    new CloseableRegistry(),
+                    tmpResourcesRegistry);
+
+            assertThatThrownBy(
+                            () ->
+                                    rocksDBStateUploader.uploadFilesToCheckpointFs(
+                                            filePaths,
+                                            new LastFailingCheckpointStateOutputStreamFactory(
+                                                    checkpointStreamFactory,
+                                                    sstFileCount,
+                                                    expectedException),
+                                            CheckpointedStateScope.SHARED,
+                                            new CloseableRegistry(),
+                                            tmpResourcesRegistry))
+                    .isEqualTo(expectedException);
+            tmpResourcesRegistry.close();

Review Comment:
   Before this, would you mind checking that the private folder or the shared folder is not empty?
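   A sketch of the suggested check using plain `java.nio.file` on a temporary directory rather than the Flink test utilities: assert that the shared folder is non-empty before the registry is closed and empty afterwards. `SharedFolderCheck`, `countFiles`, and the `List<Closeable>` standing in for `tmpResourcesRegistry` are hypothetical; the real test would list the `shared` folder it creates with `TempDirUtils.newFolder`.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.stream.Stream;

   class SharedFolderCheck {
       /** Counts regular files directly inside 'dir'. */
       static long countFiles(Path dir) throws IOException {
           try (Stream<Path> entries = Files.list(dir)) {
               return entries.filter(Files::isRegularFile).count();
           }
       }

       /** Returns {files before close, files after close}. */
       static long[] run(int fileCount) {
           try {
               Path shared = Files.createTempDirectory("shared");
               List<Closeable> registry = new ArrayList<>(); // stand-in for tmpResourcesRegistry
               for (int i = 0; i < fileCount; i++) {
                   Path file = Files.createFile(shared.resolve(i + ".sst"));
                   registry.add(() -> Files.deleteIfExists(file));
               }
               long before = countFiles(shared); // the check requested in the review
               for (Closeable c : registry) {
                   c.close(); // equivalent of tmpResourcesRegistry.close()
               }
               long after = countFiles(shared);
               Files.delete(shared);
               return new long[] {before, after};
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```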



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -147,6 +157,11 @@ private StreamStateHandle uploadLocalFileToCheckpointFs(
                 result = outputStream.closeAndGetHandle();
                 outputStream = null;
             }
+            if (result != null) {
+                StreamStateHandle finalResult = result;

Review Comment:
   How about adding the `final` keyword? Or just make `result` final, i.e.:
   ```java
   final StreamStateHandle result;
   if (closeableRegistry.unregisterCloseable(outputStream)) {
       result = outputStream.closeAndGetHandle();
       outputStream = null;
   } else {
       result = null;
   }
   // following other code...
   ```
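   Why a final (or effectively final) handle matters here: a cleanup action registered as a lambda can only capture effectively final locals, and copying a reassigned local into `finalResult` is the usual workaround. With `result` declared `final` and assigned exactly once on every branch, the copy becomes unnecessary. A self-contained sketch of the suggested shape; `Handle`, `OutStream`, `uploadOne`, and the `List<Runnable>` registry are hypothetical simplifications of `StreamStateHandle`, the checkpoint output stream, and `CloseableRegistry`.

   ```java
   import java.util.List;

   class FinalResultDemo {
       // Hypothetical stand-in for StreamStateHandle.
       static class Handle {
           boolean discarded;

           void discardState() {
               discarded = true;
           }
       }

       // Hypothetical stand-in for the checkpoint output stream.
       static class OutStream {
           Handle closeAndGetHandle() {
               return new Handle();
           }
       }

       /** Mirrors the suggested structure: 'result' is assigned exactly once on every path. */
       static Handle uploadOne(OutStream outputStream, boolean unregistered, List<Runnable> tmpResources) {
           final Handle result;
           if (unregistered) {
               result = outputStream.closeAndGetHandle();
           } else {
               result = null;
           }
           if (result != null) {
               // The lambda may capture 'result' directly because it is final.
               tmpResources.add(() -> result.discardState());
           }
           return result;
       }
   }
   ```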





[GitHub] [flink] 1996fanrui commented on pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21547:
URL: https://github.com/apache/flink/pull/21547#issuecomment-1375446507

   Hi @curcur @Myasuka, could you please take a look at this PR in your free time? Thanks!





[GitHub] [flink] fredia commented on a diff in pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by GitBox <gi...@apache.org>.
fredia commented on code in PR #21547:
URL: https://github.com/apache/flink/pull/21547#discussion_r1069245033


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -325,6 +329,7 @@ protected RocksDBSnapshotOperation(
             this.checkpointStreamFactory = checkpointStreamFactory;
             this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
             this.localBackupDirectory = localBackupDirectory;
+            this.tmpResourcesRegistry = new CloseableRegistry();

Review Comment:
   Can `tmpResourcesRegistry` be reused by multiple checkpoints?
   Suppose chk-1 leaves some files behind because of a timeout: `tmpResourcesRegistry` would clean up these files and be closed. If chk-2 also times out, `tmpResourcesRegistry.registerCloseable()` would throw an `IOException`, as `testUploadedSstCanBeCleanedUp()` shows. Is this expected?
   





[GitHub] [flink] 1996fanrui commented on pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on PR #21547:
URL: https://github.com/apache/flink/pull/21547#issuecomment-1411461596

   > Thanks for the PR! I have just noticed this issue and it is important to clean up the left-over files and maintain HDFS availability. Overall it LGTM. I left some minor comments, PTAL, thanks.
   
   Hi @Zakelly, thanks for your review and good suggestions. Updated.




[GitHub] [flink] curcur merged pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by "curcur (via GitHub)" <gi...@apache.org>.
curcur merged PR #21547:
URL: https://github.com/apache/flink/pull/21547




[GitHub] [flink] 1996fanrui commented on pull request #21547: [FLINK-30461][checkpoint] Fix the bug that the shared files will remain forever after the rocksdb incremental checkpoint exception

Posted by "1996fanrui (via GitHub)" <gi...@apache.org>.
1996fanrui commented on PR #21547:
URL: https://github.com/apache/flink/pull/21547#issuecomment-1418402687

   Thanks @fredia @Zakelly for the review, and thanks @curcur for the review and merge~



