Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:48 UTC

[hudi] 10/17: [HUDI-5511] Do not clean the CkpMetadata dir when restart the job (#7620)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b77aa2e1b8377c1788770f7e9e229bd8cd6f5d7f
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Mon Jan 9 09:56:35 2023 +0800

    [HUDI-5511] Do not clean the CkpMetadata dir when restart the job (#7620)
    
    Previously, we bootstrapped the ckp metadata by cleaning all the messages.
    This introduces a corner case in which the write task cannot fetch the pending instant correctly when the job restarts:
    if a checkpoint succeeds and the job then crashes before the instant has had time to commit, data loss happens,
    because the last pending instant is rolled back while the Flink engine considers the checkpoint/instant successful.
    
    Q: Why did we clean the messages?
    A: To prevent inconsistencies between the timeline and the messages.
    
    Q: Why did we decide to keep the messages?
    A: There are two cases of inconsistency:
    
    1. The timeline instant is complete but the ckp message is still inflight (when committing an instant).
    2. The timeline instant is pending but the ckp message has not started (when starting a new instant).
    
    For case 1, there is no need to re-commit the instant, so it is okay that the write task does not get any pending instant when recovering.
    For case 2, the instant is still pending, so it is rolled back, which is in line with expectations.
    Keeping the ckp messages as they are therefore preserves correctness.
    
    (cherry picked from commit ff403c82d8a64569669e6371d4b8e85cba0c0656)
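The data-loss corner case described in this commit message can be modeled in a few lines. This is a minimal sketch under stated assumptions: the CkpBusModel class, its two states, and the rule that the "last pending instant" is the latest message's instant when that message is not COMPLETED are hypothetical simplifications for illustration, not Hudi's actual CkpMetadata API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory model of the checkpoint message bus.
 * NOT Hudi's CkpMetadata: states and lookup rule are simplifying assumptions.
 */
class CkpBusModel {
  enum State { INFLIGHT, COMPLETED }

  static final class Msg {
    final String instant;
    final State state;

    Msg(String instant, State state) {
      this.instant = instant;
      this.state = state;
    }
  }

  private final List<Msg> messages = new ArrayList<>();

  // Driver publishes that a new instant has started.
  void startInstant(String instant) {
    messages.add(new Msg(instant, State.INFLIGHT));
  }

  // Driver publishes that the instant was committed.
  void commitInstant(String instant) {
    messages.add(new Msg(instant, State.COMPLETED));
  }

  // Old bootstrap behavior: wipe every message on restart.
  void bootstrapByCleaning() {
    messages.clear();
  }

  // New bootstrap behavior: leave existing messages untouched.
  void bootstrapByKeeping() {
    // intentionally empty
  }

  // Assumed rule: the latest message's instant, unless that message is COMPLETED.
  String lastPendingInstant() {
    if (messages.isEmpty()) {
      return null;
    }
    Msg last = messages.get(messages.size() - 1);
    return last.state == State.COMPLETED ? null : last.instant;
  }
}
```

In this model, calling startInstant("001") and then restarting with bootstrapByCleaning() makes lastPendingInstant() return null, so the recovering write task sees nothing to recover and the instant is rolled back. With bootstrapByKeeping() it returns "001", so the instant whose checkpoint already succeeded remains visible after the restart.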
---
 .../src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java       | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 6895b2a0c63..155f0e6905e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -90,13 +90,14 @@ public class CkpMetadata implements Serializable {
   // -------------------------------------------------------------------------
 
   /**
-   * Initialize the message bus, would clean all the messages and publish the last pending instant.
+   * Initialize the message bus, would keep all the messages.
    *
    * <p>This expects to be called by the driver.
    */
   public void bootstrap() throws IOException {
-    fs.delete(path, true);
-    fs.mkdirs(path);
+    if (!fs.exists(path)) {
+      fs.mkdirs(path);
+    }
   }
 
   public void startInstant(String instant) {
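The bootstrap change in the diff can be illustrated with a local-filesystem sketch. It assumes java.nio.file as a stand-in for the Hadoop FileSystem (`fs`) that CkpMetadata actually uses, and the BootstrapSketch class and its method names are hypothetical. It contrasts the old behavior (recursive delete, then recreate) with the new one (create the directory only when it is missing).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical local-filesystem analogue of CkpMetadata.bootstrap(); not Hudi code. */
class BootstrapSketch {

  // Old semantics: like fs.delete(path, true) followed by fs.mkdirs(path).
  static void bootstrapClean(Path path) throws IOException {
    if (Files.exists(path)) {
      // Delete children before parents by walking in reverse path order.
      try (Stream<Path> walk = Files.walk(path)) {
        walk.sorted(Comparator.reverseOrder()).forEach(p -> {
          try {
            Files.delete(p);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        });
      }
    }
    Files.createDirectories(path);
  }

  // New semantics: only create the directory if absent; existing messages survive.
  static void bootstrapKeep(Path path) throws IOException {
    if (!Files.exists(path)) {
      Files.createDirectories(path);
    }
  }
}
```

Because bootstrapKeep() never deletes anything, calling it on every restart is idempotent: a message file written before a crash is still present when the job comes back, whereas bootstrapClean() always leaves an empty directory.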