Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/01 03:36:40 UTC

[hudi] branch master updated: [HUDI-5921] Partition path should be considered in BucketIndexConcurrentFileWritesConflictResolutionStrategy (#8163)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b752df2a2f9 [HUDI-5921] Partition path should be considered in BucketIndexConcurrentFileWritesConflictResolutionStrategy (#8163)
b752df2a2f9 is described below

commit b752df2a2f9d2cdee79144d94fe776171696691f
Author: Manu <36...@users.noreply.github.com>
AuthorDate: Sat Apr 1 11:36:31 2023 +0800

    [HUDI-5921] Partition path should be considered in BucketIndexConcurrentFileWritesConflictResolutionStrategy (#8163)
    
    In general we need to compare both the fileId and the partition path for `ConcurrentOperation`: fileIds can still collide across different partitions, although the odds are very low.
    
    The relative file paths were kept before this patch but never used; we can add them back if necessary.
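
For context, the fix keys the conflict check on (partition path, bucket id) pairs rather than on bucket ids alone, so identical bucket numbers in different partitions no longer collide. Below is a minimal standalone sketch of that check, assuming (as with Hudi's bucket index file ids such as "00000001-file-...") that the first eight characters of a file id encode the bucket number, and assuming partition and bucket are joined with "_"; the class and helper names here are hypothetical, not Hudi's API:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class PartitionBucketConflictSketch {

      // Bucket-index file ids carry a zero-padded bucket number prefix,
      // e.g. "00000001-file-xyz": the first 8 characters identify the bucket.
      static String bucketIdFromFileId(String fileId) {
        return fileId.substring(0, 8);
      }

      // Conflict only when the same bucket is touched in the SAME partition;
      // the old check compared bucket ids alone, so it also flagged
      // cross-partition writes that happened to share a bucket number.
      static boolean hasConflict(Set<Map.Entry<String, String>> first,
                                 Set<Map.Entry<String, String>> second) {
        Set<String> firstKeys = new HashSet<>();
        for (Map.Entry<String, String> op : first) {
          firstKeys.add(op.getKey() + "_" + bucketIdFromFileId(op.getValue()));
        }
        for (Map.Entry<String, String> op : second) {
          if (firstKeys.contains(op.getKey() + "_" + bucketIdFromFileId(op.getValue()))) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        Set<Map.Entry<String, String>> writer1 = new HashSet<>();
        writer1.add(new SimpleEntry<>("2016/03/15", "00000001-file-a"));
        Set<Map.Entry<String, String>> writer2 = new HashSet<>();
        writer2.add(new SimpleEntry<>("2015/03/16", "00000001-file-b"));
        // Same bucket number, different partitions: no conflict after the patch.
        System.out.println(hasConflict(writer1, writer2)); // prints false
      }
    }

With the pre-patch check, the two writers above would have been flagged as conflicting because both touch bucket 00000001, even though they write to disjoint partitions.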
---
 ...urrentFileWritesConflictResolutionStrategy.java | 22 ++++++----
 .../client/transaction/ConcurrentOperation.java    | 35 ++++++++--------
 ...urrentFileWritesConflictResolutionStrategy.java |  9 ++--
 .../hudi/table/action/compact/CompactHelpers.java  |  2 +
 ...urrentFileWritesConflictResolutionStrategy.java | 48 +++++++++++++++++-----
 .../org/apache/hudi/common/util/CommitUtils.java   | 23 ++++++-----
 6 files changed, 88 insertions(+), 51 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
index 503f1c42185..a15a4cc533c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -37,10 +37,20 @@ public class BucketIndexConcurrentFileWritesConflictResolutionStrategy
   @Override
   public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
     // TODO : UUID's can clash even for insert/insert, handle that case.
-    Set<String> bucketIdsSetForFirstInstant = extractBucketIds(thisOperation.getMutatedFileIds());
-    Set<String> bucketIdsSetForSecondInstant = extractBucketIds(otherOperation.getMutatedFileIds());
-    Set<String> intersection = new HashSet<>(bucketIdsSetForFirstInstant);
-    intersection.retainAll(bucketIdsSetForSecondInstant);
+    Set<String> partitionBucketIdSetForFirstInstant = thisOperation
+        .getMutatedPartitionAndFileIds()
+        .stream()
+        .map(partitionAndFileId ->
+            BucketIdentifier.partitionBucketIdStr(partitionAndFileId.getLeft(), BucketIdentifier.bucketIdFromFileId(partitionAndFileId.getRight()))
+        ).collect(Collectors.toSet());
+    Set<String> partitionBucketIdSetForSecondInstant = otherOperation
+        .getMutatedPartitionAndFileIds()
+        .stream()
+        .map(partitionAndFileId ->
+            BucketIdentifier.partitionBucketIdStr(partitionAndFileId.getLeft(), BucketIdentifier.bucketIdFromFileId(partitionAndFileId.getRight()))
+        ).collect(Collectors.toSet());
+    Set<String> intersection = new HashSet<>(partitionBucketIdSetForFirstInstant);
+    intersection.retainAll(partitionBucketIdSetForSecondInstant);
     if (!intersection.isEmpty()) {
       LOG.info("Found conflicting writes between first operation = " + thisOperation
           + ", second operation = " + otherOperation + " , intersecting bucket ids " + intersection);
@@ -48,8 +58,4 @@ public class BucketIndexConcurrentFileWritesConflictResolutionStrategy
     }
     return false;
   }
-
-  private static Set<String> extractBucketIds(Set<String> fileIds) {
-    return fileIds.stream().map(BucketIdentifier::bucketIdStrFromFileId).collect(Collectors.toSet());
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index 4195c5bfab2..a83fee77eb7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -18,9 +18,7 @@
 
 package org.apache.hudi.client.transaction;
 
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
-import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieMetadataWrapper;
@@ -29,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,7 +39,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_AC
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
-import static org.apache.hudi.common.util.CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord;
+import static org.apache.hudi.common.util.CommitUtils.getPartitionAndFileIdWithoutSuffixFromSpecificRecord;
 
 /**
  * This class is used to hold all information used to identify how to resolve conflicts between instants.
@@ -55,7 +54,7 @@ public class ConcurrentOperation {
   private final String actionState;
   private final String actionType;
   private final String instantTime;
-  private Set<String> mutatedFileIds = Collections.emptySet();
+  private Set<Pair<String, String>> mutatedPartitionAndFileIds = Collections.emptySet();
 
   public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException {
     this.metadataWrapper = new HoodieMetadataWrapper(MetadataConversionUtils.createMetaWrapper(instant, metaClient));
@@ -91,8 +90,8 @@ public class ConcurrentOperation {
     return operationType;
   }
 
-  public Set<String> getMutatedFileIds() {
-    return mutatedFileIds;
+  public Set<Pair<String, String>> getMutatedPartitionAndFileIds() {
+    return mutatedPartitionAndFileIds;
   }
 
   public Option<HoodieCommitMetadata> getCommitMetadataOption() {
@@ -104,21 +103,21 @@ public class ConcurrentOperation {
       switch (getInstantActionType()) {
         case COMPACTION_ACTION:
           this.operationType = WriteOperationType.COMPACT;
-          this.mutatedFileIds = this.metadataWrapper.getMetadataFromTimeline().getHoodieCompactionPlan().getOperations()
+          this.mutatedPartitionAndFileIds = this.metadataWrapper.getMetadataFromTimeline().getHoodieCompactionPlan().getOperations()
               .stream()
-              .map(HoodieCompactionOperation::getFileId)
+              .map(operation -> Pair.of(operation.getPartitionPath(), operation.getFileId()))
               .collect(Collectors.toSet());
           break;
         case COMMIT_ACTION:
         case DELTA_COMMIT_ACTION:
-          this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
-              .getPartitionToWriteStats()).keySet();
+          this.mutatedPartitionAndFileIds = getPartitionAndFileIdWithoutSuffixFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
+              .getPartitionToWriteStats());
           this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
           break;
         case REPLACE_COMMIT_ACTION:
           if (instant.isCompleted()) {
-            this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(
-                this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats()).keySet();
+            this.mutatedPartitionAndFileIds = getPartitionAndFileIdWithoutSuffixFromSpecificRecord(
+                this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats());
             this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getOperationType());
           } else {
             // we need to have different handling for requested and inflight replacecommit because
@@ -129,16 +128,16 @@ public class ConcurrentOperation {
             if (instant.isRequested()) {
               // for insert_overwrite/insert_overwrite_table clusteringPlan will be empty
               if (requestedReplaceMetadata != null && requestedReplaceMetadata.getClusteringPlan() != null) {
-                this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
+                this.mutatedPartitionAndFileIds = getPartitionAndFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
                 this.operationType = WriteOperationType.CLUSTER;
               }
             } else {
               if (inflightCommitMetadata != null) {
-                this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
+                this.mutatedPartitionAndFileIds = getPartitionAndFileIdWithoutSuffixFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats());
                 this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType());
               } else if (requestedReplaceMetadata != null) {
                 // inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
-                this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
+                this.mutatedPartitionAndFileIds = getPartitionAndFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
                 this.operationType = WriteOperationType.CLUSTER;
               }
               // NOTE: it cannot be the case that instant is inflight, and both the requested and inflight replacecommit metadata are empty
@@ -157,7 +156,7 @@ public class ConcurrentOperation {
         case DELTA_COMMIT_ACTION:
         case REPLACE_COMMIT_ACTION:
         case LOG_COMPACTION_ACTION:
-          this.mutatedFileIds = CommitUtils.getFileIdWithoutSuffixAndRelativePaths(this.metadataWrapper.getCommitMetadata().getPartitionToWriteStats()).keySet();
+          this.mutatedPartitionAndFileIds = CommitUtils.getPartitionAndFileIdWithoutSuffix(this.metadataWrapper.getCommitMetadata().getPartitionToWriteStats());
           this.operationType = this.metadataWrapper.getCommitMetadata().getOperationType();
           break;
         default:
@@ -166,12 +165,12 @@ public class ConcurrentOperation {
     }
   }
 
-  private static Set<String> getFileIdsFromRequestedReplaceMetadata(HoodieRequestedReplaceMetadata requestedReplaceMetadata) {
+  private static Set<Pair<String, String>> getPartitionAndFileIdsFromRequestedReplaceMetadata(HoodieRequestedReplaceMetadata requestedReplaceMetadata) {
     return requestedReplaceMetadata
         .getClusteringPlan().getInputGroups()
         .stream()
         .flatMap(ig -> ig.getSlices().stream())
-        .map(HoodieSliceInfo::getFileId)
+        .map(fileSlice -> Pair.of(fileSlice.getPartitionPath(), fileSlice.getFileId()))
         .collect(Collectors.toSet());
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
index a622486b17d..22bcd8d0e0c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
@@ -72,10 +73,10 @@ public class SimpleConcurrentFileWritesConflictResolutionStrategy
   @Override
   public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
     // TODO : UUID's can clash even for insert/insert, handle that case.
-    Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
-    Set<String> fileIdsSetForSecondInstant = otherOperation.getMutatedFileIds();
-    Set<String> intersection = new HashSet<>(fileIdsSetForFirstInstant);
-    intersection.retainAll(fileIdsSetForSecondInstant);
+    Set<Pair<String, String>> partitionAndFileIdsSetForFirstInstant = thisOperation.getMutatedPartitionAndFileIds();
+    Set<Pair<String, String>> partitionAndFileIdsSetForSecondInstant = otherOperation.getMutatedPartitionAndFileIds();
+    Set<Pair<String, String>> intersection = new HashSet<>(partitionAndFileIdsSetForFirstInstant);
+    intersection.retainAll(partitionAndFileIdsSetForSecondInstant);
     if (!intersection.isEmpty()) {
       LOG.info("Found conflicting writes between first operation = " + thisOperation
           + ", second operation = " + otherOperation + " , intersecting file ids " + intersection);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index f4498a82d4f..800e6a4acea 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -65,6 +66,7 @@ public class CompactHelpers<T, I, K, O> {
       metadata.addWriteStat(stat.getPartitionPath(), stat);
     }
     metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
+    metadata.setOperationType(WriteOperationType.COMPACT);
     if (compactionPlan.getExtraMetadata() != null) {
       compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
     }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
index 24c578606d4..df5b03ec7dd 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -68,9 +68,9 @@ public class TestBucketIndexConcurrentFileWritesConflictResolutionStrategy exten
     createCommit(newInstantTime);
     // consider commits before this are all successful
     // writer 1
-    createInflightCommit(HoodieTestTable.makeNewCommitTime());
+    createInflightCommit(HoodieTestTable.makeNewCommitTime(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
     // writer 2
-    createInflightCommit(HoodieTestTable.makeNewCommitTime());
+    createInflightCommit(HoodieTestTable.makeNewCommitTime(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
     Option<HoodieInstant> lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
     newInstantTime = HoodieTestTable.makeNewCommitTime();
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime));
@@ -87,14 +87,14 @@ public class TestBucketIndexConcurrentFileWritesConflictResolutionStrategy exten
     Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
     // writer 1 starts
     String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
-    createInflightCommit(currentWriterInstant);
+    createInflightCommit(currentWriterInstant, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
     // writer 2 starts and finishes
     String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
     createCommit(newInstantTime);
 
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
     SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
-    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
+    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
     timeline = timeline.reload();
     List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
         Collectors.toList());
@@ -111,6 +111,34 @@ public class TestBucketIndexConcurrentFileWritesConflictResolutionStrategy exten
     }
   }
 
+  @Test
+  public void testConcurrentWritesWithDifferentPartition() throws Exception {
+    createCommit(HoodieActiveTimeline.createNewInstantTime());
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    // consider commits before this are all successful
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    // writer 1 starts
+    String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
+    createInflightCommit(currentWriterInstant, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
+    // writer 2 starts and finishes
+    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    createCommit(newInstantTime);
+
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
+    timeline = timeline.reload();
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
+        Collectors.toList());
+
+    // there should be 1 candidate instant
+    Assertions.assertEquals(1, candidateInstants.size());
+    ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
+    // there should be no conflict between writer 1 and writer 2
+    Assertions.assertFalse(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
+  }
+
   private void createCommit(String instantTime) throws Exception {
     String fileId1 = "00000001-file-" + instantTime + "-1";
     String fileId2 = "00000002-file-"  + instantTime + "-2";
@@ -126,25 +154,25 @@ public class TestBucketIndexConcurrentFileWritesConflictResolutionStrategy exten
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }
 
-  private HoodieCommitMetadata createCommitMetadata(String instantTime, String writeFileName) {
+  private HoodieCommitMetadata createCommitMetadata(String instantTime, String writeFileName, String partition) {
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
     commitMetadata.addMetadata("test", "test");
     HoodieWriteStat writeStat = new HoodieWriteStat();
     writeStat.setFileId(writeFileName);
-    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    commitMetadata.addWriteStat(partition, writeStat);
     commitMetadata.setOperationType(WriteOperationType.INSERT);
     return commitMetadata;
   }
 
-  private HoodieCommitMetadata createCommitMetadata(String instantTime) {
-    return createCommitMetadata(instantTime, "00000001-file-" + instantTime + "-1");
+  private HoodieCommitMetadata createCommitMetadata(String instantTime, String partition) {
+    return createCommitMetadata(instantTime, "00000001-file-" + instantTime + "-1", partition);
   }
 
-  private void createInflightCommit(String instantTime) throws Exception {
+  private void createInflightCommit(String instantTime, String partition) throws Exception {
     String fileId1 = "00000001-file-" + instantTime + "-1";
     String fileId2 = "00000002-file-" + instantTime + "-2";
     HoodieTestTable.of(metaClient)
         .addInflightCommit(instantTime)
-        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+        .withBaseFilesInPartition(partition, fileId1, fileId2);
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index ce94c92fd45..dbfffbeaf09 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -33,9 +34,10 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Helper class to generate commit metadata.
@@ -117,28 +119,27 @@ public class CommitUtils {
     return commitMetadata;
   }
 
-  public static HashMap<String, String> getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(Map<String, List<org.apache.hudi.avro.model.HoodieWriteStat>>
-                                                                                       partitionToWriteStats) {
-    HashMap<String, String> fileIdToPath = new HashMap<>();
+  public static Set<Pair<String, String>> getPartitionAndFileIdWithoutSuffixFromSpecificRecord(Map<String, List<org.apache.hudi.avro.model.HoodieWriteStat>>
+                                                                                                     partitionToWriteStats) {
+    Set<Pair<String, String>> partitionToFileId = new HashSet<>();
     // list all partitions paths
     for (Map.Entry<String, List<org.apache.hudi.avro.model.HoodieWriteStat>> entry : partitionToWriteStats.entrySet()) {
       for (org.apache.hudi.avro.model.HoodieWriteStat stat : entry.getValue()) {
-        fileIdToPath.put(stat.getFileId(), stat.getPath());
+        partitionToFileId.add(Pair.of(entry.getKey(), stat.getFileId()));
       }
     }
-    return fileIdToPath;
+    return partitionToFileId;
   }
 
-  public static HashMap<String, String> getFileIdWithoutSuffixAndRelativePaths(Map<String, List<HoodieWriteStat>>
-      partitionToWriteStats) {
-    HashMap<String, String> fileIdToPath = new HashMap<>();
+  public static Set<Pair<String, String>> getPartitionAndFileIdWithoutSuffix(Map<String, List<HoodieWriteStat>> partitionToWriteStats) {
+    Set<Pair<String, String>> partitionToFileId = new HashSet<>();
     // list all partitions paths
     for (Map.Entry<String, List<HoodieWriteStat>> entry : partitionToWriteStats.entrySet()) {
       for (HoodieWriteStat stat : entry.getValue()) {
-        fileIdToPath.put(stat.getFileId(), stat.getPath());
+        partitionToFileId.add(Pair.of(entry.getKey(), stat.getFileId()));
       }
     }
-    return fileIdToPath;
+    return partitionToFileId;
   }
 
   /**