You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/04 16:00:08 UTC

[hudi] 02/06: [HUDI-1450] Use metadata table for listing in HoodieROTablePathFilter (apache#2326)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4e642268442782cdd7ad753981dd2571388cd189
Author: Udit Mehrotra <ud...@gmail.com>
AuthorDate: Thu Dec 31 01:20:02 2020 -0800

    [HUDI-1450] Use metadata table for listing in HoodieROTablePathFilter (apache#2326)
    
    [HUDI-1394] [RFC-15] Use metadata table (if present) to get all partition paths (apache#2351)
---
 .../apache/hudi/cli/commands/MetadataCommand.java  |  2 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  1 +
 .../metadata/HoodieBackedTableMetadataWriter.java  |  9 +--
 .../java/org/apache/hudi/table/HoodieTable.java    |  2 +-
 .../PartitionAwareClusteringPlanStrategy.java      |  4 +-
 .../hudi/table/action/rollback/RollbackUtils.java  |  9 +--
 .../action/savepoint/SavepointActionExecutor.java  | 22 ++++---
 .../FlinkCopyOnWriteRollbackActionExecutor.java    |  4 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  2 +-
 .../index/bloom/SparkHoodieGlobalBloomIndex.java   |  2 +-
 .../index/simple/SparkHoodieGlobalSimpleIndex.java |  3 +-
 ...rkInsertOverwriteTableCommitActionExecutor.java |  3 +-
 .../HoodieSparkMergeOnReadTableCompactor.java      |  2 +-
 .../SparkCopyOnWriteRollbackActionExecutor.java    |  4 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |  2 +-
 .../org/apache/hudi/client/TestClientRollback.java |  4 +-
 .../apache/hudi/metadata/TestHoodieFsMetadata.java |  4 +-
 .../hudi/common}/config/HoodieMetadataConfig.java  | 11 ++--
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  9 ++-
 .../common/table/view/FileSystemViewManager.java   | 17 ++++++
 .../metadata/FileSystemBackedTableMetadata.java    | 69 ++++++++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 24 ++++----
 .../metadata/HoodieMetadataFileSystemView.java     | 21 +++++--
 .../hudi/hadoop/HoodieROTablePathFilter.java       | 17 +++++-
 .../reader/DFSHoodieDatasetInputReader.java        |  4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  6 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  7 +++
 .../java/org/apache/hudi/client/TestBootstrap.java | 19 ++++--
 .../apache/hudi/functional/TestCOWDataSource.scala | 15 ++++-
 .../java/org/apache/hudi/dla/DLASyncConfig.java    | 12 ++++
 .../java/org/apache/hudi/dla/HoodieDLAClient.java  |  3 +-
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  | 48 +++++++++------
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  2 +-
 .../hudi/sync/common/AbstractSyncHoodieClient.java | 11 +++-
 .../hudi/utilities/HoodieSnapshotCopier.java       | 16 ++++-
 .../hudi/utilities/HoodieSnapshotExporter.java     |  2 +-
 .../hudi/utilities/perf/TimelineServerPerf.java    | 10 +++-
 .../functional/TestHoodieSnapshotCopier.java       |  8 ++-
 38 files changed, 308 insertions(+), 102 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index 68ff1d1..f8a8eed 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadata;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ae56454..138f1be 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -21,6 +21,7 @@ package org.apache.hudi.config;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.client.common.EngineType;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 98d314d..9282e3b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -46,7 +47,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -275,14 +275,15 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
     // List all partitions in the basePath of the containing dataset
     FileSystem fs = datasetMetaClient.getFs();
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetWriteConfig.getBasePath(), datasetWriteConfig.shouldAssumeDatePartitioning());
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
+        datasetWriteConfig.shouldAssumeDatePartitioning());
+    List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
     LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
 
     // List all partitions in parallel and collect the files in them
     int parallelism =  Math.max(partitions.size(), 1);
     List<Pair<String, FileStatus[]>> partitionFileList = engineContext.map(partitions, partition -> {
-      FileSystem fsys = datasetMetaClient.getFs();
-      FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(datasetWriteConfig.getBasePath(), partition));
+      FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
       return Pair.of(partition, statuses);
     }, parallelism);
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 684df39..b268512 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -277,7 +277,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
     if (config.useFileListingMetadata()) {
       FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
-      return new HoodieMetadataFileSystemView(metaClient, this, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
+      return new HoodieMetadataFileSystemView(metaClient, this.metadata, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
     } else {
       return getViewManager().getFileSystemView(metaClient);
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 404cc02..5a42cdc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -66,8 +66,10 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
     try {
       HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
       LOG.info("Scheduling clustering for " + metaClient.getBasePath());
+      HoodieWriteConfig config = getWriteConfig();
       List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-          getWriteConfig().shouldAssumeDatePartitioning());
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+          config.shouldAssumeDatePartitioning());
 
       // filter the partition paths if needed to reduce list status
       partitionPaths = filterPartitionPaths(partitionPaths);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 897b448..bb59a66 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -88,12 +88,13 @@ public class RollbackUtils {
    * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type.
    * @param fs instance of {@link FileSystem} to use.
    * @param basePath base path of interest.
-   * @param shouldAssumeDatePartitioning {@code true} if date partitioning should be assumed. {@code false} otherwise.
+   * @param config instance of {@link HoodieWriteConfig} to use.
    * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
    */
-  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, boolean shouldAssumeDatePartitioning) {
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, HoodieWriteConfig config) {
     try {
-      return FSUtils.getAllPartitionPaths(fs, basePath, shouldAssumeDatePartitioning).stream()
+      return FSUtils.getAllPartitionPaths(fs, basePath, config.useFileListingMetadata(),
+          config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()).stream()
           .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
           .collect(Collectors.toList());
     } catch (IOException e) {
@@ -113,7 +114,7 @@ public class RollbackUtils {
     String commit = instantToRollback.getTimestamp();
     HoodieWriteConfig config = table.getConfig();
     List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
     int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
     context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
     return context.flatMap(partitions, partitionPath -> {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 90a96b9..c1d2c4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -89,15 +90,18 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
 
       context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
-      Map<String, List<String>> latestFilesMap = context.mapToPair(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
-          table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()), partitionPath -> {
-            // Scan all partitions files with this commit time
-            LOG.info("Collecting latest files in partition path " + partitionPath);
-            TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
-            List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-                .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-          return new ImmutablePair<>(partitionPath, latestFiles);
-        }, null);
+      List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+          table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+          config.shouldAssumeDatePartitioning()
+      );
+      Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
+        // Scan all partitions files with this commit time
+        LOG.info("Collecting latest files in partition path " + partitionPath);
+        TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+        List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
+            .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+        return new ImmutablePair<>(partitionPath, latestFiles);
+      }, null);
       HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
       // Nothing to save in the savepoint
       table.getActiveTimeline().createNewInstant(
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
index 28b713b..6221dd5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
@@ -64,8 +64,8 @@ public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayloa
 
   @Override
   protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
+        table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
     return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 1fa3ad0..17453bb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -94,7 +94,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
         List<ListingBasedRollbackRequest> rollbackRequests;
         if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
           rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-              table.getConfig().shouldAssumeDatePartitioning());
+              table.getConfig());
         } else {
           rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
         }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
index 771c01a..310dbd2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
@@ -64,7 +64,7 @@ public class SparkHoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     try {
       List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-          config.shouldAssumeDatePartitioning());
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to load all partitions", e);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
index bdb4991..092c62b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
@@ -104,7 +104,8 @@ public class SparkHoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends
                                                                       final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     try {
-      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
+      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       // Obtain the latest data files from all the partitions.
       return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
     } catch (IOException e) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
index c014515..6cc474f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -51,7 +51,8 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
     Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
     try {
       List<String> partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
-          table.getMetaClient().getBasePath(), false);
+          table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
+          false);
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
       if (partitionPaths != null && partitionPaths.size() > 0) {
         context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 96d52a1..21ffd46 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -196,7 +196,7 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
     List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
-        config.shouldAssumeDatePartitioning());
+        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
 
     // filter the partition paths if needed to reduce list status
     partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
