You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/06/15 06:23:29 UTC
[hudi] branch master updated: [HUDI-4255] Make the flink merge and replace handle intermediate file visible (#5866)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0811bb38fb [HUDI-4255] Make the flink merge and replace handle intermediate file visible (#5866)
0811bb38fb is described below
commit 0811bb38fb61c19cb2759b96c268682a47516efe
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Jun 15 14:23:23 2022 +0800
[HUDI-4255] Make the flink merge and replace handle intermediate file visible (#5866)
---
.../src/main/java/org/apache/hudi/io/HoodieMergeHandle.java | 6 ++++--
.../main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java | 4 +---
.../src/main/java/org/apache/hudi/io/FlinkMergeHandle.java | 8 +++++++-
3 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index a85df2a230..82c6de5761 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -195,8 +195,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
writeStatus.getStat().setFileId(fileId);
setWriteStatusPath();
- // Create Marker file
- createMarkerFile(partitionPath, newFileName);
+ // Create Marker file,
+ // uses name of `newFilePath` instead of `newFileName`
+ // in case the sub-class may roll over the file handle name.
+ createMarkerFile(partitionPath, newFilePath.getName());
// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index 24da25b20b..cf912f620a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -137,10 +137,8 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
* Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
*/
protected String newFileNameWithRollover(int rollNumber) {
- // make the intermediate file as hidden
- final String fileID = "." + this.fileId;
return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber,
- fileID, hoodieTable.getBaseFileExtension());
+ this.fileId, hoodieTable.getBaseFileExtension());
}
@Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index e111771263..1bff89713b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -158,11 +158,17 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
* Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
*/
protected String newFileNameWithRollover(int rollNumber) {
- // make the intermediate file as hidden
return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber,
this.fileId, hoodieTable.getBaseFileExtension());
}
+ @Override
+ protected void setWriteStatusPath() {
+ // if there was rollover, should set up the path as the initial new file path.
+ Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
+ writeStatus.getStat().setPath(new Path(config.getBasePath()), path);
+ }
+
@Override
public List<WriteStatus> close() {
try {