Posted to commits@hudi.apache.org by yi...@apache.org on 2023/01/26 07:39:53 UTC

[hudi] branch master updated: [HUDI-5485] Add File System View API for batch listing and improve savepoint performance with metadata table (#7690)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 45da30dc3ec [HUDI-5485] Add File System View API for batch listing and improve savepoint performance with metadata table (#7690)
45da30dc3ec is described below

commit 45da30dc3ecfd4e7315d8f33d95504a5ac7cbd1a
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Jan 25 23:39:46 2023 -0800

    [HUDI-5485] Add File System View API for batch listing and improve savepoint performance with metadata table (#7690)
    
    The savepoint action needs to list the latest base files before or on the target instant time. When the metadata table is enabled, it is scanned once per partition from the Spark executors, which issues a large number of S3 requests and significantly degrades savepoint performance on tables with many partitions. This repeated scanning is unnecessary: the metadata table only needs to be read once.
    
    This commit fixes the issue by adding a new file system view API that batches the file listing of all partitions, so that only one metadata table lookup call (`tableMetadata.getAllFilesInPartitions`) is invoked on the driver side for the savepoint action.
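    
    As a minimal sketch of the new call pattern (names as in the diff below;
    `table`, `instantTime`, and the surrounding imports are assumed to be in
    scope as in SavepointActionExecutor):
    
        TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
        // One driver-side metadata table lookup covering every partition,
        // instead of one lookup per partition issued from the Spark executors.
        Map<String, List<String>> latestFilesMap =
            view.getAllLatestBaseFilesBeforeOrOn(instantTime).entrySet().stream()
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    entry -> entry.getValue().map(HoodieBaseFile::getFileName)
                        .collect(Collectors.toList())));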
---
 .../action/savepoint/SavepointActionExecutor.java  |  54 +++++-
 .../java/org/apache/hudi/client/TestSavepoint.java | 152 +++++++++++++++++
 .../hudi/testutils/HoodieClientTestBase.java       |   2 +-
 .../table/view/AbstractTableFileSystemView.java    | 190 +++++++++++++++++----
 .../table/view/PriorityBasedFileSystemView.java    |   7 +
 .../view/RemoteHoodieTableFileSystemView.java      |  45 ++++-
 .../common/table/view/TableFileSystemView.java     |  11 ++
 .../metadata/HoodieMetadataFileSystemView.java     |  25 +++
 .../hudi/timeline/service/RequestHandler.java      |   9 +
 .../timeline/service/handlers/BaseFileHandler.java |  11 ++
 10 files changed, 457 insertions(+), 49 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 3bfdd20721e..4e0ae1da223 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -84,15 +85,32 @@ public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
 
       context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
-      List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
-      Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
-        // Scan all partitions files with this commit time
-        LOG.info("Collecting latest files in partition path " + partitionPath);
-        TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
-        List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-            .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-        return new ImmutablePair<>(partitionPath, latestFiles);
-      }, null);
+      TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+
+      Map<String, List<String>> latestFilesMap;
+      // NOTE: for performance, we have to use different logic here for listing the latest files
+      // before or on the given instant:
+      // (1) using metadata-table-based file listing: instead of parallelizing the partition
+      // listing, which incurs unnecessary metadata table reads, we directly read the metadata
+      // table once in a batch manner through the timeline server;
+      // (2) using direct file system listing: we parallelize the partition listing so that
+      // each partition can be listed on the file system concurrently through Spark.
+      if (shouldUseBatchLookup(config)) {
+        latestFilesMap = view.getAllLatestBaseFilesBeforeOrOn(instantTime).entrySet().stream()
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().map(HoodieBaseFile::getFileName).collect(Collectors.toList())));
+      } else {
+        List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
+        latestFilesMap = context.mapToPair(partitions, partitionPath -> {
+          // Scan this partition's files with this commit time
+          LOG.info("Collecting latest files in partition path " + partitionPath);
+          List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
+              .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+          return new ImmutablePair<>(partitionPath, latestFiles);
+        }, null);
+      }
+
       HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
       // Nothing to save in the savepoint
       table.getActiveTimeline().createNewInstant(
@@ -106,4 +125,22 @@ public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
       throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
     }
   }
+
+  /**
+   * Whether to use batch lookup for listing the latest base files in the metadata table.
+   * <p>
+   * Note that the metadata table has to be enabled, and the storage type of the file system view
+   * cannot be EMBEDDED_KV_STORE or SPILLABLE_DISK (these two types are not integrated with
+   * metadata table, see HUDI-5612).
+   *
+   * @param config Write configs.
+   * @return {@code true} if using batch lookup; {@code false} otherwise.
+   */
+  private boolean shouldUseBatchLookup(HoodieWriteConfig config) {
+    FileSystemViewStorageType storageType =
+        config.getClientSpecifiedViewStorageConfig().getStorageType();
+    return config.getMetadataConfig().enabled()
+        && !FileSystemViewStorageType.EMBEDDED_KV_STORE.equals(storageType)
+        && !FileSystemViewStorageType.SPILLABLE_DISK.equals(storageType);
+  }
 }
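
Note that the batch path above is only taken when shouldUseBatchLookup(config)
holds: the metadata table must be enabled and the client-specified file system
view storage type must be neither EMBEDDED_KV_STORE nor SPILLABLE_DISK. A
minimal sketch of a write config that satisfies this, using only builder calls
that appear in TestSavepoint below (basePath is caller-supplied):

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        // Metadata table on: required for the batch lookup.
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
        // MEMORY view storage is compatible; EMBEDDED_KV_STORE or SPILLABLE_DISK
        // would fall back to the per-partition Spark listing instead.
        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.MEMORY)
            .build())
        .build();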
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
new file mode 100644
index 00000000000..765c24d6a65
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.table.view.FileSystemViewStorageType.EMBEDDED_KV_STORE;
+import static org.apache.hudi.common.table.view.FileSystemViewStorageType.MEMORY;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for savepoint operation.
+ */
+public class TestSavepoint extends HoodieClientTestBase {
+
+  private static Stream<Arguments> testSavepointParams() {
+    return Arrays.stream(new Object[][] {
+        {true, MEMORY}, {true, EMBEDDED_KV_STORE},
+        {false, MEMORY}, {false, EMBEDDED_KV_STORE}
+    }).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testSavepointParams")
+  public void testSavepoint(boolean enableMetadataTable,
+                            FileSystemViewStorageType storageType) throws IOException {
+    HoodieWriteConfig cfg = getWriteConfig(enableMetadataTable, storageType);
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+      client.savepoint("user", "hoodie-savepoint-unit-test");
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      HoodieTimeline savepointTimeline = table.getActiveTimeline().getSavePointTimeline();
+      assertEquals(1, savepointTimeline.countInstants());
+
+      Map<String, HoodieSavepointPartitionMetadata> savepointPartitionMetadataMap =
+          TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
+                  savepointTimeline.getInstantDetails(savepointTimeline.firstInstant().get()).get())
+              .getPartitionMetadata();
+
+      HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline();
+      Map<String, List<HoodieWriteStat>> partitionToWriteStats = HoodieCommitMetadata.fromBytes(
+              commitsTimeline.getInstantDetails(commitsTimeline.firstInstant().get()).get(),
+              HoodieCommitMetadata.class)
+          .getPartitionToWriteStats();
+
+      assertEquals(partitionToWriteStats.size(), savepointPartitionMetadataMap.size());
+      for (Map.Entry<String, List<HoodieWriteStat>> entry : partitionToWriteStats.entrySet()) {
+        String partition = entry.getKey();
+        assertTrue(savepointPartitionMetadataMap.containsKey(partition));
+        assertEquals(
+            entry.getValue().stream().map(path -> getFileNameFromPath(path.getPath()))
+                .sorted().collect(Collectors.toList()),
+            savepointPartitionMetadataMap.get(partition).getSavepointDataFile()
+                .stream().sorted().collect(Collectors.toList())
+        );
+      }
+    }
+  }
+
+  private HoodieWriteConfig getWriteConfig(boolean enableMetadataTable,
+                                           FileSystemViewStorageType storageType) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2)
+        .withDeleteParallelism(2)
+        .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(
+            ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024)
+            .orcMaxFileSize(1024 * 1024).build())
+        .forTable(RAW_TRIPS_TEST_NAME)
+        .withEmbeddedTimelineServerEnabled(true)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
+            .withRemoteServerPort(timelineServicePort)
+            .withStorageType(storageType)
+            .build())
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        .build();
+  }
+
+  private String getFileNameFromPath(String path) {
+    String[] parts = path.split("/");
+    return parts[parts.length - 1];
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 94e15206c39..2b4172781c6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -21,8 +21,8 @@ package org.apache.hudi.testutils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index ca847bedaf0..8ea017977e9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -52,6 +53,7 @@ import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -60,6 +62,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
@@ -122,7 +125,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   /**
    * Refresh commits timeline.
-   * 
+   *
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -164,13 +167,13 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    * Build FileGroups from passed in file-status.
    */
   protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline,
-      boolean addPendingCompactionFileSlice) {
+                                                  boolean addPendingCompactionFileSlice) {
     return buildFileGroups(convertFileStatusesToBaseFiles(statuses), convertFileStatusesToLogFiles(statuses), timeline,
         addPendingCompactionFileSlice);
   }
 
   protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> baseFileStream,
-      Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
+                                                  Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
     Map<Pair<String, String>, List<HoodieBaseFile>> baseFiles =
         baseFileStream.collect(Collectors.groupingBy(baseFile -> {
           String partitionPathStr = getPartitionPathFor(baseFile);
@@ -227,7 +230,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
         // get replace instant mapping for each partition, fileId
         return replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry -> entry.getValue().stream().map(e ->
-                new AbstractMap.SimpleEntry<>(new HoodieFileGroupId(entry.getKey(), e), instant)));
+            new AbstractMap.SimpleEntry<>(new HoodieFileGroupId(entry.getKey(), e), instant)));
       } catch (HoodieIOException ex) {
 
         if (ex.getIOException() instanceof FileNotFoundException) {
@@ -289,6 +292,114 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    */
   protected abstract void resetViewState();
 
+  /**
+   * Batch loading all the partitions if needed.
+   *
+   * @return A list of relative partition paths of all partitions.
+   */
+  private List<String> ensureAllPartitionsLoadedCorrectly() {
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+    try {
+      List<String> formattedPartitionList = getAllPartitionPaths().stream()
+          .map(this::formatPartitionKey).collect(Collectors.toList());
+      ensurePartitionsLoadedCorrectly(formattedPartitionList);
+      return formattedPartitionList;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get all partition paths", e);
+    }
+  }
+
+  /**
+   * Allows lazily loading the partitions if needed.
+   *
+   * @param partitionList list of partitions to be loaded if not present.
+   */
+  private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+    Set<String> partitionSet = new HashSet<>();
+    synchronized (addedPartitions) {
+      partitionList.forEach(partition -> {
+        if (!addedPartitions.containsKey(partition) && !isPartitionAvailableInStore(partition)) {
+          partitionSet.add(partition);
+        }
+      });
+
+      if (!partitionSet.isEmpty()) {
+        long beginTs = System.currentTimeMillis();
+        // Not loaded yet
+        try {
+          LOG.info("Building file system view for partitions " + partitionSet);
+
+          // Pairs of relative partition path and absolute partition path
+          List<Pair<String, Path>> absolutePartitionPathList = partitionSet.stream()
+              .map(partition -> Pair.of(
+                  partition, FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+              .collect(Collectors.toList());
+          long beginLsTs = System.currentTimeMillis();
+          Map<Pair<String, Path>, FileStatus[]> statusesMap =
+              listPartitions(absolutePartitionPathList);
+          long endLsTs = System.currentTimeMillis();
+          LOG.debug("Time taken to list partitions " + partitionSet + " =" + (endLsTs - beginLsTs));
+          statusesMap.forEach((partitionPair, statuses) -> {
+            String relativePartitionStr = partitionPair.getLeft();
+            List<HoodieFileGroup> groups = addFilesToView(statuses);
+            if (groups.isEmpty()) {
+              storePartitionView(relativePartitionStr, new ArrayList<>());
+            }
+            LOG.debug("#files found in partition (" + relativePartitionStr + ") =" + statuses.length);
+          });
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to list base files in partitions " + partitionSet, e);
+        }
+        long endTs = System.currentTimeMillis();
+        LOG.debug("Time to load partition " + partitionSet + " =" + (endTs - beginTs));
+      }
+
+      partitionSet.forEach(partition ->
+          addedPartitions.computeIfAbsent(partition, partitionPathStr -> true)
+      );
+    }
+  }
+
+  /**
+   * @return A list of relative partition paths of all partitions.
+   * @throws IOException upon error.
+   */
+  protected List<String> getAllPartitionPaths() throws IOException {
+    throw new HoodieException("Getting all partition paths with file system listing sequentially "
+        + "can be very slow. This should not be invoked.");
+  }
+
+  /**
+   * @param partitionPathList A list of pairs of the relative and absolute paths of the partitions.
+   * @return all the files from the partitions.
+   * @throws IOException upon error.
+   */
+  protected Map<Pair<String, Path>, FileStatus[]> listPartitions(
+      List<Pair<String, Path>> partitionPathList) throws IOException {
+    Map<Pair<String, Path>, FileStatus[]> fileStatusMap = new HashMap<>();
+
+    for (Pair<String, Path> partitionPair : partitionPathList) {
+      Path absolutePartitionPath = partitionPair.getRight();
+      try {
+        fileStatusMap.put(partitionPair, metaClient.getFs().listStatus(absolutePartitionPath));
+      } catch (IOException e) {
+        // Create the path if it does not exist already
+        if (!metaClient.getFs().exists(absolutePartitionPath)) {
+          metaClient.getFs().mkdirs(absolutePartitionPath);
+          fileStatusMap.put(partitionPair, new FileStatus[0]);
+        } else {
+          // in case the partition path was created by another caller
+          fileStatusMap.put(partitionPair, metaClient.getFs().listStatus(absolutePartitionPath));
+        }
+      }
+    }
+
+    return fileStatusMap;
+  }
+
   /**
    * Allows lazily loading the partitions if needed.
    *
@@ -537,19 +648,38 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      return fetchAllStoredFileGroups(partitionPath)
-          .filter(fileGroup -> !isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
-          .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
-              .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
-              ))
-              .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst()))
-          .filter(Option::isPresent).map(Option::get)
-          .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
+      return getLatestBaseFilesBeforeOrOnFromCache(partitionPath, maxCommitTime);
     } finally {
       readLock.unlock();
     }
   }
 
+  @Override
+  public final Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    try {
+      readLock.lock();
+
+      List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
+      return formattedPartitionList.stream().collect(Collectors.toMap(
+          Function.identity(),
+          partitionPath -> getLatestBaseFilesBeforeOrOnFromCache(partitionPath, maxCommitTime)
+      ));
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOnFromCache(String partitionPath, String maxCommitTime) {
+    return fetchAllStoredFileGroups(partitionPath)
+        .filter(fileGroup -> !isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
+        .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
+            .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
+            ))
+            .filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst()))
+        .filter(Option::isPresent).map(Option::get)
+        .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
+  }
+
   @Override
   public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr, String instantTime, String fileId) {
     try {
@@ -560,8 +690,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
         return Option.empty();
       } else {
         return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
-            .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
-                instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
+                .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
+                    instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
             .map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
       }
     } finally {
@@ -596,8 +726,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       return fetchAllStoredFileGroups()
           .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
           .map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
-          fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
-              && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
+              fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
+                  && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
           .map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get()));
     } finally {
       readLock.unlock();
@@ -627,9 +757,9 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       return fetchLatestFileSlices(partitionPath)
-              .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
-              .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
-              .map(this::addBootstrapBaseFileIfPresent);
+          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
+          .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
+          .map(this::addBootstrapBaseFileIfPresent);
     } finally {
       readLock.unlock();
     }
@@ -684,26 +814,26 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   @Override
   public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime,
-      boolean includeFileSlicesInPendingCompaction) {
+                                                               boolean includeFileSlicesInPendingCompaction) {
     try {
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
-              .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
-              .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
+          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+          .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
       if (includeFileSlicesInPendingCompaction) {
         return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
-                .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
-                .map(this::addBootstrapBaseFileIfPresent);
+            .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+            .map(this::addBootstrapBaseFileIfPresent);
       } else {
         return allFileSliceStream
-                .map(sliceStream ->
-                        Option.fromJavaOptional(sliceStream
-                                .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
-                                .filter(slice -> !slice.isEmpty())
-                                .findFirst()))
-                .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+            .map(sliceStream ->
+                Option.fromJavaOptional(sliceStream
+                    .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+                    .filter(slice -> !slice.isEmpty())
+                    .findFirst()))
+            .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
       }
     } finally {
       readLock.unlock();
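
The effect of ensureAllPartitionsLoadedCorrectly above is that the view pays
the listing cost once and records each partition in addedPartitions, so later
per-partition queries are answered from the already-built file groups. A
sketch of the resulting behavior (fsView stands for any
AbstractTableFileSystemView subclass; instant and partition values are
illustrative):

    // First call: lists all partitions in one batch via listPartitions(...)
    // and caches the resulting file groups.
    Map<String, Stream<HoodieBaseFile>> all =
        fsView.getAllLatestBaseFilesBeforeOrOn("20230125233946");
    // Later per-partition call: the partition is already in addedPartitions,
    // so no storage (or metadata table) listing is repeated.
    Stream<HoodieBaseFile> one =
        fsView.getLatestBaseFilesBeforeOrOn("2023/01/25", "20230125233946");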
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 62edc4daa33..44332117367 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -39,6 +39,7 @@ import org.apache.log4j.Logger;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 
 /**
@@ -145,6 +146,12 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
         secondaryView::getLatestBaseFilesBeforeOrOn);
   }
 
+  @Override
+  public Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    return execute(maxCommitTime, preferredView::getAllLatestBaseFilesBeforeOrOn,
+        secondaryView::getAllLatestBaseFilesBeforeOrOn);
+  }
+
   @Override
   public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) {
     return execute(partitionPath, fileId, preferredView::getLatestBaseFile, secondaryView::getLatestBaseFile);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 5ada3dc45b5..bffd6353aa7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -54,6 +54,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -88,6 +89,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
       String.format("%s/%s", BASE_URL, "datafiles/range/latest/");
   public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL =
       String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/");
+  public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL =
+      String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/");
 
   public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
       String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
@@ -155,17 +158,17 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     this.serverPort = viewConf.getRemoteViewServerPort();
     this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
     if (viewConf.isRemoteTimelineClientRetryEnabled()) {
-      retryHelper =  new RetryHelper(
-              viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
-              viewConf.getRemoteTimelineClientMaxRetryNumbers(),
-              viewConf.getRemoteTimelineInitialRetryIntervalMs(),
-              viewConf.getRemoteTimelineClientRetryExceptions(),
-              "Sending request");
+      retryHelper = new RetryHelper(
+          viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+          viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+          viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+          viewConf.getRemoteTimelineClientRetryExceptions(),
+          "Sending request");
     }
   }
 
   private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
-      RequestMethod method) throws IOException {
+                               RequestMethod method) throws IOException {
     ValidationUtils.checkArgument(!closed, "View already closed");
 
     URIBuilder builder =
@@ -252,13 +255,36 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     return getLatestBaseFilesFromParams(paramsMap, LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL);
   }
 
+  @Override
+  public Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    paramsMap.put(MAX_INSTANT_PARAM, maxCommitTime);
+
+    try {
+      Map<String, List<BaseFileDTO>> dataFileMap = executeRequest(
+          ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL,
+          paramsMap,
+          new TypeReference<Map<String, List<BaseFileDTO>>>() {
+          },
+          RequestMethod.GET);
+      return dataFileMap.entrySet().stream().collect(
+          Collectors.toMap(
+              Map.Entry::getKey,
+              entry -> entry.getValue().stream().map(BaseFileDTO::toHoodieBaseFile)));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
   @Override
   public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String instantTime, String fileId) {
     Map<String, String> paramsMap = getParamsWithAdditionalParams(partitionPath,
         new String[] {INSTANT_PARAM, FILEID_PARAM}, new String[] {instantTime, fileId});
     try {
       List<BaseFileDTO> dataFiles = executeRequest(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap,
-          new TypeReference<List<BaseFileDTO>>() {}, RequestMethod.GET);
+          new TypeReference<List<BaseFileDTO>>() {
+          }, RequestMethod.GET);
       return Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -505,7 +531,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
     try {
       List<BaseFileDTO> dataFiles = executeRequest(LATEST_PARTITION_DATA_FILE_URL, paramsMap,
-          new TypeReference<List<BaseFileDTO>>() {}, RequestMethod.GET);
+          new TypeReference<List<BaseFileDTO>>() {
+          }, RequestMethod.GET);
       return Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 18c9a9af998..97b972a75d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 
 /**
@@ -64,6 +65,16 @@ public interface TableFileSystemView {
      */
     Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionPath, String maxCommitTime);
 
+    /**
+     * Streams the latest version base files in all partitions, with the precondition that
+     * commitTime(file) is before or on maxCommitTime.
+     *
+     * @param maxCommitTime The max commit time to consider.
+     * @return A {@link Map} of partition path to the latest version base files before or on the
+     * commit time.
+     */
+    Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime);
+
     /**
      * Stream all the latest data files pass.
      */
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index ab5b5f6b4db..76b341609de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -24,12 +24,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
 * {@code HoodieTableFileSystemView} implementation that retrieves partition listings from the Metadata Table.
@@ -65,6 +70,26 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
     return tableMetadata.getAllFilesInPartition(partitionPath);
   }
 
+  @Override
+  protected List<String> getAllPartitionPaths() throws IOException {
+    return tableMetadata.getAllPartitionPaths();
+  }
+
+  @Override
+  protected Map<Pair<String, Path>, FileStatus[]> listPartitions(List<Pair<String, Path>> partitionPathList) throws IOException {
+    Map<String, Pair<String, Path>> absoluteToPairMap = partitionPathList.stream()
+        .collect(Collectors.toMap(
+            pair -> pair.getRight().toString(),
+            Function.identity()
+        ));
+    return tableMetadata.getAllFilesInPartitions(
+            partitionPathList.stream().map(pair -> pair.getRight().toString()).collect(Collectors.toList()))
+        .entrySet().stream().collect(Collectors.toMap(
+            entry -> absoluteToPairMap.get(entry.getKey()),
+            Map.Entry::getValue
+        ));
+  }
+
   @Override
   public void reset() {
     super.reset();
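
The listPartitions override above is what collapses N metadata table reads
into one: instead of calling getAllFilesInPartition once per partition, the
view issues a single getAllFilesInPartitions call. A before/after sketch,
assuming tableMetadata, basePath, and partitionPaths are in scope inside a
method that may throw IOException:

    // Before: one metadata table read per partition.
    for (String partition : partitionPaths) {
      FileStatus[] files =
          tableMetadata.getAllFilesInPartition(new Path(basePath, partition));
    }
    // After: a single batched read covering all partitions at once.
    Map<String, FileStatus[]> filesByPartition =
        tableMetadata.getAllFilesInPartitions(partitionPaths.stream()
            .map(p -> new Path(basePath, p).toString())
            .collect(Collectors.toList()));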
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 4c999b9ad22..7ad175c692a 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -56,6 +56,7 @@ import org.jetbrains.annotations.NotNull;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -276,6 +277,14 @@ public class RequestHandler {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT", 1);
+      Map<String, List<BaseFileDTO>> dtos = dataFileHandler.getAllLatestDataFilesBeforeOrOn(
+          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")));
+      writeValueAsString(ctx, dtos);
+    }, true));
+
     app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn(
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
index 5033a86f8f2..a34b49843fa 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -61,6 +62,16 @@ public class BaseFileHandler extends Handler {
         .map(BaseFileDTO::fromHoodieBaseFile).collect(Collectors.toList());
   }
 
+  public Map<String, List<BaseFileDTO>> getAllLatestDataFilesBeforeOrOn(String basePath, String maxInstantTime) {
+    return viewManager.getFileSystemView(basePath)
+        .getAllLatestBaseFilesBeforeOrOn(maxInstantTime)
+        .entrySet().stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> entry.getValue().map(BaseFileDTO::fromHoodieBaseFile).collect(Collectors.toList())
+        ));
+  }
+
   public List<BaseFileDTO> getLatestDataFileOn(String basePath, String partitionPath, String instantTime,
                                                String fileId) {
     List<BaseFileDTO> result = new ArrayList<>();