index 965d805..b770bbf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
@@ -66,8 +66,8 @@ public class SparkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayloa
 
   @Override
   protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(),
+        table.getMetaClient().getBasePath(), config);
     return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 7e3faf3..1a1cb3f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -93,7 +93,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
         List<ListingBasedRollbackRequest> rollbackRequests;
         if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
           rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-              table.getConfig().shouldAssumeDatePartitioning());
+              table.getConfig());
         } else {
           rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
         }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index d04a2df..eab9bb1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -99,8 +99,10 @@ public class TestClientRollback extends HoodieClientTestBase {
       statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       // Verify there are no errors
       assertNoWriteErrors(statuses);
+      HoodieWriteConfig config = getConfig();
       List<String> partitionPaths =
-          FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
+          FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), config.useFileListingMetadata(),
+              config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
       final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
index e5b1b9f..b9c3511 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
@@ -60,7 +61,6 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -736,7 +736,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
     // in the .hoodie folder.
     List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
-        false);
+        false, false, false);
     assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
 
     // Metadata table should automatically compact and clean
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
similarity index 94%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
rename to hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 54c4ac3..02e67e1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -16,12 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.config;
-
-import org.apache.hudi.common.config.DefaultHoodieConfig;
+package org.apache.hudi.common.config;
 
 import javax.annotation.concurrent.Immutable;
