Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/21 21:41:11 UTC

[hudi] branch master updated: [HUDI-4792] Batch clean files to delete (#6580)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cbf9b83ca6 [HUDI-4792] Batch clean files to delete (#6580)
cbf9b83ca6 is described below

commit cbf9b83ca6d3dada14eea551a5bae25144ca0459
Author: Nicolas Paris <ni...@riseup.net>
AuthorDate: Wed Sep 21 23:41:03 2022 +0200

    [HUDI-4792] Batch clean files to delete (#6580)
    
    This patch uses a single batched call to fetch the file groups to delete during cleaning, instead of one call per partition.
    This limits the number of calls to the file system view and should fix the issues seen with the metadata table when a table has many partitions.
    Fixes issue #6373
    
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../action/clean/CleanPlanActionExecutor.java      |  11 +-
 .../hudi/table/action/clean/CleanPlanner.java      | 237 +++++++++++----------
 ...dieSparkCopyOnWriteTableArchiveWithReplace.java |   4 +-
 .../table/view/AbstractTableFileSystemView.java    |  16 +-
 .../table/view/PriorityBasedFileSystemView.java    |   5 +
 .../view/RemoteHoodieTableFileSystemView.java      |  12 ++
 .../common/table/view/TableFileSystemView.java     |  14 +-
 7 files changed, 176 insertions(+), 123 deletions(-)
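
For context, a minimal sketch of the batching idea in plain Java, using hypothetical stand-in types rather than Hudi's actual classes: before this patch the clean planner issued one file system view lookup per partition, and after it issues a single batched lookup covering all partitions to clean.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch only: "View" and its methods are hypothetical stand-ins for the
    // file system view used by the clean planner.
    public class BatchedCleanSketch {

      interface View {
        List<String> filesToDelete(String partition);                     // one remote call per partition
        Map<String, List<String>> filesToDelete(List<String> partitions); // one remote call for all partitions
      }

      // Before this patch: N partitions => N view calls.
      static Map<String, List<String>> planPerPartition(View view, List<String> partitions) {
        Map<String, List<String>> plan = new HashMap<>();
        for (String partition : partitions) {
          plan.put(partition, view.filesToDelete(partition));
        }
        return plan;
      }

      // After this patch: N partitions => a single view call.
      static Map<String, List<String>> planBatched(View view, List<String> partitions) {
        return view.filesToDelete(partitions);
      }
    }

The actual change wires this through the engine context's parallelize/mapPartitions and a new getAllFileGroups(List<String>) overload on the file system view, as the diff below shows.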

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 7f3b437178..bd7ec798ed 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -42,6 +42,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -116,9 +117,15 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions(partitionIterator -> {
+            List<String> partitionList = new ArrayList<>();
+            partitionIterator.forEachRemaining(partitionList::add);
+            Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
+            return cleanResult.entrySet().iterator();
+          }, false).collectAsList()
           .stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
       Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 671a522cab..5ed53f7ae0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -60,6 +60,7 @@ import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -225,10 +226,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
    * single file (i.e run it with versionsRetained = 1)
    */
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
-    LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestVersions(List<String> partitionPaths) {
+    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + config.getCleanerFileVersionsRetained()
         + " file versions. ");
-    List<CleanFileInfo> deletePaths = new ArrayList<>();
+    Map<String, Pair<Boolean, List<CleanFileInfo>>> map = new HashMap<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
@@ -236,43 +237,48 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
 
     // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
     // In other words, the file versions only apply to the active file groups.
-    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
-    boolean toDeletePartition = false;
-    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-    for (HoodieFileGroup fileGroup : fileGroups) {
-      int keepVersions = config.getCleanerFileVersionsRetained();
-      // do not cleanup slice required for pending compaction
-      Iterator<FileSlice> fileSliceIterator =
-          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
-      if (isFileGroupInPendingCompaction(fileGroup)) {
-        // We have already saved the last version of file-groups for pending compaction Id
-        keepVersions--;
-      }
+    List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+    for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
+      List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), Option.empty()));
+      boolean toDeletePartition = false;
+      for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
+        int keepVersions = config.getCleanerFileVersionsRetained();
+        // do not cleanup slice required for pending compaction
+        Iterator<FileSlice> fileSliceIterator =
+            fileGroup.getAllFileSlices()
+                .filter(fs -> !isFileSliceNeededForPendingCompaction(fs))
+                .iterator();
+        if (isFileGroupInPendingCompaction(fileGroup)) {
+          // We have already saved the last version of file-groups for pending compaction Id
+          keepVersions--;
+        }
 
-      while (fileSliceIterator.hasNext() && keepVersions > 0) {
-        // Skip this most recent version
-        fileSliceIterator.next();
-        keepVersions--;
-      }
-      // Delete the remaining files
-      while (fileSliceIterator.hasNext()) {
-        FileSlice nextSlice = fileSliceIterator.next();
-        Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
-        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
-          // do not clean up a savepoint data file
-          continue;
+        while (fileSliceIterator.hasNext() && keepVersions > 0) {
+          // Skip this most recent version
+          fileSliceIterator.next();
+          keepVersions--;
+        }
+        // Delete the remaining files
+        while (fileSliceIterator.hasNext()) {
+          FileSlice nextSlice = fileSliceIterator.next();
+          Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
+          if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
+            // do not clean up a savepoint data file
+            continue;
+          }
+          deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
         }
-        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
+      // if there are no valid file groups for the partition, mark it to be deleted
+      if (partitionFileGroupList.getValue().isEmpty()) {
+        toDeletePartition = true;
+      }
+      map.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
     }
-    // if there are no valid file groups for the partition, mark it to be deleted
-    if (fileGroups.isEmpty()) {
-      toDeletePartition = true;
-    }
-    return Pair.of(toDeletePartition, deletePaths);
+    return map;
   }
 
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
   }
 
@@ -293,9 +299,9 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted,
    *         and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted.
    */
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
-    LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
-    List<CleanFileInfo> deletePaths = new ArrayList<>();
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPaths, int commitsRetained, HoodieCleaningPolicy policy) {
+    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. ");
+    Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanFileInfoPerPartitionMap = new HashMap<>();
 
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
@@ -307,86 +313,90 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
     if (commitTimeline.countInstants() > commitsRetained) {
       Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
       HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
-      // all replaced file groups before earliestCommitToRetain are eligible to clean
-      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
       // add active files
-      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-      for (HoodieFileGroup fileGroup : fileGroups) {
-        List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
-
-        if (fileSliceList.isEmpty()) {
-          continue;
-        }
-
-        String lastVersion = fileSliceList.get(0).getBaseInstantTime();
-        String lastVersionBeforeEarliestCommitToRetain =
-            getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
-
-        // Ensure there are more than 1 version of the file (we only clean old files from updates)
-        // i.e always spare the last commit.
-        for (FileSlice aSlice : fileSliceList) {
-          Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
-          String fileCommitTime = aSlice.getBaseInstantTime();
-          if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
-            // do not clean up a savepoint data file
+      List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+      for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
+        List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
+        // all replaced file groups before earliestCommitToRetain are eligible to clean
+        deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
+        for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
+          List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
+
+          if (fileSliceList.isEmpty()) {
             continue;
           }
 
-          if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-            // Dont delete the latest commit and also the last commit before the earliest commit we
-            // are retaining
-            // The window of commit retain == max query run time. So a query could be running which
-            // still
-            // uses this file.
-            if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
-              // move on to the next file
-              continue;
-            }
-          } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-            // This block corresponds to KEEP_LATEST_BY_HOURS policy
-            // Do not delete the latest commit.
-            if (fileCommitTime.equals(lastVersion)) {
-              // move on to the next file
+          String lastVersion = fileSliceList.get(0).getBaseInstantTime();
+          String lastVersionBeforeEarliestCommitToRetain =
+              getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
+
+          // Ensure there are more than 1 version of the file (we only clean old files from updates)
+          // i.e always spare the last commit.
+          for (FileSlice aSlice : fileSliceList) {
+            Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
+            String fileCommitTime = aSlice.getBaseInstantTime();
+            if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
+              // do not clean up a savepoint data file
               continue;
             }
-          }
 
-          // Always keep the last commit
-          if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
-              .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
-            // this is a commit, that should be cleaned.
-            aFile.ifPresent(hoodieDataFile -> {
-              deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
-              if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
-                deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
+            if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+              // Dont delete the latest commit and also the last commit before the earliest commit we
+              // are retaining
+              // The window of commit retain == max query run time. So a query could be running which
+              // still
+              // uses this file.
+              if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
+                // move on to the next file
+                continue;
+              }
+            } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+              // This block corresponds to KEEP_LATEST_BY_HOURS policy
+              // Do not delete the latest commit.
+              if (fileCommitTime.equals(lastVersion)) {
+                // move on to the next file
+                continue;
               }
-            });
-            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-              // If merge on read, then clean the log files for the commits as well
-              Predicate<HoodieLogFile> notCDCLogFile =
-                  hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
-              deletePaths.addAll(
-                  aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                      .collect(Collectors.toList()));
             }
-            if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
-              // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
-              // Here we need to clean uo these cdc log files.
-              Predicate<HoodieLogFile> isCDCLogFile =
-                  hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
-              deletePaths.addAll(
-                  aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                      .collect(Collectors.toList()));
+
+            // Always keep the last commit
+            if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
+                .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
+              // this is a commit, that should be cleaned.
+              aFile.ifPresent(hoodieDataFile -> {
+                deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
+                if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+                  deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
+                }
+              });
+              if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+                // If merge on read, then clean the log files for the commits as well
+                Predicate<HoodieLogFile> notCDCLogFile =
+                    hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+                deletePaths.addAll(
+                    aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                        .collect(Collectors.toList()));
+              }
+              if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+                // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
+                // Here we need to clean uo these cdc log files.
+                Predicate<HoodieLogFile> isCDCLogFile =
+                    hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+                deletePaths.addAll(
+                    aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                        .collect(Collectors.toList()));
+              }
             }
           }
         }
-      }
-      // if there are no valid file groups for the partition, mark it to be deleted
-      if (fileGroups.isEmpty()) {
-        toDeletePartition = true;
+        // if there are no valid file groups for the partition, mark it to be deleted
+        if (partitionFileGroupList.getValue().isEmpty()) {
+          toDeletePartition = true;
+        }
+        cleanFileInfoPerPartitionMap.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
       }
     }
-    return Pair.of(toDeletePartition, deletePaths);
+    return cleanFileInfoPerPartitionMap;
   }
 
   /**
@@ -394,10 +404,11 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained.
    * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to
    * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
+   *
    * @param partitionPath partition path to check
    * @return list of files to clean
    */
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestHours(List<String> partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
   }
 
@@ -463,21 +474,23 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning policy.
    */
-  public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
+  public Map<String, Pair<Boolean, List<CleanFileInfo>>> getDeletePaths(List<String> partitionPaths) {
     HoodieCleaningPolicy policy = config.getCleanerPolicy();
-    Pair<Boolean, List<CleanFileInfo>> deletePaths;
+    Map<String, Pair<Boolean, List<CleanFileInfo>>> deletePaths;
     if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
+      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPaths);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
-      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
+      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPaths);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-      deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
+      deletePaths = getFilesToCleanKeepingLatestHours(partitionPaths);
     } else {
       throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
     }
-    LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
-    if (deletePaths.getKey()) {
-      LOG.info("Partition " + partitionPath + " to be deleted");
+    for (String partitionPath : deletePaths.keySet()) {
+      LOG.info(deletePaths.get(partitionPath).getRight().size() + " patterns used to delete in partition path:" + partitionPath);
+      if (deletePaths.get(partitionPath).getLeft()) {
+        LOG.info("Partition " + partitionPath + " to be deleted");
+      }
     }
     return deletePaths;
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index baff4ebac8..967e313f4e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -57,7 +57,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
     HoodieWriteConfig writeConfig = getConfigBuilder(true)
         .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
          HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
@@ -81,7 +81,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
       client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
       client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
 
-      // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
+      // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
       for (int i = 5; i < 9; i++) {
         String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
         client.startCommitWithTime(instantTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 8cfd92d01f..89a184bf49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -116,7 +116,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   /**
    * Refresh commits timeline.
-   * 
+   *
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -750,6 +750,20 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
   }
 
+  @Override
+  public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+    return getAllFileGroupsIncludingReplaced(partitionPaths)
+        .map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList())));
+  }
+
+  private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
+    List<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
+    for (String partitionStr : partitionStrList) {
+      fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList())));
+    }
+    return fileGroupPerPartitionList.stream();
+  }
+
   private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
     try {
       readLock.lock();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index ff44c7cef0..9006bd45cb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -204,6 +204,11 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
     return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
   }
 
+  @Override
+  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+    return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
+  }
+
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index bd18ba22a2..5e52767fe2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -51,9 +51,11 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -377,6 +379,16 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     }
   }
 
+  @Override
+  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+    ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
+      fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
+    }
+    return fileGroupPerPartitionList.stream();
+  }
+
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index c32e2cabb1..9c83c8f19c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -109,18 +109,18 @@ public interface TableFileSystemView {
     /**
      * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
      *
-     * @param partitionPath Partition path
-     * @param maxCommitTime Max Instant Time
+     * @param partitionPath                        Partition path
+     * @param maxCommitTime                        Max Instant Time
      * @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
      */
     Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
-        boolean includeFileSlicesInPendingCompaction);
+                                                    boolean includeFileSlicesInPendingCompaction);
 
     /**
      * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
      * file-slice before and after compaction request instant is merged and returned.
-     * 
-     * @param partitionPath Partition Path
+     *
+     * @param partitionPath  Partition Path
      * @param maxInstantTime Max Instant Time
      * @return
      */
@@ -149,10 +149,12 @@ public interface TableFileSystemView {
    */
   Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
 
+  Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);
+
   /**
    * Return Pending Compaction Operations.
    *
-   * @return Pair<Pair<InstantTime,CompactionOperation>>
+   * @return Pair<Pair < InstantTime, CompactionOperation>>
    */
   Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
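
As a usage note, the new batched view method can be implemented by delegating to the existing single-partition lookup, which is roughly what AbstractTableFileSystemView and RemoteHoodieTableFileSystemView do above. A simplified sketch of that delegation, with stand-in types instead of the real Hudi signatures, could look like this:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    // Sketch only: FileGroup and getAllFileGroups(String) are stand-ins for
    // HoodieFileGroup and the existing single-partition view method.
    public class BatchedLookupSketch {

      static class FileGroup { }

      // Existing per-partition lookup (stand-in).
      static Stream<FileGroup> getAllFileGroups(String partitionPath) {
        return Stream.empty();
      }

      // New list overload: one (partition -> file groups) entry per input
      // partition, built by delegating to the per-partition lookup.
      static Map<String, List<FileGroup>> getAllFileGroups(List<String> partitionPaths) {
        return partitionPaths.stream()
            .collect(Collectors.toMap(
                p -> p,
                p -> getAllFileGroups(p).collect(Collectors.toList()),
                (a, b) -> a,
                LinkedHashMap::new));
      }
    }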