You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/20 05:20:41 UTC

[hudi] branch master updated: [HUDI 1308] Harden RFC-15 Implementation based on production testing (#2441)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ca0625  [HUDI 1308] Harden RFC-15 Implementation based on production testing (#2441)
5ca0625 is described below

commit 5ca0625b277efa3a73d2ae0fbdfa4c6163f312d2
Author: vinoth chandar <vi...@users.noreply.github.com>
AuthorDate: Tue Jan 19 21:20:28 2021 -0800

    [HUDI 1308] Harden RFC-15 Implementation based on production testing (#2441)
    
    Addresses resource leaks and performance degradation observed during testing. These were regressions from the original RFC-15 PoC implementation.
    
    * Pass a single instance of HoodieTableMetadata everywhere
    * Fix tests and add config for enabling metrics
     - Removed special casing of assumeDatePartitioning inside FSUtils#getAllPartitionPaths()
     - Consequently, IOException is no longer thrown there, and many calling files had to be adjusted
    - More diligent handling of open file handles in metadata table
     - Added config for controlling reuse of connections
     - Added config for turning off fallback to listing, so we can see tests fail
     - Changed all input format (ipf) listing code to cache/amortize the open/close for better performance
     - Timeline server also reuses connections, for better performance
     - Without the timeline server, when the metadata table is opened from executors, reuse is not allowed
     - HoodieMetadataConfig passed into HoodieTableMetadata#create as argument.
     - Fix TestHoodieBackedMetadata#testSync
---
 .../apache/hudi/cli/commands/MetadataCommand.java  |  11 +-
 .../embedded/EmbeddedTimelineServerHelper.java     |   4 +-
 .../client/embedded/EmbeddedTimelineService.java   |  13 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  33 ++--
 .../metadata/HoodieBackedTableMetadataWriter.java  |  35 +++--
 .../hudi/metadata/HoodieTableMetadataWriter.java   |   2 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  66 +++-----
 .../hudi/table/HoodieTimelineArchiveLog.java       |  25 ++-
 .../action/clean/BaseCleanActionExecutor.java      |   2 +-
 .../hudi/table/action/clean/CleanPlanner.java      |  12 +-
 .../PartitionAwareClusteringPlanStrategy.java      |  72 ++++-----
 .../hudi/table/action/rollback/RollbackUtils.java  |  16 +-
 .../action/savepoint/SavepointActionExecutor.java  |   5 +-
 .../FlinkCopyOnWriteRollbackActionExecutor.java    |   2 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |   3 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |   8 +-
 .../index/bloom/SparkHoodieGlobalBloomIndex.java   |  11 +-
 .../index/simple/SparkHoodieGlobalSimpleIndex.java |  13 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |   2 -
 ...rkInsertOverwriteTableCommitActionExecutor.java |  23 +--
 .../HoodieSparkMergeOnReadTableCompactor.java      |   3 +-
 .../SparkCopyOnWriteRollbackActionExecutor.java    |   2 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |   3 +-
 .../org/apache/hudi/client/TestClientRollback.java |   3 +-
 .../hudi/metadata/TestHoodieBackedMetadata.java    |  35 +++--
 .../java/org/apache/hudi/table/TestCleaner.java    |  28 ++--
 .../hudi/table/TestHoodieMergeOnReadTable.java     |   4 +-
 .../table/action/compact/CompactionTestBase.java   |   4 +-
 .../hudi/common/config/HoodieMetadataConfig.java   |  82 +++++++++-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  30 +++-
 .../SerializableSupplier.java}                     |  14 +-
 .../org/apache/hudi/common/metrics/Metric.java     |   4 +-
 .../table/log/HoodieMergedLogRecordScanner.java    |   6 +
 .../common/table/view/FileSystemViewManager.java   |  80 ++++++----
 .../table/view/FileSystemViewStorageConfig.java    |  40 +++--
 .../view/SpillableMapBasedFileSystemView.java      |   2 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    | 135 +++++++++--------
 .../metadata/FileSystemBackedTableMetadata.java    |   5 +
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 167 ++++++++++++---------
 .../metadata/HoodieMetadataFileSystemView.java     |  26 +++-
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  15 +-
 ...anner.java => TimelineMergedTableMetadata.java} |  34 ++---
 .../apache/hudi/hadoop/HoodieHFileInputFormat.java |  12 +-
 .../hudi/hadoop/HoodieParquetInputFormat.java      |  13 +-
 .../hudi/hadoop/HoodieROTablePathFilter.java       |  32 ++--
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  76 ++++++----
 .../utils/HoodieRealtimeInputFormatUtils.java      |  49 +++---
 .../reader/DFSHoodieDatasetInputReader.java        |   3 +-
 .../java/org/apache/hudi/client/TestBootstrap.java |  12 +-
 .../hudi/sync/common/AbstractSyncHoodieClient.java |  18 +--
 .../hudi/timeline/service/TimelineService.java     |  12 +-
 .../TestRemoteHoodieTableFileSystemView.java       |   8 +-
 .../hudi/utilities/HoodieSnapshotCopier.java       |   2 +-
 .../hudi/utilities/HoodieSnapshotExporter.java     |   6 +-
 .../hudi/utilities/perf/TimelineServerPerf.java    |   2 +-
 55 files changed, 748 insertions(+), 577 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index 169b927..5b00554 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -146,7 +146,8 @@ public class MetadataCommand implements CommandMarker {
   @CliCommand(value = "metadata stats", help = "Print stats about the metadata")
   public String stats() throws IOException {
     HoodieCLI.getTableMetaClient();
-    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, false, false);
+    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
+    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, config, HoodieCLI.basePath, "/tmp");
     Map<String, String> stats = metadata.stats();
 
     StringBuffer out = new StringBuffer("\n");
@@ -162,8 +163,9 @@ public class MetadataCommand implements CommandMarker {
   public String listPartitions() throws IOException {
     HoodieCLI.getTableMetaClient();
     initJavaSparkContext();
-    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc),
-        HoodieCLI.basePath, "/tmp", true, false, false, false);
+    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
+    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
+        HoodieCLI.basePath, "/tmp");
 
     StringBuffer out = new StringBuffer("\n");
     if (!metadata.enabled()) {
@@ -194,7 +196,8 @@ public class MetadataCommand implements CommandMarker {
       @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true)
       final String partition) throws IOException {
     HoodieCLI.getTableMetaClient();
-    HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, false, false);
+    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
+    HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(HoodieCLI.conf, config, HoodieCLI.basePath, "/tmp");
 
     StringBuffer out = new StringBuffer("\n");
     if (!metaReader.enabled()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
index 6396490..fa74aa3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
@@ -49,8 +49,8 @@ public class EmbeddedTimelineServerHelper {
       // Run Embedded Timeline Server
       LOG.info("Starting Timeline service !!");
       Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
-      timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null),
-          config.getEmbeddedTimelineServerPort(), config.getClientSpecifiedViewStorageConfig()));
+      timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null), config.getEmbeddedTimelineServerPort(),
+          config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath()));
       timelineServer.get().startServer();
       updateWriteConfigWithTimelineServer(timelineServer.get(), config);
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 2940fa2..386f7d5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client.embedded;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
@@ -41,14 +42,22 @@ public class EmbeddedTimelineService {
   private int serverPort;
   private int preferredPort;
   private String hostAddr;
+  private HoodieEngineContext context;
   private final SerializableConfiguration hadoopConf;
   private final FileSystemViewStorageConfig config;
+  private final HoodieMetadataConfig metadataConfig;
+  private final String basePath;
+
   private transient FileSystemViewManager viewManager;
   private transient TimelineService server;
 
-  public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort, FileSystemViewStorageConfig config) {
+  public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
+                                 HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath) {
     setHostAddr(embeddedTimelineServiceHostAddr);
+    this.context = context;
     this.config = config;
+    this.basePath = basePath;
+    this.metadataConfig = metadataConfig;
     this.hadoopConf = context.getHadoopConf();
     this.viewManager = createViewManager();
     this.preferredPort = embeddedTimelineServerPort;
@@ -64,7 +73,7 @@ public class EmbeddedTimelineService {
       // Reset to default if set to Remote
       builder.withStorageType(FileSystemViewStorageType.MEMORY);
     }
-    return FileSystemViewManager.createViewManager(hadoopConf, builder.build());
+    return FileSystemViewManager.createViewManager(context, metadataConfig, builder.build(), basePath);
   }
 
   public void startServer() throws IOException {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e411f85..d8135d4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -82,7 +82,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl";
   public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
   public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
-  public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
   public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
   public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
   public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -97,8 +96,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
   public static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
   public static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
-  public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
-  public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
+
   public static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
   public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
   public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
@@ -157,6 +155,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
   private FileSystemViewStorageConfig viewStorageConfig;
   private HoodiePayloadConfig hoodiePayloadConfig;
+  private HoodieMetadataConfig metadataConfig;
 
   private EngineType engineType;
 
@@ -176,6 +175,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
     this.viewStorageConfig = clientSpecifiedViewStorageConfig;
     this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
+    this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
   }
 
   public static HoodieWriteConfig.Builder newBuilder() {
@@ -222,7 +222,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   }
 
   public Boolean shouldAssumeDatePartitioning() {
-    return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP));
+    return metadataConfig.shouldAssumeDatePartitioning();
   }
 
   public boolean shouldUseExternalSchemaTransformation() {
@@ -258,7 +258,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   }
 
   public int getFileListingParallelism() {
-    return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
+    return metadataConfig.getFileListingParallelism();
   }
 
   public boolean shouldRollbackUsingMarkers() {
@@ -837,6 +837,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return hoodiePayloadConfig;
   }
 
+  public HoodieMetadataConfig getMetadataConfig() {
+    return metadataConfig;
+  }
+
   /**
    * Commit call back configs.
    */
@@ -888,11 +892,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
    * File listing metadata configs.
    */
   public boolean useFileListingMetadata() {
-    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP));
+    return metadataConfig.useFileListingMetadata();
   }
 
   public boolean getFileListingMetadataVerify() {
-    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP));
+    return metadataConfig.validateFileListingMetadata();
   }
 
   public int getMetadataInsertParallelism() {
@@ -1007,11 +1011,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    public Builder withFileListingParallelism(int parallelism) {
-      props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
-      return this;
-    }
-
     public Builder withUserDefinedBulkInsertPartitionerClass(String className) {
       props.setProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, className);
       return this;
@@ -1118,11 +1117,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    public Builder withAssumeDatePartitioning(boolean assumeDatePartitioning) {
-      props.setProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP, String.valueOf(assumeDatePartitioning));
-      return this;
-    }
-
     public Builder withWriteStatusClass(Class<? extends WriteStatus> writeStatusClass) {
       props.setProperty(HOODIE_WRITE_STATUS_CLASS_PROP, writeStatusClass.getName());
       return this;
@@ -1198,8 +1192,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
-      setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
-          DEFAULT_PARALLELISM);
+
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
           DEFAULT_ROLLBACK_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_CLASS_PROP),
@@ -1220,8 +1213,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
       setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
           DEFAULT_HOODIE_AUTO_COMMIT);
-      setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
-          HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
       setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP), HOODIE_WRITE_STATUS_CLASS_PROP,
           DEFAULT_HOODIE_WRITE_STATUS_CLASS);
       setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1466a17..1f76a5e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -46,6 +46,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 
@@ -108,9 +109,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
       initialize(engineContext, datasetMetaClient);
       if (enabled) {
-        // (re) init the metadata for reading.
-        initTableMetadata();
-
         // This is always called even in case the table was created for the first time. This is because
         // initFromFilesystem() does file listing and hence may take a long time during which some new updates
         // may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
@@ -148,7 +146,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         .withAutoCommit(true)
         .withAvroSchemaValidate(true)
         .withEmbeddedTimelineServerEnabled(false)
-        .withAssumeDatePartitioning(false)
         .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
         .withSchema(HoodieMetadataRecord.getClassSchema().toString())
         .forTable(tableName)
@@ -220,12 +217,17 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);
 
-  private void initTableMetadata() {
-    this.metadata = new HoodieBackedTableMetadata(engineContext, datasetWriteConfig.getBasePath(),
-        datasetWriteConfig.getSpillableMapBasePath(), datasetWriteConfig.useFileListingMetadata(),
-        datasetWriteConfig.getFileListingMetadataVerify(), false,
-        datasetWriteConfig.shouldAssumeDatePartitioning());
-    this.metaClient = metadata.getMetaClient();
+  protected void initTableMetadata() {
+    try {
+      if (this.metadata != null) {
+        this.metadata.close();
+      }
+      this.metadata = new HoodieBackedTableMetadata(engineContext, datasetWriteConfig.getMetadataConfig(),
+          datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath());
+      this.metaClient = metadata.getMetaClient();
+    } catch (Exception e) {
+      throw new HoodieException("Error initializing metadata table for reads", e);
+    }
   }
 
   protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
@@ -355,9 +357,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it is not enabled");
-
+    // (re) init the metadata for reading.
+    initTableMetadata();
     try {
-      List<HoodieInstant> instantsToSync = metadata.findInstantsToSync(datasetMetaClient);
+      List<HoodieInstant> instantsToSync = metadata.findInstantsToSync();
       if (instantsToSync.isEmpty()) {
         return;
       }
@@ -373,7 +376,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
           commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
         }
       }
-      // re-init the table metadata, for any future writes.
       initTableMetadata();
     } catch (IOException ioe) {
       throw new HoodieIOException("Unable to sync instants from data to metadata table.", ioe);
@@ -450,6 +452,13 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     }
   }
 
+  @Override
+  public void close() throws Exception {
+    if (metadata != null) {
+      metadata.close();
+    }
+  }
+
   /**
    * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index fa1f464..02c5b9e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
 /**
  * Interface that supports updating metadata for a given table, as actions complete.
  */
-public interface HoodieTableMetadataWriter extends Serializable {
+public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
 
   void update(HoodieCommitMetadata commitMetadata, String instantTime);
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 10e6ad9..74ffdfc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -49,8 +50,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
@@ -63,7 +64,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -94,23 +94,28 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
 
   protected final HoodieWriteConfig config;
   protected final HoodieTableMetaClient metaClient;
-  protected final transient HoodieEngineContext context;
   protected final HoodieIndex<T, I, K, O> index;
-
   private SerializableConfiguration hadoopConfiguration;
-  private transient FileSystemViewManager viewManager;
-  private HoodieTableMetadata metadata;
-
   protected final TaskContextSupplier taskContextSupplier;
+  private final HoodieTableMetadata metadata;
+
+  private transient FileSystemViewManager viewManager;
+  protected final transient HoodieEngineContext context;
 
   protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     this.config = config;
     this.hadoopConfiguration = context.getHadoopConf();
-    this.viewManager = FileSystemViewManager.createViewManager(hadoopConfiguration,
-        config.getViewStorageConfig());
+    this.context = context;
+
+    // disable reuse of resources, given there is no close() called on the executors ultimately
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(config.getMetadataConfig().getProps())
+        .enableReuse(false).build();
+    this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(),
+        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
+
+    this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata);
     this.metaClient = metaClient;
     this.index = getIndex(config, context);
-    this.context = context;
     this.taskContextSupplier = context.getTaskContextSupplier();
   }
 
@@ -118,7 +123,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
 
   private synchronized FileSystemViewManager getViewManager() {
     if (null == viewManager) {
-      viewManager = FileSystemViewManager.createViewManager(hadoopConfiguration, config.getViewStorageConfig());
+      viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata);
     }
     return viewManager;
   }
@@ -249,41 +254,28 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
    * Get the view of the file system for this table.
    */
   public TableFileSystemView getFileSystemView() {
-    if (config.useFileListingMetadata()) {
-      return getFileSystemViewInternal(getCompletedCommitsTimeline());
-    } else {
-      return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
-    }
+    return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
   }
 
   /**
    * Get the base file only view of the file system for this table.
    */
   public BaseFileOnlyView getBaseFileOnlyView() {
-    return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
+    return getViewManager().getFileSystemView(metaClient);
   }
 
   /**
    * Get the full view of the file system for this table.
    */
   public SliceView getSliceView() {
-    return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
+    return getViewManager().getFileSystemView(metaClient);
   }
 
   /**
    * Get complete view of the file system for this table with ability to force sync.
    */
   public SyncableFileSystemView getHoodieView() {
-    return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
-  }
-
-  private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
-    if (config.useFileListingMetadata()) {
-      FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
-      return new HoodieMetadataFileSystemView(metaClient, this.metadata(), timeline, viewConfig.isIncrementalTimelineSyncEnabled());
-    } else {
-      return getViewManager().getFileSystemView(metaClient);
-    }
+    return getViewManager().getFileSystemView(metaClient);
   }
 
   /**
@@ -661,19 +653,9 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     return getBaseFileFormat() == HoodieFileFormat.HFILE;
   }
 
-  public HoodieTableMetadata metadata() {
-    if (metadata == null) {
-      HoodieEngineContext engineContext = context;
-      if (engineContext == null) {
-        // This is to handle scenarios where this is called at the executor tasks which do not have access
-        // to engine context, and it ends up being null (as its not serializable and marked transient here).
-        engineContext = new HoodieLocalEngineContext(hadoopConfiguration.get());
-      }
-
-      metadata = HoodieTableMetadata.create(engineContext, config.getBasePath(), config.getSpillableMapBasePath(),
-          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(),
-          config.shouldAssumeDatePartitioning());
-    }
-    return metadata;
+  public HoodieEngineContext getContext() {
+    // This is to handle scenarios where this is called at the executor tasks which do not have access
+    // to engine context, and it ends up being null (as its not serializable and marked transient here).
+    return context == null ? new HoodieLocalEngineContext(hadoopConfiguration.get()) : context;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 3f4c271..5d4a743 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -46,6 +46,7 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -56,6 +57,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -198,14 +201,20 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
     // If metadata table is enabled, do not archive instants which are more recent that the latest synced
     // instant on the metadata table. This is required for metadata table sync.
     if (config.useFileListingMetadata()) {
-      Option<String> lastSyncedInstantTime = table.metadata().getSyncedInstantTime();
-      if (lastSyncedInstantTime.isPresent()) {
-        LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
-        instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
-            lastSyncedInstantTime.get()));
-      } else {
-        LOG.info("Not archiving as there is no instants yet on the metadata table");
-        instants = Stream.empty();
+      try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),
+          config.getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR)) {
+        Option<String> lastSyncedInstantTime = tableMetadata.getSyncedInstantTime();
+
+        if (lastSyncedInstantTime.isPresent()) {
+          LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
+          instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
+              lastSyncedInstantTime.get()));
+        } else {
+          LOG.info("Not archiving as there is no instants yet on the metadata table");
+          instants = Stream.empty();
+        }
+      } catch (Exception e) {
+        throw new HoodieException("Error limiting instant archival based on metadata table", e);
       }
     }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
index 786bf3e..5b37820 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
@@ -67,7 +67,7 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I,
    */
   HoodieCleanerPlan requestClean(HoodieEngineContext context) {
     try {
-      CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(table, config);
+      CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
       Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
       List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 321f248..80727ff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -20,6 +20,8 @@ package org.apache.hudi.table.action.clean;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
@@ -76,8 +78,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
   private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
   private HoodieTable<T, I, K, O> hoodieTable;
   private HoodieWriteConfig config;
+  private transient HoodieEngineContext context;
 
-  public CleanPlanner(HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
+  public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
+    this.context = context;
     this.hoodieTable = hoodieTable;
     this.fileSystemView = hoodieTable.getHoodieView();
     this.commitTimeline = hoodieTable.getCompletedCommitsTimeline();
@@ -161,7 +165,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    */
   private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
       Option<HoodieInstant> newInstantToRetain) {
-    LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+    LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
         + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
         + ". New Instant to retain : " + newInstantToRetain);
     return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
@@ -190,9 +194,9 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * @return all partitions paths for the dataset.
    * @throws IOException
    */
-  private List<String> getPartitionPathsForFullCleaning() throws IOException {
+  private List<String> getPartitionPathsForFullCleaning() {
     // Go to brute force mode of scanning all partitions
-    return hoodieTable.metadata().getAllPartitionPaths();
+    return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index a94a4e6..d3b39f5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -28,12 +28,10 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -63,48 +61,42 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
 
   @Override
   public Option<HoodieClusteringPlan> generateClusteringPlan() {
-    try {
-      HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
-      LOG.info("Scheduling clustering for " + metaClient.getBasePath());
-      HoodieWriteConfig config = getWriteConfig();
-      List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), metaClient.getFs(), metaClient.getBasePath(),
-          config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
-          config.shouldAssumeDatePartitioning());
+    HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
+    LOG.info("Scheduling clustering for " + metaClient.getBasePath());
+    HoodieWriteConfig config = getWriteConfig();
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath());
 
-      // filter the partition paths if needed to reduce list status
-      partitionPaths = filterPartitionPaths(partitionPaths);
+    // filter the partition paths if needed to reduce list status
+    partitionPaths = filterPartitionPaths(partitionPaths);
 
-      if (partitionPaths.isEmpty()) {
-        // In case no partitions could be picked, return no clustering plan
-        return Option.empty();
-      }
-
-      List<HoodieClusteringGroup> clusteringGroups = getEngineContext().flatMap(partitionPaths,
-          partitionPath -> {
-            List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
-            return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
-          },
-          partitionPaths.size())
-          .stream().limit(getWriteConfig().getClusteringMaxNumGroups()).collect(Collectors.toList());
-      
-      if (clusteringGroups.isEmpty()) {
-        LOG.info("No data available to cluster");
-        return Option.empty();
-      }
+    if (partitionPaths.isEmpty()) {
+      // In case no partitions could be picked, return no clustering plan
+      return Option.empty();
+    }
 
-      HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
-          .setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())
-          .setStrategyParams(getStrategyParams())
-          .build();
+    List<HoodieClusteringGroup> clusteringGroups = getEngineContext().flatMap(partitionPaths,
+        partitionPath -> {
+          List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
+          return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
+        },
+        partitionPaths.size())
+        .stream().limit(getWriteConfig().getClusteringMaxNumGroups()).collect(Collectors.toList());
 
-      return Option.of(HoodieClusteringPlan.newBuilder()
-          .setStrategy(strategy)
-          .setInputGroups(clusteringGroups)
-          .setExtraMetadata(getExtraMetadata())
-          .setVersion(getPlanVersion())
-          .build());
-    } catch (IOException e) {
-      throw new HoodieIOException("Unable to create clustering plan", e);
+    if (clusteringGroups.isEmpty()) {
+      LOG.info("No data available to cluster");
+      return Option.empty();
     }
+
+    HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
+        .setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())
+        .setStrategyParams(getStrategyParams())
+        .build();
+
+    return Option.of(HoodieClusteringPlan.newBuilder()
+        .setStrategy(strategy)
+        .setInputGroups(clusteringGroups)
+        .setExtraMetadata(getExtraMetadata())
+        .setVersion(getPlanVersion())
+        .build());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 8ddb7e9..bfbe577 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -94,15 +94,10 @@ public class RollbackUtils {
    * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
    */
   public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext,
-      FileSystem fs, String basePath, HoodieWriteConfig config) {
-    try {
-      return FSUtils.getAllPartitionPaths(engineContext, fs, basePath, config.useFileListingMetadata(),
-          config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()).stream()
-          .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
-          .collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new HoodieIOException("Error generating rollback requests", e);
-    }
+                                                                                       String basePath, HoodieWriteConfig config) {
+    return FSUtils.getAllPartitionPaths(engineContext, config.getMetadataConfig(), basePath).stream()
+        .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
+        .collect(Collectors.toList());
   }
 
   /**
@@ -116,8 +111,7 @@ public class RollbackUtils {
   public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
     String commit = instantToRollback.getTimestamp();
     HoodieWriteConfig config = table.getConfig();
-    List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
-        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
+    List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
     int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
     context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
     return context.flatMap(partitions, partitionPath -> {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 4b222da..de1d973 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -90,10 +90,7 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
 
       context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
-      List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(),
-          table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
-          config.shouldAssumeDatePartitioning()
-      );
+      List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
       Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
         // Scan all partitions files with this commit time
         LOG.info("Collecting latest files in partition path " + partitionPath);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
index 585b41b..47039a3 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
@@ -65,7 +65,7 @@ public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayloa
   @Override
   protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
     List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
-        context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
+        context, table.getMetaClient().getBasePath(), config);
     return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 5a61c5c..5db7de2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -93,8 +93,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
         // generate rollback stats
         List<ListingBasedRollbackRequest> rollbackRequests;
         if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-          rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getFs(),
-              table.getMetaClient().getBasePath(), table.getConfig());
+          rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig());
         } else {
           rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
         }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 5d72913..6abefbd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -41,8 +41,10 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -395,7 +397,11 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   @Override
   public void syncTableMetadata() {
     // Open up the metadata table again, for syncing
-    SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
+    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) {
+      LOG.info("Successfully synced to metadata table");
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Error syncing to metadata table.", e);
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
index 27a9131..21b9d40 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieGlobalBloomIndex.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 
@@ -36,7 +35,6 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.Optional;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -62,13 +60,8 @@ public class SparkHoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends
   List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
                                                              final HoodieTable hoodieTable) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    try {
-      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
-          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
-      return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to load all partitions", e);
-    }
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
+    return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
index c58359c..63e13c4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/SparkHoodieGlobalSimpleIndex.java
@@ -31,14 +31,12 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import scala.Tuple2;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -103,14 +101,9 @@ public class SparkHoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends
   protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final HoodieEngineContext context,
                                                                       final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    try {
-      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
-          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
-      // Obtain the latest data files from all the partitions.
-      return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to load all partitions", e);
-    }
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
+    // Obtain the latest data files from all the partitions.
+    return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index acc7546..7c12a9e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -99,8 +99,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-    metadata.closeReaders();
-
     JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
 
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
index 039f628..150d2f4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -26,13 +26,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import scala.Tuple2;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -49,20 +47,13 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
     Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
-    try {
-      List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(),
-          table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
-          false);
-      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-      if (partitionPaths != null && partitionPaths.size() > 0) {
-        context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
-        JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
-        partitionToExistingFileIds = partitionPathRdd.mapToPair(
-            partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
-      }
-    } catch (IOException e) {
-      throw new HoodieCommitException("In InsertOverwriteTable action failed to get existing fileIds of all partition "
-          + config.getBasePath() + " at time " + instantTime, e);
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
+    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+    if (partitionPaths != null && partitionPaths.size() > 0) {
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
+      JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
+      partitionToExistingFileIds = partitionPathRdd.mapToPair(
+          partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
     }
     return partitionToExistingFileIds;
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 4773bf2..17c19ce 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -195,8 +195,7 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
     // TODO - rollback any compactions in flight
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
-        config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
 
     // filter the partition paths if needed to reduce list status
     partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
index 11d3baf..611ec21 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java
@@ -67,7 +67,7 @@ public class SparkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayloa
   @Override
   protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
     List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
-        table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
+        table.getMetaClient().getBasePath(), config);
     return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index bb1a9ad..a451956 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -92,8 +92,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
         // generate rollback stats
         List<ListingBasedRollbackRequest> rollbackRequests;
         if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-          rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getFs(),
-              table.getMetaClient().getBasePath(), table.getConfig());
+          rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig());
         } else {
           rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
         }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index bc639b2..0599ee8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -101,8 +101,7 @@ public class TestClientRollback extends HoodieClientTestBase {
       assertNoWriteErrors(statuses);
       HoodieWriteConfig config = getConfig();
       List<String> partitionPaths =
-          FSUtils.getAllPartitionPaths(context, fs, cfg.getBasePath(), config.useFileListingMetadata(),
-              config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
+          FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), cfg.getBasePath());
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
       final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 16ee120..027b2b8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -265,7 +265,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   public void testRollbackOperations(HoodieTableType tableType) throws Exception {
-    //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
     init(tableType);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
@@ -511,10 +510,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       assertTrue(metadata(client).isInSync());
     }
 
-    // Enable metadata table and ensure it is synced
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
       // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
       client.syncTableMetadata();
+    }
+
+    // Enable metadata table and ensure it is synced
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
       client.restoreToInstant(restoreToInstant);
       assertFalse(metadata(client).isInSync());
 
@@ -752,8 +754,16 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   /**
    * Validate the metadata tables contents to ensure it matches what is on the file system.
    */
-  private void validateMetadata(SparkRDDWriteClient client) throws IOException {
-    HoodieWriteConfig config = client.getConfig();
+  private void validateMetadata(SparkRDDWriteClient testClient) throws IOException {
+    HoodieWriteConfig config = testClient.getConfig();
+
+    SparkRDDWriteClient client;
+    if (config.isEmbeddedTimelineServerEnabled()) {
+      testClient.close();
+      client = new SparkRDDWriteClient(testClient.getEngineContext(), testClient.getConfig());
+    } else {
+      client = testClient;
+    }
 
     HoodieTableMetadata tableMetadata = metadata(client);
     assertNotNull(tableMetadata, "MetadataReader should have been initialized");
@@ -869,7 +879,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
     // Metadata table has a fixed number of partitions
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
     // in the .hoodie folder.
-    List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
+    List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(basePath),
         false, false, false);
     assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
 
@@ -895,9 +905,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
 
   private HoodieTableMetadata metadata(SparkRDDWriteClient client) {
     HoodieWriteConfig clientConfig = client.getConfig();
-    return HoodieTableMetadata.create(client.getEngineContext(), clientConfig.getBasePath(),
-        clientConfig.getSpillableMapBasePath(), clientConfig.useFileListingMetadata(),
-        clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning());
+    return HoodieTableMetadata.create(client.getEngineContext(), clientConfig.getMetadataConfig(), clientConfig.getBasePath(),
+        clientConfig.getSpillableMapBasePath());
   }
 
   // TODO: this can be moved to TestHarness after merge from master
@@ -915,16 +924,20 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
-        .withAutoCommit(autoCommit).withAssumeDatePartitioning(false)
+        .withAutoCommit(autoCommit)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
             .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build())
-        .withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
+        .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
             .withEnableBackupForRemoteFileSystemView(false).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(useFileListingMetadata)
+            .enableReuse(false)
+            .enableMetrics(enableMetrics)
+            .enableFallback(false).build())
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
             .withExecutorMetrics(true).usePrefix("unit-test").build());
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 3a5d737..4fff08a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -30,6 +30,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.bootstrap.TestBootstrapIndex;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BootstrapFileMapping;
@@ -524,7 +525,8 @@ public class TestCleaner extends HoodieClientTestBase {
   @ValueSource(booleans = {false, true})
   public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception {
     HoodieWriteConfig config =
-        HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
@@ -615,7 +617,8 @@ public class TestCleaner extends HoodieClientTestBase {
   public void testKeepLatestFileVersionsMOR() throws Exception {
 
     HoodieWriteConfig config =
-        HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
@@ -652,7 +655,8 @@ public class TestCleaner extends HoodieClientTestBase {
   public void testKeepLatestCommitsMOR() throws Exception {
 
     HoodieWriteConfig config =
-            HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+            HoodieWriteConfig.newBuilder().withPath(basePath)
+                .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
                     .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
                     .build();
@@ -691,7 +695,8 @@ public class TestCleaner extends HoodieClientTestBase {
   
   @Test
   public void testCleanWithReplaceCommits() throws Exception {
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
@@ -971,7 +976,8 @@ public class TestCleaner extends HoodieClientTestBase {
   @ParameterizedTest
   @MethodSource("argumentsForTestKeepLatestCommits")
   public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withIncrementalCleaningMode(enableIncrementalClean)
             .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
@@ -1146,7 +1152,8 @@ public class TestCleaner extends HoodieClientTestBase {
    */
   @Test
   public void testCleaningWithZeroPartitionPaths() throws Exception {
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
@@ -1167,7 +1174,8 @@ public class TestCleaner extends HoodieClientTestBase {
    */
   @Test
   public void testKeepLatestCommitsWithPendingCompactions() throws Exception {
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
@@ -1190,7 +1198,8 @@ public class TestCleaner extends HoodieClientTestBase {
   @ValueSource(booleans = {false, true})
   public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws Exception {
     HoodieWriteConfig config =
-        HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
             .build();
@@ -1213,7 +1222,8 @@ public class TestCleaner extends HoodieClientTestBase {
   public void testCleanPreviousCorruptedCleanFiles() throws IOException {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder()
-            .withPath(basePath).withAssumeDatePartitioning(true)
+            .withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index bb907be..a6ac276 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -887,7 +887,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   protected HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff() {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2)
-        .withAutoCommit(false).withAssumeDatePartitioning(true)
+        .withAutoCommit(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withEmbeddedTimelineServerEnabled(true)
@@ -1564,7 +1564,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
                                                        long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2)
-        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
+        .withAutoCommit(autoCommit)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 094c0b3..0f6a150 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -70,7 +71,8 @@ public class CompactionTestBase extends HoodieClientTestBase {
     return HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
-        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
+        .withAutoCommit(autoCommit)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index e95712b..1ead9c8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -41,6 +41,10 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
   public static final boolean DEFAULT_METADATA_VALIDATE = false;
   public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = false;
 
+  // Enable metrics for internal Metadata Table
+  public static final String METADATA_METRICS_ENABLE_PROP = METADATA_PREFIX + ".metrics.enable";
+  public static final boolean DEFAULT_METADATA_METRICS_ENABLE = false;
+
   // Parallelism for inserts
   public static final String METADATA_INSERT_PARALLELISM_PROP = METADATA_PREFIX + ".insert.parallelism";
   public static final int DEFAULT_METADATA_INSERT_PARALLELISM = 1;
@@ -63,6 +67,20 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
   public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
   public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
 
+  // Controls whether or not the base file open/log merges are reused per API call
+  public static final String ENABLE_REUSE_PROP = METADATA_PREFIX + ".reuse.enable";
+  public static final String DEFAULT_ENABLE_REUSE = "true";
+
+  // Controls whether, upon failure to fetch from the metadata table, to fall back to listing.
+  public static final String ENABLE_FALLBACK_PROP = METADATA_PREFIX + ".fallback.enable";
+  public static final String DEFAULT_ENABLE_FALLBACK = "true";
+
+  public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
+  public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
+
+  public static final String FILE_LISTING_PARALLELISM_PROP = "hoodie.file.listing.parallelism";
+  public static final int DEFAULT_FILE_LISTING_PARALLELISM = 1500;
+
   private HoodieMetadataConfig(Properties props) {
     super(props);
   }
@@ -71,6 +89,34 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
     return new Builder();
   }
 
+  public int getFileListingParallelism() {
+    return Math.max(Integer.parseInt(props.getProperty(HoodieMetadataConfig.FILE_LISTING_PARALLELISM_PROP)), 1);
+  }
+
+  public Boolean shouldAssumeDatePartitioning() {
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.HOODIE_ASSUME_DATE_PARTITIONING_PROP));
+  }
+
+  public boolean useFileListingMetadata() {
+    return Boolean.parseBoolean(props.getProperty(METADATA_ENABLE_PROP));
+  }
+
+  public boolean enableReuse() {
+    return Boolean.parseBoolean(props.getProperty(ENABLE_REUSE_PROP));
+  }
+
+  public boolean enableFallback() {
+    return Boolean.parseBoolean(props.getProperty(ENABLE_FALLBACK_PROP));
+  }
+
+  public boolean validateFileListingMetadata() {
+    return Boolean.parseBoolean(props.getProperty(METADATA_VALIDATE_PROP));
+  }
+
+  public boolean enableMetrics() {
+    return Boolean.parseBoolean(props.getProperty(METADATA_METRICS_ENABLE_PROP));
+  }
+
   public static class Builder {
 
     private final Properties props = new Properties();
@@ -92,6 +138,21 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder enableMetrics(boolean enableMetrics) {
+      props.setProperty(METADATA_METRICS_ENABLE_PROP, String.valueOf(enableMetrics));
+      return this;
+    }
+
+    public Builder enableReuse(boolean reuse) {
+      props.setProperty(ENABLE_REUSE_PROP, String.valueOf(reuse));
+      return this;
+    }
+
+    public Builder enableFallback(boolean fallback) {
+      props.setProperty(ENABLE_FALLBACK_PROP, String.valueOf(fallback));
+      return this;
+    }
+
     public Builder validate(boolean validate) {
       props.setProperty(METADATA_VALIDATE_PROP, String.valueOf(validate));
       return this;
@@ -123,10 +184,22 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withFileListingParallelism(int parallelism) {
+      props.setProperty(FILE_LISTING_PARALLELISM_PROP, String.valueOf(parallelism));
+      return this;
+    }
+
+    public Builder withAssumeDatePartitioning(boolean assumeDatePartitioning) {
+      props.setProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP, String.valueOf(assumeDatePartitioning));
+      return this;
+    }
+
     public HoodieMetadataConfig build() {
       HoodieMetadataConfig config = new HoodieMetadataConfig(props);
       setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE_PROP), METADATA_ENABLE_PROP,
           String.valueOf(DEFAULT_METADATA_ENABLE));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_METRICS_ENABLE_PROP), METADATA_METRICS_ENABLE_PROP,
+          String.valueOf(DEFAULT_METADATA_METRICS_ENABLE));
       setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE_PROP), METADATA_VALIDATE_PROP,
           String.valueOf(DEFAULT_METADATA_VALIDATE));
       setDefaultOnCondition(props, !props.containsKey(METADATA_INSERT_PARALLELISM_PROP), METADATA_INSERT_PARALLELISM_PROP,
@@ -141,7 +214,14 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
           String.valueOf(DEFAULT_MAX_COMMITS_TO_KEEP));
       setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP_PROP), MIN_COMMITS_TO_KEEP_PROP,
           String.valueOf(DEFAULT_MIN_COMMITS_TO_KEEP));
-
+      setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM_PROP), FILE_LISTING_PARALLELISM_PROP,
+          String.valueOf(DEFAULT_FILE_LISTING_PARALLELISM));
+      setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
+          HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
+      setDefaultOnCondition(props, !props.containsKey(ENABLE_FALLBACK_PROP), ENABLE_FALLBACK_PROP,
+          DEFAULT_ENABLE_FALLBACK);
+      setDefaultOnCondition(props, !props.containsKey(ENABLE_REUSE_PROP), ENABLE_REUSE_PROP,
+          DEFAULT_ENABLE_REUSE);
       return config;
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index fb36bfe..a2b3889 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -34,6 +35,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -252,15 +254,29 @@ public class FSUtils {
     }
   }
 
-  public static List<String> getAllPartitionPaths(HoodieEngineContext engineContext, FileSystem fs, String basePathStr,
+  public static List<String> getAllPartitionPaths(HoodieEngineContext engineContext, String basePathStr,
                                                   boolean useFileListingFromMetadata, boolean verifyListings,
-                                                  boolean assumeDatePartitioning) throws IOException {
-    if (assumeDatePartitioning) {
-      return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
-    } else {
-      HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, basePathStr, "/tmp/",
-          useFileListingFromMetadata, verifyListings, false, false);
+                                                  boolean assumeDatePartitioning) {
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(useFileListingFromMetadata)
+        .validate(verifyListings)
+        .withAssumeDatePartitioning(assumeDatePartitioning)
+        .build();
+    try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
+        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR)) {
       return tableMetadata.getAllPartitionPaths();
+    } catch (Exception e) {
+      throw new HoodieException("Error fetching partition paths from metadata table", e);
+    }
+  }
+
+  public static List<String> getAllPartitionPaths(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
+                                                  String basePathStr) {
+    try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
+        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR)) {
+      return tableMetadata.getAllPartitionPaths();
+    } catch (Exception e) {
+      throw new HoodieException("Error fetching partition paths from metadata table", e);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableSupplier.java
similarity index 82%
copy from hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java
copy to hudi-common/src/main/java/org/apache/hudi/common/function/SerializableSupplier.java
index 12b42de..0500955 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableSupplier.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.common.metrics;
+package org.apache.hudi.common.function;
 
-/**
- * Interface for Hudi Metric Types.
- */
-public interface Metric {
-  Long getValue();
-}
\ No newline at end of file
+import java.io.Serializable;
+
+@FunctionalInterface
+public interface SerializableSupplier<T> extends Serializable {
+  T get();
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java
index 12b42de..79a7764 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Metric.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.common.metrics;
 
+import java.io.Serializable;
+
 /**
  * Interface for Hudi Metric Types.
  */
-public interface Metric {
+public interface Metric extends Serializable {
   Long getValue();
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e99859e..177be8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -153,6 +153,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     return totalTimeTakenToReadAndMergeBlocks;
   }
 
+  public void close() {
+    if (records != null) {
+      records.close();
+    }
+  }
+
   /**
    * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 6901bc5..6f0e7d5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -18,12 +18,16 @@
 
 package org.apache.hudi.common.table.view;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableSupplier;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Functions.Function2;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -62,9 +66,9 @@ public class FileSystemViewManager {
   // Factory Map to create file-system views
   private final Function2<HoodieTableMetaClient, FileSystemViewStorageConfig, SyncableFileSystemView> viewCreator;
 
-  public FileSystemViewManager(SerializableConfiguration conf, FileSystemViewStorageConfig viewStorageConfig,
+  private FileSystemViewManager(HoodieEngineContext context, FileSystemViewStorageConfig viewStorageConfig,
       Function2<HoodieTableMetaClient, FileSystemViewStorageConfig, SyncableFileSystemView> viewCreator) {
-    this.conf = new SerializableConfiguration(conf);
+    this.conf = context.getHadoopConf();
     this.viewStorageConfig = viewStorageConfig;
     this.globalViewMap = new ConcurrentHashMap<>();
     this.viewCreator = viewCreator;
@@ -110,8 +114,10 @@ public class FileSystemViewManager {
    * Closes all views opened.
    */
   public void close() {
-    this.globalViewMap.values().forEach(SyncableFileSystemView::close);
-    this.globalViewMap.clear();
+    if (!this.globalViewMap.isEmpty()) {
+      this.globalViewMap.values().forEach(SyncableFileSystemView::close);
+      this.globalViewMap.clear();
+    }
   }
 
   // FACTORY METHODS FOR CREATING FILE-SYSTEM VIEWS
@@ -147,34 +153,32 @@ public class FileSystemViewManager {
 
   /**
    * Create an in-memory file System view for a table.
-   * 
-   * @param conf Hadoop Configuration
-   * @param viewConf View Storage Configuration
-   * @param metaClient HoodieTableMetaClient
-   * @return
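+   * Returns a metadata-table backed view when file listing from metadata is enabled; otherwise a plain in-memory view.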
    */
-  private static HoodieTableFileSystemView createInMemoryFileSystemView(SerializableConfiguration conf,
-      FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) {
+  private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig viewConf,
+                                                                        HoodieTableMetaClient metaClient, SerializableSupplier<HoodieTableMetadata> metadataSupplier) {
     LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
     HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    if (metadataConfig.useFileListingMetadata()) {
+      ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view");
+      return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+          metadataSupplier.get());
+    }
     return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
   }
 
-  public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieEngineContext engineContext,
-      HoodieTableMetaClient metaClient, boolean useFileListingFromMetadata, boolean verifyListings) {
+  public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieEngineContext engineContext, HoodieTableMetaClient metaClient,
+                                                                       HoodieMetadataConfig metadataConfig) {
     LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
-    if (useFileListingFromMetadata) {
+    if (metadataConfig.useFileListingMetadata()) {
       return new HoodieMetadataFileSystemView(engineContext, metaClient,
           metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
-          true,
-          verifyListings);
+          metadataConfig);
     }
-
     return new HoodieTableFileSystemView(metaClient,
         metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
   }
 
-
   /**
    * Create a remote file System view for a table.
    * 
@@ -192,42 +196,56 @@ public class FileSystemViewManager {
         metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
   }
 
+  public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
+                                                        final HoodieMetadataConfig metadataConfig,
+                                                        final FileSystemViewStorageConfig config) {
+    return createViewManager(context, metadataConfig, config, (SerializableSupplier<HoodieTableMetadata>) null);
+  }
+
+  public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
+                                                        final HoodieMetadataConfig metadataConfig,
+                                                        final FileSystemViewStorageConfig config,
+                                                        final String basePath) {
+    return createViewManager(context, metadataConfig, config,
+        () -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir()));
+  }
+
   /**
    * Main Factory method for building file-system views.
-   * 
-   * @param conf Hadoop Configuration
-   * @param config View Storage Configuration
-   * @return
+   * The kind of view created for each table is chosen by the storage type in the view storage config.
    */
-  public static FileSystemViewManager createViewManager(final SerializableConfiguration conf,
-      final FileSystemViewStorageConfig config) {
+  public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
+                                                        final HoodieMetadataConfig metadataConfig,
+                                                        final FileSystemViewStorageConfig config,
+                                                        final SerializableSupplier<HoodieTableMetadata> metadataSupplier) {
     LOG.info("Creating View Manager with storage type :" + config.getStorageType());
+    final SerializableConfiguration conf = context.getHadoopConf();
     switch (config.getStorageType()) {
       case EMBEDDED_KV_STORE:
         LOG.info("Creating embedded rocks-db based Table View");
-        return new FileSystemViewManager(conf, config,
+        return new FileSystemViewManager(context, config,
             (metaClient, viewConf) -> createRocksDBBasedFileSystemView(conf, viewConf, metaClient));
       case SPILLABLE_DISK:
         LOG.info("Creating Spillable Disk based Table View");
-        return new FileSystemViewManager(conf, config,
+        return new FileSystemViewManager(context, config,
             (metaClient, viewConf) -> createSpillableMapBasedFileSystemView(conf, viewConf, metaClient));
       case MEMORY:
         LOG.info("Creating in-memory based Table View");
-        return new FileSystemViewManager(conf, config,
-            (metaClient, viewConfig) -> createInMemoryFileSystemView(conf, viewConfig, metaClient));
+        return new FileSystemViewManager(context, config,
+            (metaClient, viewConfig) -> createInMemoryFileSystemView(metadataConfig, viewConfig, metaClient, metadataSupplier));
       case REMOTE_ONLY:
         LOG.info("Creating remote only table view");
-        return new FileSystemViewManager(conf, config, (metaClient, viewConfig) -> createRemoteFileSystemView(conf,
+        return new FileSystemViewManager(context, config, (metaClient, viewConfig) -> createRemoteFileSystemView(conf,
             viewConfig, metaClient));
       case REMOTE_FIRST:
         LOG.info("Creating remote first table view");
-        return new FileSystemViewManager(conf, config, (metaClient, viewConfig) -> {
+        return new FileSystemViewManager(context, config, (metaClient, viewConfig) -> {
           RemoteHoodieTableFileSystemView remoteFileSystemView =
               createRemoteFileSystemView(conf, viewConfig, metaClient);
           SyncableFileSystemView secondaryView;
           switch (viewConfig.getSecondaryStorageType()) {
             case MEMORY:
-              secondaryView = createInMemoryFileSystemView(conf, viewConfig, metaClient);
+              secondaryView = createInMemoryFileSystemView(metadataConfig, viewConfig, metaClient, metadataSupplier);
               break;
             case EMBEDDED_KV_STORE:
               secondaryView = createRocksDBBasedFileSystemView(conf, viewConfig, metaClient);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index ff3a78f..f967d43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -33,39 +33,49 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
 
   // Property Names
   public static final String FILESYSTEM_VIEW_STORAGE_TYPE = "hoodie.filesystem.view.type";
+  public static final FileSystemViewStorageType DEFAULT_VIEW_STORAGE_TYPE = FileSystemViewStorageType.MEMORY;
+
   public static final String FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE = "hoodie.filesystem.view.incr.timeline.sync.enable";
+  public static final String DEFAULT_FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE = "false";
+
   public static final String FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE = "hoodie.filesystem.view.secondary.type";
+  public static final FileSystemViewStorageType DEFAULT_SECONDARY_VIEW_STORAGE_TYPE = FileSystemViewStorageType.MEMORY;
+
   public static final String FILESYSTEM_VIEW_REMOTE_HOST = "hoodie.filesystem.view.remote.host";
+  public static final String DEFUALT_REMOTE_VIEW_SERVER_HOST = "localhost";
+
   public static final String FILESYSTEM_VIEW_REMOTE_PORT = "hoodie.filesystem.view.remote.port";
+  public static final Integer DEFAULT_REMOTE_VIEW_SERVER_PORT = 26754;
+
   public static final String FILESYSTEM_VIEW_SPILLABLE_DIR = "hoodie.filesystem.view.spillable.dir";
+  public static final String DEFAULT_VIEW_SPILLABLE_DIR = "/tmp/view_map/";
+
   public static final String FILESYSTEM_VIEW_SPILLABLE_MEM = "hoodie.filesystem.view.spillable.mem";
+  private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB
+
   public static final String FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION =
       "hoodie.filesystem.view.spillable.compaction.mem.fraction";
+  private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01;
+
   public static final String FILESYSTEM_VIEW_BOOTSTRAP_BASE_FILE_FRACTION =
       "hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction";
+
   public static final String FILESYSTEM_VIEW_REPLACED_MEM_FRACTION =
       "hoodie.filesystem.view.spillable.replaced.mem.fraction";
+  private static final Double DEFAULT_MEM_FRACTION_FOR_REPLACED_FILEGROUPS = 0.01;
+
   public static final String FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION =
       "hoodie.filesystem.view.spillable.clustering.mem.fraction";
-  private static final String ROCKSDB_BASE_PATH_PROP = "hoodie.filesystem.view.rocksdb.base.path";
-  public static final String FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS =
-      "hoodie.filesystem.view.remote.timeout.secs";
-
+  private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_CLUSTERING_FILEGROUPS = 0.01;
 
-  public static final FileSystemViewStorageType DEFAULT_VIEW_STORAGE_TYPE = FileSystemViewStorageType.MEMORY;
-  public static final FileSystemViewStorageType DEFAULT_SECONDARY_VIEW_STORAGE_TYPE = FileSystemViewStorageType.MEMORY;
+  private static final String ROCKSDB_BASE_PATH_PROP = "hoodie.filesystem.view.rocksdb.base.path";
   public static final String DEFAULT_ROCKSDB_BASE_PATH = "/tmp/hoodie_timeline_rocksdb";
 
-  public static final String DEFAULT_FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE = "false";
-  public static final String DEFUALT_REMOTE_VIEW_SERVER_HOST = "localhost";
-  public static final Integer DEFAULT_REMOTE_VIEW_SERVER_PORT = 26754;
+  public static final String FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS =
+      "hoodie.filesystem.view.remote.timeout.secs";
   public static final Integer DEFAULT_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS = 5 * 60; // 5 min
-  public static final String DEFAULT_VIEW_SPILLABLE_DIR = "/tmp/view_map/";
-  private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01;
+
   private static final Double DEFAULT_MEM_FRACTION_FOR_EXTERNAL_DATA_FILE = 0.05;
-  private static final Double DEFAULT_MEM_FRACTION_FOR_REPLACED_FILEGROUPS = 0.01;
-  private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_CLUSTERING_FILEGROUPS = 0.01;
-  private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB
 
   /**
    * Configs to control whether backup needs to be configured if clients were not able to reach
@@ -135,7 +145,7 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
         .longValue();
   }
 
-  public String getBaseStoreDir() {
+  public String getSpillableDir() {
     return props.getProperty(FILESYSTEM_VIEW_SPILLABLE_DIR);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
index d725169..1dafe33 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
@@ -62,7 +62,7 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
     this.maxMemoryForBootstrapBaseFile = config.getMaxMemoryForBootstrapBaseFile();
     this.maxMemoryForReplaceFileGroups = config.getMaxMemoryForReplacedFileGroups();
     this.maxMemoryForClusteringFileGroups = config.getMaxMemoryForPendingClusteringFileGroups();
-    this.baseStoreDir = config.getBaseStoreDir();
+    this.baseStoreDir = config.getSpillableDir();
     init(metaClient, visibleActiveTimeline);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index de0a3c4..c86b37e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -19,29 +19,29 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.avro.Schema;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieMetadataException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -57,33 +57,33 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
   protected final transient HoodieEngineContext engineContext;
   protected final SerializableConfiguration hadoopConf;
   protected final String datasetBasePath;
-  protected boolean enabled;
+  protected final HoodieTableMetaClient datasetMetaClient;
   protected final Option<HoodieMetadataMetrics> metrics;
-
-  private final boolean validateLookups;
-  private final boolean assumeDatePartitioning;
-
+  protected final HoodieMetadataConfig metadataConfig;
   // Directory used for Spillable Map when merging records
   protected final String spillableMapDirectory;
-  private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner;
 
-  protected BaseTableMetadata(HoodieEngineContext engineContext, String datasetBasePath, String spillableMapDirectory,
-                              boolean enabled, boolean validateLookups, boolean enableMetrics,
-                              boolean assumeDatePartitioning) {
+  protected boolean enabled;
+  private TimelineMergedTableMetadata timelineMergedMetadata;
+
+  protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
+                              String datasetBasePath, String spillableMapDirectory) {
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf());
     this.datasetBasePath = datasetBasePath;
+    this.datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
     this.spillableMapDirectory = spillableMapDirectory;
+    this.metadataConfig = metadataConfig;
 
-    this.enabled = enabled;
-    this.validateLookups = validateLookups;
-    this.assumeDatePartitioning = assumeDatePartitioning;
-
-    if (enableMetrics) {
+    this.enabled = metadataConfig.useFileListingMetadata();
+    if (metadataConfig.enableMetrics()) {
       this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
     } else {
       this.metrics = Option.empty();
     }
+    if (enabled) {
+      openTimelineScanner();
+    }
   }
 
   /**
@@ -101,11 +101,15 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       try {
         return fetchAllPartitionPaths();
       } catch (Exception e) {
-        LOG.error("Failed to retrieve list of partition from metadata", e);
+        if (metadataConfig.enableFallback()) {
+          LOG.error("Failed to retrieve list of partition from metadata", e);
+        } else {
+          throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e);
+        }
       }
     }
-    return new FileSystemBackedTableMetadata(engineContext, hadoopConf, datasetBasePath,
-        assumeDatePartitioning).getAllPartitionPaths();
+    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, datasetBasePath,
+        metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths();
   }
 
   /**
@@ -125,11 +129,16 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       try {
         return fetchAllFilesInPartition(partitionPath);
       } catch (Exception e) {
-        LOG.error("Failed to retrieve files in partition " + partitionPath + " from metadata", e);
+        if (metadataConfig.enableFallback()) {
+          LOG.error("Failed to retrieve files in partition " + partitionPath + " from metadata", e);
+        } else {
+          throw new HoodieMetadataException("Failed to retrieve files in partition " + partitionPath + " from metadata", e);
+        }
       }
     }
 
-    return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath);
+    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, datasetBasePath, metadataConfig.shouldAssumeDatePartitioning())
+        .getAllFilesInPartition(partitionPath);
   }
 
   /**
@@ -155,11 +164,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       }
     }
 
-    if (validateLookups) {
+    if (metadataConfig.validateFileListingMetadata()) {
       // Validate the Metadata Table data by listing the partitions from the file system
       timer.startTimer();
-      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext,
-          hadoopConf, datasetBasePath, assumeDatePartitioning);
+      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(getEngineContext(),
+          hadoopConf, datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
       List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
       metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
 
@@ -205,19 +214,29 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath);
     }
 
-    if (validateLookups) {
+    if (metadataConfig.validateFileListingMetadata()) {
       // Validate the Metadata Table data by listing the partitions from the file system
       timer.startTimer();
 
-      // Ignore partition metadata file
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-      FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
-          p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
-
-      List<String> directFilenames = Arrays.stream(directStatuses)
-          .map(s -> s.getPath().getName()).sorted()
+      String partitionPathStr = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), partitionPath);
+      String latestDataInstantTime = getLatestDatasetInstantTime();
+      HoodieTableFileSystemView dataFsView = new HoodieTableFileSystemView(datasetMetaClient, datasetMetaClient.getActiveTimeline());
+      List<FileStatus> directStatuses = dataFsView.getAllFileSlices(partitionPathStr).flatMap(slice -> {
+        List<FileStatus> paths = new ArrayList<>();
+        slice.getBaseFile().ifPresent(baseFile -> {
+          if (HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, latestDataInstantTime)) {
+            paths.add(baseFile.getFileStatus());
+          }
+        });
+        // TODO(metadata): this will remain problematic; the commit time of a written log file cannot be determined, so all log files are included unfiltered
+        slice.getLogFiles().forEach(logFile -> paths.add(logFile.getFileStatus()));
+        return paths.stream();
+      }).collect(Collectors.toList());
+
+      List<String> directFilenames = directStatuses.stream()
+          .map(fileStatus -> fileStatus.getPath().getName()).sorted()
           .collect(Collectors.toList());
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
 
       List<String> metadataFilenames = Arrays.stream(statuses)
           .map(s -> s.getPath().getName()).sorted()
@@ -232,7 +251,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
       }
 
       // Return the direct listing as it should be correct
-      statuses = directStatuses;
+      statuses = directStatuses.toArray(new FileStatus[0]);
     }
 
     LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
@@ -244,14 +263,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
    *
    * @param key The key of the record
    */
-  private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException {
-
+  private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) {
     Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord;
-    openTimelineScanner();
-
     Option<HoodieRecord<HoodieMetadataPayload>> metadataHoodieRecord = getRecordByKeyFromMetadata(key);
     // Retrieve record from unsynced timeline instants
-    Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key);
+    Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineMergedMetadata.getRecordByKey(key);
     if (timelineHoodieRecord.isPresent()) {
       if (metadataHoodieRecord.isPresent()) {
         HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData());
@@ -265,37 +281,28 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     return mergedRecord;
   }
 
-  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException;
+  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key);
 
-  private void openTimelineScanner() throws IOException {
-    if (timelineRecordScanner != null) {
-      // Already opened
-      return;
+  private void openTimelineScanner() {
+    if (timelineMergedMetadata == null) {
+      List<HoodieInstant> unSyncedInstants = findInstantsToSync();
+      timelineMergedMetadata =
+          new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), null);
     }
-
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    List<HoodieInstant> unSyncedInstants = findInstantsToSync(datasetMetaClient);
-    Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-    timelineRecordScanner =
-        new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null);
-  }
-
-  protected List<HoodieInstant> findInstantsToSync() {
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    return findInstantsToSync(datasetMetaClient);
   }
 
-  protected abstract List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient);
+  protected abstract List<HoodieInstant> findInstantsToSync();
 
   public boolean isInSync() {
     return enabled && findInstantsToSync().isEmpty();
   }
 
-  protected void closeReaders() {
-    timelineRecordScanner = null;
+  protected HoodieEngineContext getEngineContext() {
+    return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());
   }
 
-  protected HoodieEngineContext getEngineContext() {
-    return engineContext;
+  protected String getLatestDatasetInstantTime() {
+    return datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
+        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 326d6fd..31c74a1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -114,4 +114,9 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
   public boolean isInSync() {
     return true;
   }
+
+  @Override
+  public void close() throws Exception {
+    // no-op
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index b90266e..a34652c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -20,6 +20,7 @@ package org.apache.hudi.metadata;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -32,12 +33,13 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -50,6 +52,8 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,32 +69,41 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
   private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
 
-  private final String metadataBasePath;
+  private String metadataBasePath;
+  // Meta client of the metadata table (not of the dataset)
   private HoodieTableMetaClient metaClient;
+  private List<FileSlice> latestFileSystemMetadataSlices;
 
   // Readers for the base and log file which store the metadata
   private transient HoodieFileReader<GenericRecord> baseFileReader;
   private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
 
-  public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, boolean enabled,
-                                   boolean validateLookups, boolean assumeDatePartitioning) {
-    this(new HoodieLocalEngineContext(conf), datasetBasePath, spillableMapDirectory, enabled, validateLookups,
-        false, assumeDatePartitioning);
+  public HoodieBackedTableMetadata(Configuration conf, HoodieMetadataConfig metadataConfig,
+                                   String datasetBasePath, String spillableMapDirectory) {
+    this(new HoodieLocalEngineContext(conf), metadataConfig, datasetBasePath, spillableMapDirectory);
   }
 
-  public HoodieBackedTableMetadata(HoodieEngineContext engineContext, String datasetBasePath, String spillableMapDirectory,
-                                   boolean enabled, boolean validateLookups, boolean enableMetrics, boolean assumeDatePartitioning) {
-    super(engineContext, datasetBasePath, spillableMapDirectory, enabled, validateLookups, enableMetrics, assumeDatePartitioning);
-    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
-    if (enabled) {
+  public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
+                                   String datasetBasePath, String spillableMapDirectory) {
+    super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory);
+    initIfNeeded();
+  }
+
+  private void initIfNeeded() {
+    if (enabled && this.metaClient == null) {
+      this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
       try {
         this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
+        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+        latestFileSystemMetadataSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
       } catch (TableNotFoundException e) {
         LOG.warn("Metadata table was not found at path " + metadataBasePath);
         this.enabled = false;
+        this.metaClient = null;
       } catch (Exception e) {
         LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
         this.enabled = false;
+        this.metaClient = null;
       }
     } else {
       LOG.info("Metadata table is disabled.");
@@ -98,60 +111,67 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   }
 
   @Override
-  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException {
-    openBaseAndLogFiles();
-
-    // Retrieve record from base file
-    HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-    if (baseFileReader != null) {
+  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) {
+    try {
+      List<Long> timings = new ArrayList<>();
       HoodieTimer timer = new HoodieTimer().startTimer();
-      Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
-      if (baseRecord.isPresent()) {
-        hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
-            metaClient.getTableConfig().getPayloadClass());
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+      openFileSliceIfNeeded();
+      timings.add(timer.endTimer());
+
+      timer.startTimer();
+      // Retrieve record from base file
+      HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
+      if (baseFileReader != null) {
+        HoodieTimer readTimer = new HoodieTimer().startTimer();
+        Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
+        if (baseRecord.isPresent()) {
+          hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+              metaClient.getTableConfig().getPayloadClass());
+          metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
+        }
       }
-    }
-
-    // Retrieve record from log file
-    Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
-    if (logHoodieRecord.isPresent()) {
-      if (hoodieRecord != null) {
-        // Merge the payloads
-        HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
-        hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
-      } else {
-        hoodieRecord = logHoodieRecord.get();
+      timings.add(timer.endTimer());
+
+      // Retrieve record from log file
+      timer.startTimer();
+      if (logRecordScanner != null) {
+        Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
+        if (logHoodieRecord.isPresent()) {
+          if (hoodieRecord != null) {
+            // Merge the payloads
+            HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
+            hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
+          } else {
+            hoodieRecord = logHoodieRecord.get();
+          }
+        }
       }
+      timings.add(timer.endTimer());
+      LOG.info(String.format("Metadata read for key %s took [open, baseFileRead, logMerge] %s ms", key, timings));
+      return Option.ofNullable(hoodieRecord);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe);
+    } finally {
+      closeIfNeeded();
     }
-
-    return Option.ofNullable(hoodieRecord);
   }
 
   /**
    * Open readers to the base and log files.
    */
-  protected synchronized void openBaseAndLogFiles() throws IOException {
-    if (logRecordScanner != null) {
-      // Already opened
+  private synchronized void openFileSliceIfNeeded() throws IOException {
+    if (metadataConfig.enableReuse() && baseFileReader != null) {
+      // we will reuse what's open.
       return;
     }
 
-    HoodieTimer timer = new HoodieTimer().startTimer();
-
     // Metadata is in sync till the latest completed instant on the dataset
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    String latestInstantTime = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
-        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-    // Find the latest file slice
-    HoodieTimeline timeline = metaClient.reloadActiveTimeline();
-    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
-    List<FileSlice> latestSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
-    ValidationUtils.checkArgument(latestSlices.size() == 1);
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    String latestInstantTime = getLatestDatasetInstantTime();
+    ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 1, "must be at-least one validata metadata file slice");
 
     // If the base file is present then create a reader
-    Option<HoodieBaseFile> basefile = latestSlices.get(0).getBaseFile();
+    Option<HoodieBaseFile> basefile = latestFileSystemMetadataSlices.get(0).getBaseFile();
     if (basefile.isPresent()) {
       String basefilePath = basefile.get().getPath();
       baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
@@ -159,18 +179,16 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     }
 
     // Open the log record scanner using the log files from the latest file slice
-    List<String> logFilePaths = latestSlices.get(0).getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+    List<String> logFilePaths = latestFileSystemMetadataSlices.get(0).getLogFiles()
+        .sorted(HoodieLogFile.getLogFileComparator())
         .map(o -> o.getPath().toString())
         .collect(Collectors.toList());
-
-    Option<HoodieInstant> lastInstant = timeline.filterCompletedInstants().lastInstant();
+    Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
     String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
 
     // Load the schema
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-
-    logRecordScanner =
-        new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath,
+    logRecordScanner = new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath,
             logFilePaths, schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE,
             spillableMapDirectory, null);
 
@@ -180,27 +198,42 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
   }
 
-  public void closeReaders() {
+  private void closeIfNeeded() {
+    try {
+      if (!metadataConfig.enableReuse()) {
+        close();
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Error closing resources during metadata table merge", e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
     if (baseFileReader != null) {
       baseFileReader.close();
       baseFileReader = null;
     }
-    logRecordScanner = null;
+    if (logRecordScanner != null) {
+      logRecordScanner.close();
+      logRecordScanner = null;
+    }
   }
 
   /**
    * Return an ordered list of instants which have not been synced to the Metadata Table.
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
    */
-  protected List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient) {
-    HoodieActiveTimeline metaTimeline = metaClient.reloadActiveTimeline();
+  protected List<HoodieInstant> findInstantsToSync() {
+    initIfNeeded();
+
+    // if there are no instants yet, return empty list, since there is nothing to sync here.
+    if (!enabled || !metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
+      return Collections.EMPTY_LIST;
+    }
 
     // All instants on the data timeline, which are greater than the last instant on metadata timeline
     // are candidates for sync.
-    Option<HoodieInstant> latestMetadataInstant = metaTimeline.filterCompletedInstants().lastInstant();
-    ValidationUtils.checkArgument(latestMetadataInstant.isPresent(),
-        "At least one completed instant should exist on the metadata table, before syncing.");
-    String latestMetadataInstantTime = latestMetadataInstant.get().getTimestamp();
+    String latestMetadataInstantTime = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
     HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE);
     Option<HoodieInstant> earliestIncompleteInstant = candidateTimeline.filterInflightsAndRequested().firstInstant();
 
@@ -236,10 +269,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return hadoopConf;
   }
 
-  public String getDatasetBasePath() {
-    return datasetBasePath;
-  }
-
   public HoodieTableMetaClient getMetaClient() {
     return metaClient;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index a440419..36c9cef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -23,11 +23,13 @@ import java.io.IOException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieException;
 
 /**
  * {@code HoodieTableFileSystemView} implementation that retrieved partition listings from the Metadata Table.
@@ -36,21 +38,20 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
 
   private final HoodieTableMetadata tableMetadata;
 
-  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTableMetadata tableMetadata,
-                                      HoodieTimeline visibleActiveTimeline, boolean enableIncrementalTimelineSync) {
-    super(metaClient, visibleActiveTimeline, enableIncrementalTimelineSync);
+  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient,
+                                      HoodieTimeline visibleActiveTimeline,
+                                      HoodieTableMetadata tableMetadata) {
+    super(metaClient, visibleActiveTimeline);
     this.tableMetadata = tableMetadata;
   }
 
   public HoodieMetadataFileSystemView(HoodieEngineContext engineContext,
                                       HoodieTableMetaClient metaClient,
                                       HoodieTimeline visibleActiveTimeline,
-                                      boolean useFileListingFromMetadata,
-                                      boolean verifyListings) {
+                                      HoodieMetadataConfig metadataConfig) {
     super(metaClient, visibleActiveTimeline);
-    this.tableMetadata = HoodieTableMetadata.create(engineContext, metaClient.getBasePath(),
-        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR, useFileListingFromMetadata, verifyListings,
-        false, false);
+    this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(),
+        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
   }
 
   /**
@@ -63,4 +64,13 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
   protected FileStatus[] listPartition(Path partitionPath) throws IOException {
     return tableMetadata.getAllFilesInPartition(partitionPath);
   }
+
+  @Override
+  public void close() {
+    try {
+      tableMetadata.close();
+    } catch (Exception e) {
+      throw new HoodieException("Error closing metadata file system view.", e);
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 2bfbf39..56c3cd2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -33,7 +34,7 @@ import java.util.List;
 /**
  * Interface that supports querying various pieces of metadata about a hudi table.
  */
-public interface HoodieTableMetadata extends Serializable {
+public interface HoodieTableMetadata extends Serializable, AutoCloseable {
 
   // Table name suffix
   String METADATA_TABLE_NAME_SUFFIX = "_metadata";
@@ -69,15 +70,13 @@ public interface HoodieTableMetadata extends Serializable {
     return basePath.endsWith(METADATA_TABLE_REL_PATH);
   }
 
-  static HoodieTableMetadata create(HoodieEngineContext engineContext, String datasetBasePath,
-                                    String spillableMapPath, boolean useFileListingFromMetadata, boolean verifyListings,
-                                    boolean enableMetrics, boolean shouldAssumeDatePartitioning) {
-    if (useFileListingFromMetadata) {
-      return new HoodieBackedTableMetadata(engineContext, datasetBasePath, spillableMapPath, useFileListingFromMetadata,
-          verifyListings, enableMetrics, shouldAssumeDatePartitioning);
+  static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath,
+                                    String spillableMapPath) {
+    if (metadataConfig.useFileListingMetadata()) {
+      return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath);
     } else {
       return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
-          datasetBasePath, shouldAssumeDatePartitioning);
+          datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java
similarity index 72%
rename from hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
rename to hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java
index 1dcd322..9ba3f26 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java
@@ -18,48 +18,48 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 /**
  * Provides functionality to convert timeline instants to table metadata records and then merge by key. Specify
  *  a filter to limit keys that are merged and stored in memory.
  */
-public class HoodieMetadataMergedInstantRecordScanner {
+public class TimelineMergedTableMetadata implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedInstantRecordScanner.class);
+  private static final Logger LOG = LogManager.getLogger(TimelineMergedTableMetadata.class);
 
   HoodieTableMetaClient metaClient;
   private List<HoodieInstant> instants;
   private Option<String> lastSyncTs;
   private Set<String> mergeKeyFilter;
-  protected final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
 
-  public HoodieMetadataMergedInstantRecordScanner(HoodieTableMetaClient metaClient, List<HoodieInstant> instants,
-                                                  Option<String> lastSyncTs, Schema readerSchema, Long maxMemorySizeInBytes,
-                                                  String spillableMapBasePath, Set<String> mergeKeyFilter) throws IOException {
+  // keep it a simple hash map so that, once merged, it can easily be passed on to the executors.
+  protected final Map<String, HoodieRecord<? extends HoodieRecordPayload>> timelineMergedRecords;
+
+  public TimelineMergedTableMetadata(HoodieTableMetaClient metaClient, List<HoodieInstant> instants,
+                                     Option<String> lastSyncTs, Set<String> mergeKeyFilter) {
     this.metaClient = metaClient;
     this.instants = instants;
     this.lastSyncTs = lastSyncTs;
     this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : Collections.emptySet();
-    this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
-        new HoodieRecordSizeEstimator(readerSchema));
+    this.timelineMergedRecords = new HashMap<>();
 
     scan();
   }
@@ -92,13 +92,13 @@ public class HoodieMetadataMergedInstantRecordScanner {
   private void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) {
     String key = hoodieRecord.getRecordKey();
     if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(key)) {
-      if (records.containsKey(key)) {
+      if (timelineMergedRecords.containsKey(key)) {
         // Merge and store the merged record
-        HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
-        records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
+        HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(timelineMergedRecords.get(key).getData(), new Properties());
+        timelineMergedRecords.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
       } else {
         // Put the record as is
-        records.put(key, hoodieRecord);
+        timelineMergedRecords.put(key, hoodieRecord);
       }
     }
   }
@@ -110,6 +110,6 @@ public class HoodieMetadataMergedInstantRecordScanner {
    * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
    */
   public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key) {
-    return Option.ofNullable((HoodieRecord) records.get(key));
+    return Option.ofNullable((HoodieRecord) timelineMergedRecords.get(key));
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
index 048402a..e3bac0b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
@@ -96,17 +96,9 @@ public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayW
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      Map<HoodieTableMetaClient, List<Path>> groupedPaths =
-          HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
-      LOG.info("Found a total of " + groupedPaths.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
-        List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
-        if (result != null) {
-          returns.addAll(result);
-        }
-      }
+      returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
     }
-    return returns.toArray(new FileStatus[returns.size()]);
+    return returns.toArray(new FileStatus[0]);
   }
 
   /**
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index d51aff0..0288cbd 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -108,18 +108,9 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      Map<HoodieTableMetaClient, List<Path>> groupedPaths =
-          HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
-      LOG.info("Found a total of " + groupedPaths.size() + " groups");
-      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
-        HoodieTableMetaClient metaClient = entry.getKey();
-        List<FileStatus> result = HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(), entry.getValue());
-        if (result != null) {
-          returns.addAll(result);
-        }
-      }
+      returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
     }
-    return returns.toArray(new FileStatus[returns.size()]);
+    return returns.toArray(new FileStatus[0]);
   }
 
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index dd285ba..da45fa6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.hadoop;
 
-import java.util.Map;
-import java.util.Set;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -32,7 +29,9 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,13 +43,10 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
-
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
  * always accept
@@ -87,6 +83,9 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
    */
   private SerializableConfiguration conf;
 
+  private transient HoodieLocalEngineContext engineContext;
+
+
   private transient FileSystem fs;
 
   public HoodieROTablePathFilter() {
@@ -116,6 +115,10 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
   @Override
   public boolean accept(Path path) {
 
+    if (engineContext == null) {
+      this.engineContext = new HoodieLocalEngineContext(this.conf.get());
+    }
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Checking acceptance for path " + path);
     }
@@ -164,6 +167,7 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
       }
 
       if (baseDir != null) {
+        HoodieTableFileSystemView fsView = null;
         try {
           HoodieTableMetaClient metaClient = metaClientCache.get(baseDir.toString());
           if (null == metaClient) {
@@ -171,13 +175,9 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
-          boolean useFileListingFromMetadata = getConf().getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
-          boolean verifyFileListing = getConf().getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
-          HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf.get());
-          HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
-              metaClient, useFileListingFromMetadata, verifyFileListing);
+          fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
+              metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()));
           String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder);
-
           List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
           // populate the cache
           if (!hoodiePathCache.containsKey(folder.toString())) {
@@ -202,6 +202,10 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
           }
           nonHoodiePathCache.add(folder.toString());
           return true;
+        } finally {
+          if (fsView != null) {
+            fsView.close();
+          }
         }
       } else {
         // files is at < 3 level depth in FS tree, can't be hoodie dataset
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 2ca7ab5..9f98136 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -41,6 +42,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
 import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -65,10 +67,10 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
 
 public class HoodieInputFormatUtils {
 
@@ -413,40 +415,50 @@ public class HoodieInputFormatUtils {
     return grouped;
   }
 
-  /**
-   * Filters data files under @param paths for a snapshot queried table.
-   * @param job
-   * @param metaClient
-   * @param paths
-   * @return
-   */
-  public static List<FileStatus> filterFileStatusForSnapshotMode(
-          JobConf job, HoodieTableMetaClient metaClient, List<Path> paths) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metaClient);
-    }
+  public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) {
+    return HoodieMetadataConfig.newBuilder()
+        .enable(conf.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS))
+        .validate(conf.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE))
+        .build();
+  }
 
-    boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
-    boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+  public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
+                                                                 List<Path> snapshotPaths) throws IOException {
     HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
-    HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
-        metaClient, useFileListingFromMetadata, verifyFileListing);
-
-    List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
-    for (Path p : paths) {
-      String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
-      List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
-      filteredBaseFiles.addAll(matched);
-    }
-
-    LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
     List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredBaseFiles) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
+
+    Map<HoodieTableMetaClient, List<Path>> groupedPaths = HoodieInputFormatUtils
+        .groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
+    Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
+    LOG.info("Found a total of " + groupedPaths.size() + " groups");
+
+    try {
+      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
+        HoodieTableMetaClient metaClient = entry.getKey();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Hoodie Metadata initialized with completed commit instant as :" + metaClient);
+        }
+
+        HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
+            FileSystemViewManager.createInMemoryFileSystemView(engineContext, tableMetaClient, buildMetadataConfig(job)));
+        List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
+        for (Path p : entry.getValue()) {
+          String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
+          List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
+          filteredBaseFiles.addAll(matched);
+        }
+
+        LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size());
+        for (HoodieBaseFile filteredFile : filteredBaseFiles) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
+          }
+          filteredFile = refreshFileStatus(job, filteredFile);
+          returns.add(getFileStatus(filteredFile));
+        }
       }
-      filteredFile = refreshFileStatus(job, filteredFile);
-      returns.add(getFileStatus(filteredFile));
+    } finally {
+      fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
     }
     return returns;
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 68d2d22..ce770ba 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -55,11 +55,6 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
-
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
@@ -70,28 +65,25 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     // TODO(vc): Should we handle also non-hoodie splits here?
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
 
-    boolean useFileListingFromMetadata = conf.getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
-    boolean verifyFileListing = conf.getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
     // Create file system cache so metadata table is only instantiated once. Also can benefit normal file listing if
     // partition path is listed twice so file groups will already be loaded in file system
     Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new HashMap<>();
     // for all unique split parents, obtain all delta files based on delta commit timeline,
     // grouped on file id
     List<InputSplit> rtSplits = new ArrayList<>();
-    partitionsToParquetSplits.keySet().forEach(partitionPath -> {
-      // for each partition path obtain the data & log file groupings, then map back to inputsplits
-      HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
-      if (!fsCache.containsKey(metaClient)) {
-        HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf);
-        HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
-            metaClient, useFileListingFromMetadata, verifyFileListing);
-        fsCache.put(metaClient, fsView);
-      }
-      HoodieTableFileSystemView fsView = fsCache.get(metaClient);
-
-      String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
-
-      try {
+    try {
+      partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+        // for each partition path obtain the data & log file groupings, then map back to inputsplits
+        HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
+        if (!fsCache.containsKey(metaClient)) {
+          HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf);
+          HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
+              metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf));
+          fsCache.put(metaClient, fsView);
+        }
+        HoodieTableFileSystemView fsView = fsCache.get(metaClient);
+
+        String relPartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
         // Both commit and delta-commits are included - pick the latest completed one
         Option<HoodieInstant> latestCompletedInstant =
             metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
@@ -105,7 +97,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
             .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
         // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
         String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
-                HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
+            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
             .filterCompletedInstants().lastInstant().get().getTimestamp();
         latestFileSlices.forEach(fileSlice -> {
           List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
@@ -121,7 +113,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
                     .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0];
                 FileSplit baseSplit = new FileSplit(eSplit.getPath(), eSplit.getStart(), eSplit.getLength(),
                     hosts, inMemoryHosts);
-                rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit,metaClient.getBasePath(),
+                rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(),
                     logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit()));
               } else {
                 rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
@@ -131,10 +123,13 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
             }
           });
         });
-      } catch (Exception e) {
-        throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
-      }
-    });
+      });
+    } catch (Exception e) {
+      throw new HoodieException("Error obtaining data file/log file grouping ", e);
+    } finally {
+      // Close every cached file system view, whether or not building the splits succeeded.
+      fsCache.forEach((k, view) -> view.close());
+    }
     LOG.info("Returning a total splits of " + rtSplits.size());
     return rtSplits.toArray(new InputSplit[0]);
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 5aa9797..136aa27 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -86,9 +86,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
   protected List<String> getPartitions(Option<Integer> partitionsLimit) throws IOException {
     // Using FSUtils.getFS here instead of metaClient.getFS() since we dont want to count these listStatus
     // calls in metrics as they are not part of normal HUDI operation.
-    FileSystem fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, fs, metaClient.getBasePath(),
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metaClient.getBasePath(),
         HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false);
     // Sort partition so we can pick last N partitions by default
     Collections.sort(partitionPaths);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index fe77e12..dbf95de 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -373,7 +373,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
             HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
@@ -392,7 +392,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
             HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
@@ -409,7 +409,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
             HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES,
@@ -427,7 +427,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
             HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
@@ -443,7 +443,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     reloadInputFormats();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
             HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
@@ -461,7 +461,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         jsc.hadoopConfiguration(),
-        FSUtils.getAllPartitionPaths(context, metaClient.getFs(), basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
+        FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS,
             HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 957477e..8d03252 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.sync.common;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -29,12 +26,13 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
 
-import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -128,13 +126,9 @@ public abstract class AbstractSyncHoodieClient {
   public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
       LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs);
-      try {
-        HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
-        return FSUtils.getAllPartitionPaths(engineContext, fs, basePath, useFileListingFromMetadata, verifyMetadataFileListing,
-            assumeDatePartitioning);
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to list all partitions in " + basePath, e);
-      }
+      HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
+      return FSUtils.getAllPartitionPaths(engineContext, basePath, useFileListingFromMetadata, verifyMetadataFileListing,
+          assumeDatePartitioning);
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
       return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 969f824..b1bec62 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.timeline.service;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -142,24 +144,28 @@ public class TimelineService {
   }
 
   public static FileSystemViewManager buildFileSystemViewManager(Config config, SerializableConfiguration conf) {
+    HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(conf.get());
+    // Just use the default HoodieMetadataConfig for now
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
+
     switch (config.viewStorageType) {
       case MEMORY:
         FileSystemViewStorageConfig.Builder inMemConfBuilder = FileSystemViewStorageConfig.newBuilder();
         inMemConfBuilder.withStorageType(FileSystemViewStorageType.MEMORY);
-        return FileSystemViewManager.createViewManager(conf, inMemConfBuilder.build());
+        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, inMemConfBuilder.build());
       case SPILLABLE_DISK: {
         FileSystemViewStorageConfig.Builder spillableConfBuilder = FileSystemViewStorageConfig.newBuilder();
         spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
             .withBaseStoreDir(config.baseStorePathForFileGroups)
             .withMaxMemoryForView(config.maxViewMemPerTableInMB * 1024 * 1024L)
             .withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable);
-        return FileSystemViewManager.createViewManager(conf, spillableConfBuilder.build());
+        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, spillableConfBuilder.build());
       }
       case EMBEDDED_KV_STORE: {
         FileSystemViewStorageConfig.Builder rocksDBConfBuilder = FileSystemViewStorageConfig.newBuilder();
         rocksDBConfBuilder.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
             .withRocksDBPath(config.rocksDBPath);
-        return FileSystemViewManager.createViewManager(conf, rocksDBConfBuilder.build());
+        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, rocksDBConfBuilder.build());
       }
       default:
         throw new IllegalArgumentException("Invalid view manager storage type :" + config.viewStorageType);
diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
index 9a0ff65..0865585 100644
--- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
+++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
@@ -18,7 +18,8 @@
 
 package org.apache.hudi.timeline.service.functional;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -44,9 +45,12 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst
   protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) {
     FileSystemViewStorageConfig sConf =
         FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build();
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
+    HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
+
     try {
       server = new TimelineService(0,
-          FileSystemViewManager.createViewManager(new SerializableConfiguration(metaClient.getHadoopConf()), sConf));
+          FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, sConf));
       server.startService();
     } catch (Exception ex) {
       throw new RuntimeException(ex);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index 0c6b0b3..ece9b8c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -98,7 +98,7 @@ public class HoodieSnapshotCopier implements Serializable {
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
-    List<String> partitions = FSUtils.getAllPartitionPaths(context, fs, baseDir, useFileListingFromMetadata, verifyMetadataFileListing, shouldAssumeDatePartitioning);
+    List<String> partitions = FSUtils.getAllPartitionPaths(context, baseDir, useFileListingFromMetadata, verifyMetadataFileListing, shouldAssumeDatePartitioning);
     if (partitions.size() > 0) {
       LOG.info(String.format("The job needs to copy %d partitions.", partitions.size()));
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index db80ce9..b9f32cb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -129,7 +129,7 @@ public class HoodieSnapshotExporter {
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
-    final List<String> partitions = getPartitions(engineContext, fs, cfg);
+    final List<String> partitions = getPartitions(engineContext, cfg);
     if (partitions.isEmpty()) {
       throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
     }
@@ -154,8 +154,8 @@ public class HoodieSnapshotExporter {
     return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
   }
 
-  private List<String> getPartitions(HoodieEngineContext engineContext, FileSystem fs, Config cfg) throws IOException {
-    return FSUtils.getAllPartitionPaths(engineContext, fs, cfg.sourceBasePath, true, false, false);
+  private List<String> getPartitions(HoodieEngineContext engineContext, Config cfg) {
+    return FSUtils.getAllPartitionPaths(engineContext, cfg.sourceBasePath, true, false, false);
   }
 
   private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index b4813a0..d296f0e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -87,7 +87,7 @@ public class TimelineServerPerf implements Serializable {
   public void run() throws IOException {
     JavaSparkContext jsc = UtilHelpers.buildSparkContext("hudi-view-perf-" + cfg.basePath, cfg.sparkMaster);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(engineContext, timelineServer.getFs(), cfg.basePath,
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(engineContext, cfg.basePath,
         cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, true);
     Collections.shuffle(allPartitionPaths);
     List<String> selected = allPartitionPaths.stream().filter(p -> !p.contains("error")).limit(cfg.maxPartitions)