You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/02/25 11:16:54 UTC

[hudi] branch master updated: [HUDI-3421]Pending clustering may break AbstractTableFileSystemView#getxxBaseFile() (#4810)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7428100  [HUDI-3421]Pending clustering may break AbstractTableFileSystemView#getxxBaseFile() (#4810)
7428100 is described below

commit 742810070bf43bbc13f22ee5514dd997c27b1db2
Author: YueZhang <69...@users.noreply.github.com>
AuthorDate: Fri Feb 25 19:16:27 2022 +0800

    [HUDI-3421]Pending clustering may break AbstractTableFileSystemView#getxxBaseFile() (#4810)
---
 .../table/action/commit/TestUpsertPartitioner.java |  12 +-
 .../table/view/AbstractTableFileSystemView.java    |  23 +-
 .../table/view/TestHoodieTableFileSystemView.java  | 233 +++++++++++++++++++++
 .../hudi/common/testutils/HoodieTestTable.java     |   2 +-
 4 files changed, 258 insertions(+), 12 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 61d9d1d..3039eb3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -373,23 +373,23 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
             .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
     FileCreateUtils.createRequestedReplaceCommit(basePath,"002", Option.of(requestedReplaceMetadata));
 
-    // create file slice 002
-    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "2", 1);
-    FileCreateUtils.createCommit(basePath, "002");
+    // create file slice 003
+    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "3", 1);
+    FileCreateUtils.createCommit(basePath, "003");
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     // generate new data to be ingested
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
-    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("003", 100);
+    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("004", 100);
     WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
 
     HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
     // create UpsertPartitioner
     UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config);
 
