Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:58 UTC

[hudi] 17/17: [HUDI-4934] Revert batch clean files (#6813)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d201c8420a5e1d999565fca9af04cb52638cbb72
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Wed Sep 28 15:51:45 2022 -0700

    [HUDI-4934] Revert batch clean files (#6813)
    
    * Revert "[HUDI-4792] Batch clean files to delete (#6580)"
    This reverts commit cbf9b83ca6d3dada14eea551a5bae25144ca0459.
---
 .../action/clean/CleanPlanActionExecutor.java      |  11 +-
 .../hudi/table/action/clean/CleanPlanner.java      | 215 ++++++++++-----------
 ...dieSparkCopyOnWriteTableArchiveWithReplace.java |   4 +-
 .../table/view/AbstractTableFileSystemView.java    |  16 +-
 .../table/view/PriorityBasedFileSystemView.java    |   5 -
 .../view/RemoteHoodieTableFileSystemView.java      |  12 --
 .../common/table/view/TableFileSystemView.java     |  14 +-
 7 files changed, 112 insertions(+), 165 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index bd7ec798ed..7f3b437178 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -42,7 +42,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -117,15 +116,9 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .parallelize(partitionsToClean, cleanerParallelism)
-          .mapPartitions(partitionIterator -> {
-            List<String> partitionList = new ArrayList<>();
-            partitionIterator.forEachRemaining(partitionList::add);
-            Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
-            return cleanResult.entrySet().iterator();
-          }, false).collectAsList()
+          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
           .stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
       Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey,
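
For context, the restored executor code above plans each partition independently and keys the results by partition path, instead of batching partitions through mapPartitions. A minimal stand-alone sketch of that shape, using a parallel stream in place of the context.map call shown above and a placeholder planner (all names below are illustrative, not Hudi's):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PerPartitionPlanSketch {
      // Stand-in for CleanPlanner#getDeletePaths(partitionPath): returns the files
      // to delete for a single partition. Here it just fabricates two paths.
      static List<String> planPartition(String partitionPath) {
        return Arrays.asList(partitionPath + "/old_file_1", partitionPath + "/old_file_2");
      }

      public static void main(String[] args) {
        List<String> partitionsToClean = Arrays.asList("2022/09/01", "2022/09/02");

        // One planning call per partition, collected into a partition-keyed map
        // (the shape the revert restores).
        Map<String, List<String>> cleanOps = partitionsToClean.parallelStream()
            .map(p -> new SimpleEntry<>(p, planPartition(p)))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        cleanOps.forEach((p, files) -> System.out.println(p + " -> " + files));
      }
    }
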
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index e85793d711..64e69b1d2a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -58,7 +58,6 @@ import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -223,10 +222,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
    * single file (i.e run it with versionsRetained = 1)
    */
-  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestVersions(List<String> partitionPaths) {
-    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + config.getCleanerFileVersionsRetained()
+  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
+    LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
         + " file versions. ");
-    Map<String, Pair<Boolean, List<CleanFileInfo>>> map = new HashMap<>();
+    List<CleanFileInfo> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
@@ -234,48 +233,43 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
 
     // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
     // In other words, the file versions only apply to the active file groups.
-    List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
-    for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
-      List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), Option.empty()));
-      boolean toDeletePartition = false;
-      for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
-        int keepVersions = config.getCleanerFileVersionsRetained();
-        // do not cleanup slice required for pending compaction
-        Iterator<FileSlice> fileSliceIterator =
-            fileGroup.getAllFileSlices()
-                .filter(fs -> !isFileSliceNeededForPendingCompaction(fs))
-                .iterator();
-        if (isFileGroupInPendingCompaction(fileGroup)) {
-          // We have already saved the last version of file-groups for pending compaction Id
-          keepVersions--;
-        }
+    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
+    boolean toDeletePartition = false;
+    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+    for (HoodieFileGroup fileGroup : fileGroups) {
+      int keepVersions = config.getCleanerFileVersionsRetained();
+      // do not cleanup slice required for pending compaction
+      Iterator<FileSlice> fileSliceIterator =
+          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
+      if (isFileGroupInPendingCompaction(fileGroup)) {
+        // We have already saved the last version of file-groups for pending compaction Id
+        keepVersions--;
+      }
 
-        while (fileSliceIterator.hasNext() && keepVersions > 0) {
-          // Skip this most recent version
-          fileSliceIterator.next();
-          keepVersions--;
-        }
-        // Delete the remaining files
-        while (fileSliceIterator.hasNext()) {
-          FileSlice nextSlice = fileSliceIterator.next();
-          Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
-          if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
-            // do not clean up a savepoint data file
-            continue;
-          }
-          deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
-        }
+      while (fileSliceIterator.hasNext() && keepVersions > 0) {
+        // Skip this most recent version
+        fileSliceIterator.next();
+        keepVersions--;
       }
-      // if there are no valid file groups for the partition, mark it to be deleted
-      if (partitionFileGroupList.getValue().isEmpty()) {
-        toDeletePartition = true;
+      // Delete the remaining files
+      while (fileSliceIterator.hasNext()) {
+        FileSlice nextSlice = fileSliceIterator.next();
+        Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
+        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
+          // do not clean up a savepoint data file
+          continue;
+        }
+        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
-      map.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
     }
-    return map;
+    // if there are no valid file groups for the partition, mark it to be deleted
+    if (fileGroups.isEmpty()) {
+      toDeletePartition = true;
+    }
+    return Pair.of(toDeletePartition, deletePaths);
   }
 
-  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPath) {
+  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
   }
 
@@ -296,9 +290,9 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted,
    *         and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted.
    */
-  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPaths, int commitsRetained, HoodieCleaningPolicy policy) {
-    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. ");
-    Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanFileInfoPerPartitionMap = new HashMap<>();
+  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
+    LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
+    List<CleanFileInfo> deletePaths = new ArrayList<>();
 
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
@@ -310,79 +304,75 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
     if (commitTimeline.countInstants() > commitsRetained) {
       Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
       HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
+      // all replaced file groups before earliestCommitToRetain are eligible to clean
+      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
       // add active files
-      List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
-      for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
-        List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
-        // all replaced file groups before earliestCommitToRetain are eligible to clean
-        deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
-        for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
-          List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
-
-          if (fileSliceList.isEmpty()) {
+      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+      for (HoodieFileGroup fileGroup : fileGroups) {
+        List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
+
+        if (fileSliceList.isEmpty()) {
+          continue;
+        }
+
+        String lastVersion = fileSliceList.get(0).getBaseInstantTime();
+        String lastVersionBeforeEarliestCommitToRetain =
+            getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
+
+        // Ensure there are more than 1 version of the file (we only clean old files from updates)
+        // i.e always spare the last commit.
+        for (FileSlice aSlice : fileSliceList) {
+          Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
+          String fileCommitTime = aSlice.getBaseInstantTime();
+          if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
+            // do not clean up a savepoint data file
             continue;
           }
 
-          String lastVersion = fileSliceList.get(0).getBaseInstantTime();
-          String lastVersionBeforeEarliestCommitToRetain =
-              getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
-
-          // Ensure there are more than 1 version of the file (we only clean old files from updates)
-          // i.e always spare the last commit.
-          for (FileSlice aSlice : fileSliceList) {
-            Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
-            String fileCommitTime = aSlice.getBaseInstantTime();
-            if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
-              // do not clean up a savepoint data file
+          if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+            // Dont delete the latest commit and also the last commit before the earliest commit we
+            // are retaining
+            // The window of commit retain == max query run time. So a query could be running which
+            // still
+            // uses this file.
+            if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
+              // move on to the next file
               continue;
             }
-
-            if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-              // Dont delete the latest commit and also the last commit before the earliest commit we
-              // are retaining
-              // The window of commit retain == max query run time. So a query could be running which
-              // still
-              // uses this file.
-              if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
-                // move on to the next file
-                continue;
-              }
-            } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-              // This block corresponds to KEEP_LATEST_BY_HOURS policy
-              // Do not delete the latest commit.
-              if (fileCommitTime.equals(lastVersion)) {
-                // move on to the next file
-                continue;
-              }
+          } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+            // This block corresponds to KEEP_LATEST_BY_HOURS policy
+            // Do not delete the latest commit.
+            if (fileCommitTime.equals(lastVersion)) {
+              // move on to the next file
+              continue;
             }
+          }
 
-            // Always keep the last commit
-            if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
-                .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
-              // this is a commit, that should be cleaned.
-              aFile.ifPresent(hoodieDataFile -> {
-                deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
-                if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
-                  deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
-                }
-              });
-              if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-                // If merge on read, then clean the log files for the commits as well
-                deletePaths.addAll(
-                    aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                        .collect(Collectors.toList()));
+          // Always keep the last commit
+          if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
+              .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
+            // this is a commit, that should be cleaned.
+            aFile.ifPresent(hoodieDataFile -> {
+              deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
+              if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+                deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
               }
+            });
+            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+              // 1. If merge on read, then clean the log files for the commits as well;
+              // 2. If change log capture is enabled, clean the log files no matter the table type is mor or cow.
+              deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                  .collect(Collectors.toList()));
             }
           }
         }
-        // if there are no valid file groups for the partition, mark it to be deleted
-        if (partitionFileGroupList.getValue().isEmpty()) {
-          toDeletePartition = true;
-        }
-        cleanFileInfoPerPartitionMap.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
+      }
+      // if there are no valid file groups for the partition, mark it to be deleted
+      if (fileGroups.isEmpty()) {
+        toDeletePartition = true;
       }
     }
-    return cleanFileInfoPerPartitionMap;
+    return Pair.of(toDeletePartition, deletePaths);
   }
 
   /**
@@ -390,11 +380,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained.
    * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to
    * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
-   *
    * @param partitionPath partition path to check
    * @return list of files to clean
    */
-  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestHours(List<String> partitionPath) {
+  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
   }
 
@@ -448,23 +437,21 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning policy.
    */
-  public Map<String, Pair<Boolean, List<CleanFileInfo>>> getDeletePaths(List<String> partitionPaths) {
+  public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
     HoodieCleaningPolicy policy = config.getCleanerPolicy();
-    Map<String, Pair<Boolean, List<CleanFileInfo>>> deletePaths;
+    Pair<Boolean, List<CleanFileInfo>> deletePaths;
     if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPaths);
+      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
-      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPaths);
+      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-      deletePaths = getFilesToCleanKeepingLatestHours(partitionPaths);
+      deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
     } else {
       throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
     }
-    for (String partitionPath : deletePaths.keySet()) {
-      LOG.info(deletePaths.get(partitionPath).getRight().size() + " patterns used to delete in partition path:" + partitionPath);
-      if (deletePaths.get(partitionPath).getLeft()) {
-        LOG.info("Partition " + partitionPath + " to be deleted");
-      }
+    LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
+    if (deletePaths.getKey()) {
+      LOG.info("Partition " + partitionPath + " to be deleted");
     }
     return deletePaths;
   }
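
The core retention loop restored in getFilesToCleanKeepingLatestVersions walks one file group's slices newest-first, spares the configured number of versions, and marks the rest for deletion, skipping anything savepointed. A simplified, self-contained sketch of that logic, with slices and savepoints reduced to plain strings (illustrative only; the real code works on FileSlice and CleanFileInfo):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class KeepLatestVersionsSketch {
      public static void main(String[] args) {
        // File slices of one file group, newest first (the order getAllFileSlices yields).
        List<String> fileSlices = Arrays.asList("v5", "v4", "v3", "v2", "v1");
        int keepVersions = 2;                                  // cleaner file versions retained
        List<String> savepointedFiles = Arrays.asList("v2");   // savepointed data is never cleaned

        List<String> deletePaths = new ArrayList<>();
        Iterator<String> it = fileSlices.iterator();
        while (it.hasNext() && keepVersions > 0) {
          it.next();                                           // spare the most recent versions
          keepVersions--;
        }
        while (it.hasNext()) {
          String slice = it.next();
          if (savepointedFiles.contains(slice)) {
            continue;                                          // do not clean a savepointed file
          }
          deletePaths.add(slice);
        }
        System.out.println("to delete: " + deletePaths);       // prints: to delete: [v3, v1]
      }
    }
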
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index 967e313f4e..baff4ebac8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -57,7 +57,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
     HoodieWriteConfig writeConfig = getConfigBuilder(true)
         .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
          HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
@@ -81,7 +81,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
       client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
       client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
 
-      // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
+      // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
       for (int i = 5; i < 9; i++) {
         String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
         client.startCommitWithTime(instantTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 89a184bf49..8cfd92d01f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -116,7 +116,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   /**
    * Refresh commits timeline.
-   *
+   * 
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -750,20 +750,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
   }
 
-  @Override
-  public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
-    return getAllFileGroupsIncludingReplaced(partitionPaths)
-        .map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList())));
-  }
-
-  private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
-    List<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
-    for (String partitionStr : partitionStrList) {
-      fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList())));
-    }
-    return fileGroupPerPartitionList.stream();
-  }
-
   private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
     try {
       readLock.lock();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 9006bd45cb..ff44c7cef0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -204,11 +204,6 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
     return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
   }
 
-  @Override
-  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
-    return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
-  }
-
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
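
PriorityBasedFileSystemView only loses the batched overload here; the remaining methods keep delegating through execute(...), trying the preferred view first and falling back to the secondary view, as the surrounding context shows. A generic sketch of that fallback pattern (illustrative names; not Hudi's actual execute implementation):

    import java.util.function.Function;

    public class PreferredSecondarySketch {
      // Try the preferred view first; if the call fails, retry on the secondary view.
      static <T, R> R execute(T arg, Function<T, R> preferred, Function<T, R> secondary) {
        try {
          return preferred.apply(arg);
        } catch (RuntimeException e) {
          return secondary.apply(arg);
        }
      }

      public static void main(String[] args) {
        Function<String, String> remoteView = p -> { throw new RuntimeException("remote view unavailable"); };
        Function<String, String> localView = p -> "file groups for " + p;
        System.out.println(execute("2022/09/01", remoteView, localView));
      }
    }
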
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 5e52767fe2..bd18ba22a2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -51,11 +51,9 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -379,16 +377,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     }
   }
 
-  @Override
-  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
-    ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
-    for (String partitionPath : partitionPaths) {
-      Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
-      fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
-    }
-    return fileGroupPerPartitionList.stream();
-  }
-
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 9c83c8f19c..c32e2cabb1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -109,18 +109,18 @@ public interface TableFileSystemView {
     /**
      * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
      *
-     * @param partitionPath                        Partition path
-     * @param maxCommitTime                        Max Instant Time
+     * @param partitionPath Partition path
+     * @param maxCommitTime Max Instant Time
      * @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
      */
     Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
-                                                    boolean includeFileSlicesInPendingCompaction);
+        boolean includeFileSlicesInPendingCompaction);
 
     /**
      * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
      * file-slice before and after compaction request instant is merged and returned.
-     *
-     * @param partitionPath  Partition Path
+     * 
+     * @param partitionPath Partition Path
      * @param maxInstantTime Max Instant Time
      * @return
      */
@@ -149,12 +149,10 @@ public interface TableFileSystemView {
    */
   Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
 
-  Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);
-
   /**
    * Return Pending Compaction Operations.
    *
-   * @return Pair<Pair < InstantTime, CompactionOperation>>
+   * @return Pair<Pair<InstantTime,CompactionOperation>>
    */
   Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
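
With the batched getAllFileGroups(List<String>) overload gone from TableFileSystemView, a caller that needs several partitions goes back to invoking the remaining single-partition method once per partition. A hedged sketch of that calling pattern (the interface below is a stripped-down stand-in, not the real Hudi type):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class PerPartitionViewSketch {
      // Minimal stand-in for the single-partition method kept on TableFileSystemView.
      interface FileSystemViewLike {
        Stream<String> getAllFileGroups(String partitionPath);
      }

      public static void main(String[] args) {
        FileSystemViewLike view = partition -> Stream.of(partition + "/fg-1", partition + "/fg-2");

        List<String> partitions = Arrays.asList("2022/09/01", "2022/09/02");

        // One call per partition instead of a single batched call.
        List<String> allGroups = partitions.stream()
            .flatMap(view::getAllFileGroups)
            .collect(Collectors.toList());

        System.out.println(allGroups);
      }
    }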