-
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -31,7 +28,7 @@ import java.util.Properties;
  * Configurations used by the HUDI Metadata Table.
  */
 @Immutable
-public class HoodieMetadataConfig extends DefaultHoodieConfig {
+public final class HoodieMetadataConfig extends DefaultHoodieConfig {
 
   public static final String METADATA_PREFIX = "hoodie.metadata";
 
@@ -65,6 +62,10 @@ public class HoodieMetadataConfig extends DefaultHoodieConfig {
   public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
   public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
 
+  // We can set the default to true for readers, as it will internally default to listing from filesystem if metadata
+  // table is not found
+  public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;
+
   private HoodieMetadataConfig(Properties props) {
     super(props);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 94d05b3..d671ec8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -251,12 +252,14 @@ public class FSUtils {
     }
   }
 
-  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean assumeDatePartitioning)
-      throws IOException {
+  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean useFileListingFromMetadata, boolean verifyListings,
+                                                  boolean assumeDatePartitioning) throws IOException {
     if (assumeDatePartitioning) {
       return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
     } else {
-      return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
+      HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(fs.getConf(), basePathStr, "/tmp/", useFileListingFromMetadata,
+          verifyListings, false, false);
+      return tableMetadata.getAllPartitionPaths();
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index d310181..c5a31fa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Functions.Function2;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -158,6 +159,22 @@ public class FileSystemViewManager {
     return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
   }
 
+  public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieTableMetaClient metaClient,
+                                                                       boolean useFileListingFromMetadata,
+                                                                       boolean verifyListings) {
+    LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
+    if (useFileListingFromMetadata) {
+      return new HoodieMetadataFileSystemView(metaClient,
+          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+          true,
+          verifyListings);
+    }
+
+    return new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+  }
+
+
   /**
    * Create a remote file System view for a table.
    * 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
new file mode 100644
index 0000000..73ce8e4
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
+
+  private final SerializableConfiguration hadoopConf;
+  private final String datasetBasePath;
+  private final boolean assumeDatePartitioning;
+
+  public FileSystemBackedTableMetadata(SerializableConfiguration conf, String datasetBasePath, boolean assumeDatePartitioning) {
+    this.hadoopConf = conf;
+    this.datasetBasePath = datasetBasePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+  }
+
+  @Override
+  public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
+    FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
+    return FSUtils.getAllDataFilesInPartition(fs, partitionPath);
+  }
+
+  @Override
+  public List<String> getAllPartitionPaths() throws IOException {
+    FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
+    if (assumeDatePartitioning) {
+      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
+    } else {
+      return FSUtils.getAllFoldersWithPartitionMetaFile(fs, datasetBasePath);
+    }
+  }
+
+  @Override
+  public Option<String> getSyncedInstantTime() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isInSync() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index cdff41c..4858e6e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -30,7 +30,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -85,7 +84,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
   private final String spillableMapDirectory;
 
   // Readers for the base and log file which store the metadata
-  private transient HoodieFileReader<GenericRecord> basefileReader;
+  private transient HoodieFileReader<GenericRecord> baseFileReader;
   private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
 
   public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
@@ -108,7 +107,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
       try {
         this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
       } catch (TableNotFoundException e) {
-        LOG.error("Metadata table was not found at path " + metadataBasePath);
+        LOG.warn("Metadata table was not found at path " + metadataBasePath);
         this.enabled = false;
       } catch (Exception e) {
         LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
@@ -144,9 +143,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
         LOG.error("Failed to retrieve list of partition from metadata", e);
       }
     }
-
-    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
-    return FSUtils.getAllPartitionPaths(fs, datasetBasePath, assumeDatePartitioning);
+    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
   }
 
   /**
@@ -199,7 +196,8 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the file system
       timer.startTimer();
-      List<String> actualPartitions  = FSUtils.getAllPartitionPaths(metaClient.getFs(), datasetBasePath, false);
+      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
+      List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
       metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
 
       Collections.sort(actualPartitions);
@@ -287,9 +285,9 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
 
     // Retrieve record from base file
     HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-    if (basefileReader != null) {
+    if (baseFileReader != null) {
       HoodieTimer timer = new HoodieTimer().startTimer();
-      Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
+      Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
@@ -338,7 +336,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
     Option<HoodieBaseFile> basefile = latestSlices.get(0).getBaseFile();
     if (basefile.isPresent()) {
       String basefilePath = basefile.get().getPath();
-      basefileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+      baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
       LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime());
     }
 
@@ -365,9 +363,9 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
   }
 
   public void closeReaders() {
-    if (basefileReader != null) {
-      basefileReader.close();
-      basefileReader = null;
+    if (baseFileReader != null) {
+      baseFileReader.close();
+      baseFileReader = null;
     }
     logRecordScanner = null;
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
similarity index 68%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
rename to hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index 8c23ea8..9f9e405 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -24,19 +24,30 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.table.HoodieTable;
 
 /**
  * {@code HoodieTableFileSystemView} implementation that retrieved partition listings from the Metadata Table.
  */
 public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
-  private HoodieTable hoodieTable;
 
-  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTable table,
+  private final HoodieTableMetadata tableMetadata;
+
+  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTableMetadata tableMetadata,
                                       HoodieTimeline visibleActiveTimeline, boolean enableIncrementalTimelineSync) {
     super(metaClient, visibleActiveTimeline, enableIncrementalTimelineSync);
-    this.hoodieTable = table;
+    this.tableMetadata = tableMetadata;
+  }
+
+  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient,
+                                      HoodieTimeline visibleActiveTimeline,
+                                      boolean useFileListingFromMetadata,
+                                      boolean verifyListings) {
+    super(metaClient, visibleActiveTimeline);
+    this.tableMetadata = HoodieTableMetadata.create(metaClient.getHadoopConf(), metaClient.getBasePath(),
+        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR, useFileListingFromMetadata, verifyListings,
+        false, false);
   }
 
   /**
@@ -47,6 +58,6 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
    */
   @Override
   protected FileStatus[] listPartition(Path partitionPath) throws IOException {
-    return hoodieTable.metadata().getAllFilesInPartition(partitionPath);
+    return tableMetadata.getAllFilesInPartition(partitionPath);
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 1e616f8..baedb16 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -22,9 +22,11 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
@@ -43,6 +45,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
  * always accept
@@ -163,9 +170,13 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
-          HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-              metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), fs.listStatus(folder));
-          List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
+          boolean useFileListingFromMetadata = getConf().getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+          boolean verifyFileListing = getConf().getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+          HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+              useFileListingFromMetadata, verifyFileListing);
+          String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder);
+
+          List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
           // populate the cache
           if (!hoodiePathCache.containsKey(folder.toString())) {
             hoodiePathCache.put(folder.toString(), new HashSet<>());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 43d5fde..a41da2c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.integ.testsuite.reader;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -85,7 +86,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     // Using FSUtils.getFS here instead of metaClient.getFS() since we dont want to count these listStatus
     // calls in metrics as they are not part of normal HUDI operation.
     FileSystem fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath(), false);
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath(),
+        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false);
     // Sort partition so we can pick last N partitions by default
     Collections.sort(partitionPaths);
     if (!partitionPaths.isEmpty()) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 988c9f9..472f450 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.HoodieWriteResult
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
@@ -38,7 +38,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
-import org.apache.hudi.internal.{HoodieDataSourceInternalWriter, DataSourceInternalWriterHelper}
+import org.apache.hudi.internal.{DataSourceInternalWriterHelper, HoodieDataSourceInternalWriter}
 import org.apache.hudi.sync.common.AbstractSyncTool
 import org.apache.log4j.LogManager
 import org.apache.spark.SPARK_VERSION