-    // for now we have file slice1 and file slice2 and file slice1 is contained in pending clustering plan
-    // So that only file slice2 can be used for ingestion.
+    // for now we have file slice1 and file slice3 and file slice1 is contained in pending clustering plan
+    // So that only file slice3 can be used for ingestion.
     assertEquals(1, partitioner.smallFiles.size(), "Should have 1 small file to be ingested.");
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index bfcd6f4..208d7ef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -381,6 +381,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
   }
 
   /**
+   * Checks whether a base file was written by a pending replace (clustering) instant. With async clustering such
+   * partially or completely written base files can already be visible and must be ignored.
+   * @param baseFile base file to check
+   * @return true if the base file's commit time equals the timestamp of a pending replace instant
+   */
+  protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) {
+    // A base file stamped with the timestamp of a pending replace instant belongs to that unfinished replace action.
+    return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants()
+        .map(HoodieInstant::getTimestamp)
+        .anyMatch(pendingInstantTime -> pendingInstantTime.equals(baseFile.getCommitTime()));
+  }
+
+  /**
    * Returns true if the file-group is under pending-compaction and the file-slice' baseInstant matches compaction
    * Instant.
    *
@@ -492,7 +505,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
           .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
               .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
               ))
-              .filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst()))
+              .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst()))
           .filter(Option::isPresent).map(Option::get)
           .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
     } finally {
@@ -511,7 +524,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       } else {
         return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
             .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
-                instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null))
+                instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
             .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
       }
     } finally {
@@ -547,7 +560,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
           .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
           .map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
           fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
-              && !isBaseFileDueToPendingCompaction(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
+              && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
           .map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get()));
     } finally {
       readLock.unlock();
@@ -563,7 +576,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       return fetchAllBaseFiles(partitionPath)
           .filter(df -> !isFileGroupReplaced(partitionPath, df.getFileId()))
           .filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
-          .filter(df -> !isBaseFileDueToPendingCompaction(df))
+          .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df))
           .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
     } finally {
       readLock.unlock();
@@ -953,7 +966,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   protected Option<HoodieBaseFile> getLatestBaseFile(HoodieFileGroup fileGroup) {
     return Option
-        .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst());
+        .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst());
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 924c672..54bc138 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
 import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.BootstrapFileMapping;
 import org.apache.hudi.common.model.CompactionOperation;
@@ -41,6 +42,7 @@ import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -50,12 +52,15 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.jupiter.api.BeforeEach;
@@ -1537,6 +1542,234 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
     assertFalse(fileIds.contains(fileId3));
   }
 
+  /**
+   *
+   * Creates a hoodie table laid out like:
+   * .
+   * ├── .hoodie
+   * │   ├── .aux
+   * │   │   └── .bootstrap
+   * │   │       ├── .fileids
+   * │   │       └── .partitions
+   * │   ├── .temp
+   * │   ├── 1.commit
+   * │   ├── 1.commit.requested
+   * │   ├── 1.inflight
+   * │   ├── 2.replacecommit
+   * │   ├── 2.replacecommit.inflight
+   * │   ├── 2.replacecommit.requested
+   * │   ├── 3.commit
+   * │   ├── 3.commit.requested
+   * │   ├── 3.inflight
+   * │   ├── archived
+   * │   └── hoodie.properties
+   * └── 2020
+   *     └── 06
+   *         └── 27
+   *             ├── 5fe477d2-0150-46d4-833c-1e9cc8da9948_1-0-1_3.parquet
+   *             ├── 7e3208c8-fdec-4254-9682-8fff1e51ee8d_1-0-1_2.parquet
+   *             ├── e04b0e2d-1467-46b2-8ea6-f4fe950965a5_1-0-1_1.parquet
+   *             └── f3936b66-b3db-4fc8-a6d0-b1a7559016e6_1-0-1_1.parquet
+   *
+   * First tests the fsView APIs with the clustering (replace commit) completed:
+   *  1. getLatestBaseFilesBeforeOrOn
+   *  2. getBaseFileOn
+   *  3. getLatestBaseFilesInRange
+   *  4. getAllBaseFiles
+   *  5. getLatestBaseFiles (across all partitions and for a single partition)
+   *
+   * Then remove 2.replacecommit, 1.commit, 1.commit.requested, 1.inflight to simulate
+   * pending clustering at the earliest position in the active timeline and test these APIs again.
+   *
+   * @throws IOException if creating the data files or deleting the timeline files fails
+   */
+  @Test
+  public void testHoodieTableFileSystemViewWithPendingClustering() throws IOException {
+    List<String> latestBaseFilesBeforeOrOn;
+    Option<HoodieBaseFile> baseFileOn;
+    List<String> latestBaseFilesInRange;
+    List<String> allBaseFiles;
+    List<String> latestBaseFiles;
+    List<String> latestBaseFilesPerPartition;
+    String partitionPath = "2020/06/27";
+    new File(basePath + "/" + partitionPath).mkdirs();
+    HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+
+    // Generate 5 fileIds; base files are created only for fileId1..fileId4 (fileId5 never gets a file).
+    // fileId1 and fileId2 will be replaced by fileId3 via the replace commit.
+    // fileId4 will be committed after clustering finished.
+    String fileId1 = UUID.randomUUID().toString();
+    String fileId2 = UUID.randomUUID().toString();
+    String fileId3 = UUID.randomUUID().toString();
+    String fileId4 = UUID.randomUUID().toString();
+    String fileId5 = UUID.randomUUID().toString();
+
+    assertFalse(roView.getLatestBaseFiles(partitionPath)
+            .anyMatch(dfile -> dfile.getFileId().equals(fileId1)
+                || dfile.getFileId().equals(fileId2)
+                || dfile.getFileId().equals(fileId3)
+                || dfile.getFileId().equals(fileId4)
+                || dfile.getFileId().equals(fileId5)),
+        "No commit, should not find any data file");
+
+    // first insert commit
+    String commitTime1 = "1";
+    String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1);
+    String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2);
+    new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
+    new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
+
+    HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);
+
+    // build writeStats
+    HashMap<String, List<String>> partitionToFile1 = new HashMap<>();
+    ArrayList<String> files1 = new ArrayList<>();
+    files1.add(fileId1);
+    files1.add(fileId2);
+    partitionToFile1.put(partitionPath, files1);
+    List<HoodieWriteStat> writeStats1 = buildWriteStats(partitionToFile1, commitTime1);
+
+    HoodieCommitMetadata commitMetadata1 =
+        CommitUtils.buildMetadata(writeStats1, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION);
+    saveAsComplete(commitTimeline, instant1, Option.of(commitMetadata1.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    commitTimeline.reload();
+
+    // replace commit
+    String commitTime2 = "2";
+    String fileName3 = FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId3);
+    new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile();
+
+    HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime2);
+    Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
+    List<String> replacedFileIds = new ArrayList<>();
+    replacedFileIds.add(fileId1);
+    replacedFileIds.add(fileId2);
+    partitionToReplaceFileIds.put(partitionPath, replacedFileIds);
+
+    HashMap<String, List<String>> partitionToFile2 = new HashMap<>();
+    ArrayList<String> files2 = new ArrayList<>();
+    files2.add(fileId3);
+    partitionToFile2.put(partitionPath, files2);
+    List<HoodieWriteStat> writeStats2 = buildWriteStats(partitionToFile2, commitTime2);
+
+    HoodieCommitMetadata commitMetadata2 =
+        CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION);
+    saveAsComplete(commitTimeline, instant2, Option.of(commitMetadata2.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    // another insert commit
+    String commitTime3 = "3";
+    String fileName4 = FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId4);
+    new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile();
+    HoodieInstant instant3 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime3);
+
+    // build writeStats
+    HashMap<String, List<String>> partitionToFile3 = new HashMap<>();
+    ArrayList<String> files3 = new ArrayList<>();
+    files3.add(fileId4);
+    partitionToFile3.put(partitionPath, files3);
+    List<HoodieWriteStat> writeStats3 = buildWriteStats(partitionToFile3, commitTime3);
+    HoodieCommitMetadata commitMetadata3 =
+        CommitUtils.buildMetadata(writeStats3, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION);
+    saveAsComplete(commitTimeline, instant3, Option.of(commitMetadata3.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    metaClient.reloadActiveTimeline();
+    refreshFsView();
+
+    ArrayList<String> commits = new ArrayList<>();
+    commits.add(commitTime1);
+    commits.add(commitTime2);
+    commits.add(commitTime3);
+
+    // do check
+    latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, latestBaseFilesBeforeOrOn.size());
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId3));
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4));
+
+    // could see fileId3 because clustering is committed.
+    baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3);
+    assertTrue(baseFileOn.isPresent());
+    assertEquals(baseFileOn.get().getFileId(), fileId3);
+
+    latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, latestBaseFilesInRange.size());
+    assertTrue(latestBaseFilesInRange.contains(fileId3));
+    assertTrue(latestBaseFilesInRange.contains(fileId4));
+
+    allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, allBaseFiles.size());
+    assertTrue(allBaseFiles.contains(fileId3));
+    assertTrue(allBaseFiles.contains(fileId4));
+
+    // could see fileId3 because clustering is committed.
+    latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, latestBaseFiles.size());
+    assertTrue(latestBaseFiles.contains(fileId3));
+    assertTrue(latestBaseFiles.contains(fileId4));
+
+    // could see fileId3 because clustering is committed.
+    latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(2, latestBaseFilesPerPartition.size());
+    assertTrue(latestBaseFilesPerPartition.contains(fileId3));
+    assertTrue(latestBaseFilesPerPartition.contains(fileId4));
+
+    HoodieWrapperFileSystem fs = metaClient.getFs();
+    fs.delete(new Path(basePath + "/.hoodie", "1.commit"), false);
+    fs.delete(new Path(basePath + "/.hoodie", "1.inflight"), false);
+    fs.delete(new Path(basePath + "/.hoodie", "1.commit.requested"), false);
+    fs.delete(new Path(basePath + "/.hoodie", "2.replacecommit"), false);
+
+    metaClient.reloadActiveTimeline();
+    refreshFsView();
+    // do check after delete some commit file
+    latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, latestBaseFilesBeforeOrOn.size());
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId1));
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId2));
+    assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4));
+
+    // couldn't see fileId3 because clustering is not committed.
+    baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3);
+    assertFalse(baseFileOn.isPresent());
+
+    latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, latestBaseFilesInRange.size());
+    assertTrue(latestBaseFilesInRange.contains(fileId1));
+    assertTrue(latestBaseFilesInRange.contains(fileId2));
+    assertTrue(latestBaseFilesInRange.contains(fileId4));
+
+    allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, allBaseFiles.size());
+    assertTrue(allBaseFiles.contains(fileId1));
+    assertTrue(allBaseFiles.contains(fileId2));
+    assertTrue(allBaseFiles.contains(fileId4));
+
+    // couldn't see fileId3 because clustering is not committed.
+    latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, latestBaseFiles.size());
+    assertTrue(latestBaseFiles.contains(fileId1));
+    assertTrue(latestBaseFiles.contains(fileId2));
+    assertTrue(latestBaseFiles.contains(fileId4));
+
+    // couldn't see fileId3 because clustering is not committed.
+    latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
+    assertEquals(3, latestBaseFilesPerPartition.size());
+    assertTrue(latestBaseFilesPerPartition.contains(fileId1));
+    assertTrue(latestBaseFilesPerPartition.contains(fileId2));
+    assertTrue(latestBaseFilesPerPartition.contains(fileId4));
+  }
+
+
+  // Builds HoodieWriteStats for the given partition -> fileIds mapping at the given commit time;
+  // every fileId is paired with the integer 0 before delegating to HoodieTestTable.
+  private List<HoodieWriteStat> buildWriteStats(HashMap<String, List<String>> partitionToFileIds, String commitTime) {
+    Map<String, List<Pair<String, Integer>>> fileIdPairsByPartition = new HashMap<>();
+    partitionToFileIds.forEach((partition, fileIds) -> fileIdPairsByPartition.put(partition, fileIds.stream()
+        .<Pair<String, Integer>>map(fileId -> new ImmutablePair<String, Integer>(fileId, 0))
+        .collect(Collectors.toList())));
+    return HoodieTestTable.generateHoodieWriteStatForPartition(fileIdPairsByPartition, commitTime, false);
+  }
+
   @Override
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index c55a389..f783122 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -1044,7 +1044,7 @@ public class HoodieTestTable {
     return testTableState;
   }
 
-  private static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
+  public static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
                                                                            String commitTime, boolean bootstrap) {
     List<HoodieWriteStat> writeStats = new ArrayList<>();
     for (Map.Entry<String, List<Pair<String, Integer>>> entry : partitionToFileIdMap.entrySet()) {