Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/25 04:02:45 UTC

[GitHub] [hudi] garyli1019 commented on a diff in pull request #5185: [HUDI-3758] Optimize flink partition table with BucketIndex

garyli1019 commented on code in PR #5185:
URL: https://github.com/apache/hudi/pull/5185#discussion_r857243889


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexAssignFunction.java:
##########
@@ -173,17 +201,54 @@ private void bootstrapIndexIfNeed(String partition) {
     this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
       String fileID = fileSlice.getFileId();
       int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
-      if (bucketToLoad.contains(bucketNumber)) {
-        LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
-        if (bucketToFileIDMap.containsKey(bucketNumber)) {
-          throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found "
-              + "during the BucketStreamWriteFunction index bootstrap.", fileID, bucketNumber, partition));
-        } else {
-          LOG.info(String.format("Adding fileID %s to the bucket %s of partition %s.", fileID, bucketNumber, partition));
-          bucketToFileIDMap.put(bucketNumber, fileID);
+      if (isPartitionTable) {
+        fillIndex(bucketToFileIDMap, partition, bucketNumber, fileID);
+      } else {
+        if (bucketToLoad.contains(bucketNumber)) {
+          fillIndex(bucketToFileIDMap, partition, bucketNumber, fileID);
         }
       }
     });
     bucketIndex.put(partition, bucketToFileIDMap);
+
+    // no need to load
+    boolean noNeedLoadFiles = isPartitionTable ? bucketToFileIDMap.size() == bucketNum : bucketToFileIDMap.size() == bucketToLoad.size();
+    if (noNeedLoadFiles || isCowTable) {
+      return;
+    }
+    // reuse unCommitted log file id
+    try {
+      Map<Integer, String> partitionFsBucketToFileIDMap = new HashMap<>();

Review Comment:
   Can we keep this in the bucket stream write function? IMO we don't need a separate stage to do this job, and adding one will introduce an extra layer of data shuffling.
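
   To illustrate what keeping the bootstrap inside the write function could look like, here is a minimal, self-contained sketch based on the lines removed in the diff above. It is not the actual Hudi implementation: `BucketIndexSketch`, `bootstrap`, and the local `bucketIdFromFileId` are hypothetical stand-ins, and the sketch assumes the file ID starts with a zero-padded, 8-digit bucket number. Each writer task loads only the buckets it owns, so no separate assign stage or extra shuffle is involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the index bootstrap done inside a write function.
final class BucketIndexSketch {

  // Assumption: the first 8 characters of the file ID encode the bucket number.
  static int bucketIdFromFileId(String fileId) {
    return Integer.parseInt(fileId.substring(0, 8));
  }

  // Builds the bucket -> fileID mapping for one partition, keeping only the
  // buckets this task is responsible for (bucketToLoad).
  static Map<Integer, String> bootstrap(String partition,
                                        List<String> fileIds,
                                        Set<Integer> bucketToLoad) {
    Map<Integer, String> bucketToFileId = new HashMap<>();
    for (String fileId : fileIds) {
      int bucket = bucketIdFromFileId(fileId);
      if (!bucketToLoad.contains(bucket)) {
        continue; // another task owns this bucket
      }
      // putIfAbsent returns the existing value if the bucket was already mapped.
      String previous = bucketToFileId.putIfAbsent(bucket, fileId);
      if (previous != null) {
        throw new IllegalStateException(String.format(
            "Duplicate fileID %s from bucket %s of partition %s found during index bootstrap.",
            fileId, bucket, partition));
      }
    }
    return bucketToFileId;
  }
}
```

   The duplicate check mirrors the `RuntimeException` thrown by the removed code; the filter on `bucketToLoad` is what lets each task bootstrap locally.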



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org