@@ -372,6 +372,8 @@ private[hudi] object HoodieSparkSqlWriter {
       ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+    hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
+    hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
     hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
       DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 294050b..02b5abd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -23,6 +23,11 @@ import org.apache.hudi.common.config.TypedProperties
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters.mapAsScalaMapConverter
 
+import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE
+import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE
+import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP
+import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP
+
 /**
  * WriterUtils to assist in write path in Datasource and tests.
  */
@@ -46,6 +51,8 @@ object HoodieWriterUtils {
       RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
       PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
       KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+      METADATA_ENABLE_PROP -> DEFAULT_METADATA_ENABLE.toString,
+      METADATA_VALIDATE_PROP -> DEFAULT_METADATA_VALIDATE.toString,
       COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
       INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
       STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index 7e13a5e..521ff05 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -28,6 +28,7 @@ import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelect
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
@@ -372,7 +373,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
     assertEquals(totalRecords, records.size());
@@ -390,7 +392,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
     assertEquals(totalRecords, records.size());
@@ -406,7 +409,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES,
         true, HoodieRecord.HOODIE_META_COLUMNS);
@@ -423,7 +427,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
         HoodieRecord.HOODIE_META_COLUMNS);
@@ -438,7 +443,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
@@ -455,7 +461,8 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+            HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
         Arrays.asList("_row_key"));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 51ca72e..c3843cc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -22,6 +22,7 @@ import java.util.function.Supplier
 import java.util.stream.Stream
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -34,6 +35,8 @@ import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.JavaConversions._
 
