You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/06/15 06:23:29 UTC

[hudi] branch master updated: [HUDI-4255] Make the flink merge and replace handle intermediate file visible (#5866)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0811bb38fb [HUDI-4255] Make the flink merge and replace handle intermediate file visible (#5866)
0811bb38fb is described below

commit 0811bb38fb61c19cb2759b96c268682a47516efe
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Jun 15 14:23:23 2022 +0800

    [HUDI-4255] Make the flink merge and replace handle intermediate file visible (#5866)
---
 .../src/main/java/org/apache/hudi/io/HoodieMergeHandle.java       | 6 ++++--
 .../main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java  | 4 +---
 .../src/main/java/org/apache/hudi/io/FlinkMergeHandle.java        | 8 +++++++-
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index a85df2a230..82c6de5761 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -195,8 +195,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       writeStatus.getStat().setFileId(fileId);
       setWriteStatusPath();
 
-      // Create Marker file
-      createMarkerFile(partitionPath, newFileName);
+      // Create the marker file.
+      // Use the name of `newFilePath` instead of `newFileName`,
+      // because a sub-class may roll over the file handle name.
+      createMarkerFile(partitionPath, newFilePath.getName());
 
       // Create the writer for writing the new version file
       fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index 24da25b20b..cf912f620a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -137,10 +137,8 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
    * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
    */
   protected String newFileNameWithRollover(int rollNumber) {
-    // make the intermediate file as hidden
-    final String fileID = "." + this.fileId;
     return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber,
-        fileID, hoodieTable.getBaseFileExtension());
+        this.fileId, hoodieTable.getBaseFileExtension());
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index e111771263..1bff89713b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -158,11 +158,17 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
    * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
    */
   protected String newFileNameWithRollover(int rollNumber) {
-    // make the intermediate file as hidden
     return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber,
         this.fileId, hoodieTable.getBaseFileExtension());
   }
 
+  @Override
+  protected void setWriteStatusPath() {
+    // If a rollover has happened, use the first rollover path (the initial new file path); otherwise use newFilePath.
+    Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
+    writeStatus.getStat().setPath(new Path(config.getBasePath()), path);
+  }
+
   @Override
   public List<WriteStatus> close() {
     try {