You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:46 UTC

[hudi] 36/45: [HUDI-5223] Partial failover for flink (#7208)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bddf061a794df35936eb532ba7d747e7aa3fbb47
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Nov 16 14:47:38 2022 +0800

    [HUDI-5223] Partial failover for flink (#7208)
    
    Before this patch, when a partial failover happened within the write tasks, the write task's current instant was initialized to the latest inflight instant; the write task then waited for a new instant to write with, so it hung and failed over continuously.
    
    For a task recovered from failover (with an attempt number greater than 0), the latest inflight instant can actually be reused; the intermediate data files can be cleaned up using the MARKER files after the commit.
    
    (cherry picked from commit d3f957755abf76c64ff06fac6d857cba9bdbbacf)
---
 .../src/main/java/org/apache/hudi/io/FlinkMergeHandle.java   |  8 +-------
 .../apache/hudi/sink/common/AbstractStreamWriteFunction.java | 12 ++++++++++--
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 69121a9a04..a44783f99e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -143,13 +143,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
           break;
         }
 
-        // Override the old file name,
-        // In rare cases, when a checkpoint was aborted and the instant time
-        // is reused, the merge handle generates a new file name
-        // with the reused instant time of last checkpoint, which is duplicate,
-        // use the same name file as new base file in case data loss.
-        oldFilePath = newFilePath;
-        rolloverPaths.add(oldFilePath);
+        rolloverPaths.add(newFilePath);
         newFileName = newFileNameWithRollover(rollNumber++);
         newFilePath = makeNewFilePath(partitionPath, newFileName);
         LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index b4569894a2..7642e9f28f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -188,10 +188,9 @@ public abstract class AbstractStreamWriteFunction<I>
   // -------------------------------------------------------------------------
 
   private void restoreWriteMetadata() throws Exception {
-    String lastInflight = lastPendingInstant();
     boolean eventSent = false;
     for (WriteMetadataEvent event : this.writeMetadataState.get()) {
-      if (Objects.equals(lastInflight, event.getInstantTime())) {
+      if (Objects.equals(this.currentInstant, event.getInstantTime())) {
         // Reset taskID for event
         event.setTaskID(taskID);
         // The checkpoint succeed but the meta does not commit,
@@ -207,6 +206,15 @@ public abstract class AbstractStreamWriteFunction<I>
   }
 
   private void sendBootstrapEvent() {
+    int attemptId = getRuntimeContext().getAttemptNumber();
+    if (attemptId > 0) {
+      // either a partial or global failover, reuses the current inflight instant
+      if (this.currentInstant != null) {
+        LOG.info("Recover task[{}] for instant [{}] with attemptId [{}]", taskID, this.currentInstant, attemptId);
+        this.currentInstant = null;
+      }
+      return;
+    }
     this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
     LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
   }