@@ -82,13 +85,16 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
 
-  @Test def testCopyOnWriteStorage() {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean) {
     // Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -96,7 +102,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
 
     // Snapshot query
-    val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
+      .load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF1.count())
 
     // Upsert based on the written table with Hudi metadata columns
@@ -120,6 +128,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -128,6 +137,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     // Snapshot Query
     val snapshotDF3 = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
 
@@ -149,6 +159,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
     emptyDF.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)
 
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
index 8a92de6..5b50ada 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.dla;
 
 import com.beust.jcommander.Parameter;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 
 import java.io.Serializable;
@@ -68,6 +70,12 @@ public class DLASyncConfig implements Serializable {
   @Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA hive style partitioning, true if like the following style: field1=value1/field2=value2")
   public Boolean useDLASyncHiveStylePartitioning = false;
 
+  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+  public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+  @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+  public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
@@ -88,6 +96,8 @@ public class DLASyncConfig implements Serializable {
     newConfig.skipROSuffix = cfg.skipROSuffix;
     newConfig.skipRTSync = cfg.skipRTSync;
     newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
+    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+    newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
     newConfig.supportTimestamp = cfg.supportTimestamp;
     return newConfig;
   }
@@ -99,6 +109,8 @@ public class DLASyncConfig implements Serializable {
         + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
         + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
         + ", useDLASyncHiveStylePartitioning=" + useDLASyncHiveStylePartitioning
+        + ", useFileListingFromMetadata=" + useFileListingFromMetadata
+        + ", verifyMetadataFileListing=" + verifyMetadataFileListing
         + ", help=" + help + '}';
   }
 }
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index 34a96c9..02c07d6 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -70,7 +70,8 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
   private PartitionValueExtractor partitionValueExtractor;
 
   public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs);
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+        syncConfig.verifyMetadataFileListing, fs);
     this.dlaConfig = syncConfig;
     try {
       this.partitionValueExtractor =
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 6c8fd8f..dd9d483 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
 import com.beust.jcommander.Parameter;
 
 import java.io.Serializable;
@@ -77,6 +79,12 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
   public Boolean skipROSuffix = false;
 
+  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+  public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+  @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+  public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
@@ -99,6 +107,8 @@ public class HiveSyncConfig implements Serializable {
     newConfig.jdbcUrl = cfg.jdbcUrl;
     newConfig.tableName = cfg.tableName;
     newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+    newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
     newConfig.supportTimestamp = cfg.supportTimestamp;
     newConfig.decodePartition = cfg.decodePartition;
     return newConfig;
@@ -107,23 +117,25 @@ public class HiveSyncConfig implements Serializable {
   @Override
   public String toString() {
     return "HiveSyncConfig{"
-      + "databaseName='" + databaseName + '\''
-      + ", tableName='" + tableName + '\''
-      + ", baseFileFormat='" + baseFileFormat + '\''
-      + ", hiveUser='" + hiveUser + '\''
-      + ", hivePass='" + hivePass + '\''
-      + ", jdbcUrl='" + jdbcUrl + '\''
-      + ", basePath='" + basePath + '\''
-      + ", partitionFields=" + partitionFields
-      + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
-      + ", assumeDatePartitioning=" + assumeDatePartitioning
-      + ", usePreApacheInputFormat=" + usePreApacheInputFormat
-      + ", useJdbc=" + useJdbc
-      + ", autoCreateDatabase=" + autoCreateDatabase
-      + ", skipROSuffix=" + skipROSuffix
-      + ", help=" + help
-      + ", supportTimestamp=" + supportTimestamp
-      + ", decodePartition=" + decodePartition
-      + '}';
+        + "databaseName='" + databaseName + '\''
+        + ", tableName='" + tableName + '\''
+        + ", baseFileFormat='" + baseFileFormat + '\''
+        + ", hiveUser='" + hiveUser + '\''
+        + ", hivePass='" + hivePass + '\''
+        + ", jdbcUrl='" + jdbcUrl + '\''
+        + ", basePath='" + basePath + '\''
+        + ", partitionFields=" + partitionFields
+        + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+        + ", assumeDatePartitioning=" + assumeDatePartitioning
+        + ", usePreApacheInputFormat=" + usePreApacheInputFormat
+        + ", useJdbc=" + useJdbc
+        + ", autoCreateDatabase=" + autoCreateDatabase
+        + ", skipROSuffix=" + skipROSuffix
+        + ", help=" + help
+        + ", supportTimestamp=" + supportTimestamp
+        + ", decodePartition=" + decodePartition
+        + ", useFileListingFromMetadata=" + useFileListingFromMetadata
+        + ", verifyMetadataFileListing=" + verifyMetadataFileListing
+        + '}';
   }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 88f4c10..5c0c128 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -76,7 +76,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   private HiveConf configuration;
 
   public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
-    super(cfg.basePath, cfg.assumeDatePartitioning, fs);
+    super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs);
     this.syncConfig = cfg;
     this.fs = fs;
 
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 419ea16..8c91848 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -40,18 +40,25 @@ import java.util.List;
 import java.util.Map;
 
 public abstract class AbstractSyncHoodieClient {
+
   private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+
   protected final HoodieTableMetaClient metaClient;
   protected final HoodieTableType tableType;
   protected final FileSystem fs;
   private String basePath;
   private boolean assumeDatePartitioning;
+  private boolean useFileListingFromMetadata;
+  private boolean verifyMetadataFileListing;
 
-  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+                                  boolean verifyMetadataFileListing, FileSystem fs) {
     this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
     this.tableType = metaClient.getTableType();
     this.basePath = basePath;
     this.assumeDatePartitioning = assumeDatePartitioning;
+    this.useFileListingFromMetadata = useFileListingFromMetadata;
+    this.verifyMetadataFileListing = verifyMetadataFileListing;
     this.fs = fs;
   }
 
@@ -120,7 +127,7 @@ public abstract class AbstractSyncHoodieClient {
     if (!lastCommitTimeSynced.isPresent()) {
       LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs);
       try {
-        return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning);
+        return FSUtils.getAllPartitionPaths(fs, basePath, useFileListingFromMetadata, verifyMetadataFileListing, assumeDatePartitioning);
       } catch (IOException e) {
         throw new HoodieIOException("Failed to list all partitions in " + basePath, e);
       }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 2826108..0066d86 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -68,10 +69,18 @@ public class HoodieSnapshotCopier implements Serializable {
 
     @Parameter(names = {"--date-partitioned", "-dp"}, description = "Can we assume date partitioning?")
     boolean shouldAssumeDatePartitioning = false;
+
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+    @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+    public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
   }
 
   public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir,
-      final boolean shouldAssumeDatePartitioning) throws IOException {
+                       final boolean shouldAssumeDatePartitioning,
+                       final boolean useFileListingFromMetadata,
+                       final boolean verifyMetadataFileListing) throws IOException {
     FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
     final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
     final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
@@ -88,7 +97,7 @@ public class HoodieSnapshotCopier implements Serializable {
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning);
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, useFileListingFromMetadata, verifyMetadataFileListing, shouldAssumeDatePartitioning);
     if (partitions.size() > 0) {
       LOG.info(String.format("The job needs to copy %d partitions.", partitions.size()));
 
@@ -183,7 +192,8 @@ public class HoodieSnapshotCopier implements Serializable {
 
     // Copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning);
+    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning, cfg.useFileListingFromMetadata,
+        cfg.verifyMetadataFileListing);
 
     // Stop the job
     jsc.stop();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index c69d004..2f5f461 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -154,7 +154,7 @@ public class HoodieSnapshotExporter {
   }
 
   private List<String> getPartitions(FileSystem fs, Config cfg) throws IOException {
-    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
+    return FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, true, false, false);
   }
 
   private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index f338e52..27dc709 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.perf;
 
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -85,7 +86,8 @@ public class TimelineServerPerf implements Serializable {
 
   public void run() throws IOException {
 
-    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, true);
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(timelineServer.getFs(), cfg.basePath, cfg.useFileListingFromMetadata,
+        cfg.verifyMetadataFileListing, true);
     Collections.shuffle(allPartitionPaths);
     List<String> selected = allPartitionPaths.stream().filter(p -> !p.contains("error")).limit(cfg.maxPartitions)
         .collect(Collectors.toList());
@@ -294,6 +296,12 @@ public class TimelineServerPerf implements Serializable {
     @Parameter(names = {"--wait-for-manual-queries", "-ww"})
     public Boolean waitForManualQueries = false;
 
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+
+    @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
+    public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
     @Parameter(names = {"--help", "-h"})
     public Boolean help = false;
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
index 95af888..0b19fa0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.functional;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -67,7 +68,9 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
 
     // Do the snapshot
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, true);
+    copier.snapshot(jsc(), basePath, outputPath, true,
+        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
 
     // Nothing changed; we just bail out
     assertEquals(fs.listStatus(new Path(basePath)).length, 1);
@@ -120,7 +123,8 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
 
     // Do a snapshot copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc(), basePath, outputPath, false);
+    copier.snapshot(jsc(), basePath, outputPath, false, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE);
 
     // Check results
     assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));