Posted to commits@hudi.apache.org by da...@apache.org on 2022/04/29 06:10:25 UTC

[hudi] branch master updated: [HUDI-3758] Fix duplicate fileId error in MOR table type with flink bucket hash Index (#5185)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e421d536ea [HUDI-3758] Fix duplicate fileId error in MOR table type with flink bucket hash Index (#5185)
e421d536ea is described below

commit e421d536eae236d0b2e29d9a4f59ed65822fd883
Author: 吴祥平 <40...@qq.com>
AuthorDate: Fri Apr 29 14:10:20 2022 +0800

    [HUDI-3758] Fix duplicate fileId error in MOR table type with flink bucket hash Index (#5185)
    
    * fix duplicate fileIds with the bucket index
    * load existing file groups from the FileSystemView instead of the latest file slices (see the sketch below)
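
For context: with the Flink bucket hash index, every bucket in a partition must map to exactly one file group, and the bucket number is encoded as a zero-padded prefix of the fileId (this is what BucketIdentifier.bucketIdFromFileId in the diff below parses). A minimal standalone sketch of that convention, using hypothetical helper names rather than the actual Hudi API:

    import java.util.UUID;

    public class BucketFileIdSketch {

      // Mint a fileId for a bucket: zero-padded 8-digit bucket number plus a
      // unique suffix (assumed format, mirroring what the diff relies on).
      static String newFileIdForBucket(int bucket) {
        return String.format("%08d", bucket) + "-" + UUID.randomUUID();
      }

      // Recover the bucket number from an existing fileId.
      static int bucketIdFromFileId(String fileId) {
        return Integer.parseInt(fileId.substring(0, 8));
      }

      public static void main(String[] args) {
        String fileId = newFileIdForBucket(42);
        // Round trip: the bucket number survives in the fileId prefix, so a
        // restarted task can rediscover which bucket a file group serves.
        System.out.println(fileId + " -> bucket " + bucketIdFromFileId(fileId));
      }
    }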
---
 .../apache/hudi/sink/bucket/BucketStreamWriteFunction.java   | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 1456e8882f..11d5f36436 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -75,11 +75,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    */
   private Set<String> incBucketIndex;
 
-  /**
-   * Returns whether this is an empty table.
-   */
-  private boolean isEmptyTable;
-
   /**
    * Constructs a BucketStreamWriteFunction.
    *
@@ -99,7 +94,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
     this.bucketToLoad = getBucketToLoad();
     this.bucketIndex = new HashMap<>();
     this.incBucketIndex = new HashSet<>();
-    this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent();
   }
 
   @Override
@@ -162,7 +156,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
    */
   private void bootstrapIndexIfNeed(String partition) {
-    if (isEmptyTable || bucketIndex.containsKey(partition)) {
+    if (bucketIndex.containsKey(partition)) {
       return;
     }
     LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
@@ -170,8 +164,8 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
 
     // Load existing fileID belongs to this task
     Map<Integer, String> bucketToFileIDMap = new HashMap<>();
-    this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
-      String fileID = fileSlice.getFileId();
+    this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
+      String fileID = fileGroup.getFileGroupId().getFileId();
       int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
       if (bucketToLoad.contains(bucketNumber)) {
         LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));