Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/04 16:00:07 UTC

[hudi] 01/06: [HUDI-842] Implementation of HUDI RFC-15.

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 298808baaf771a3707cf55db8d89ed4421e1f8c3
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Wed Dec 30 18:29:55 2020 -0800

    [HUDI-842] Implementation of HUDI RFC-15.
    
     - Introduced an internal metadata table that stores file listings.
     - The metadata table is kept up to date with the instants on the dataset timeline.
     - Fixed handling of CleanerPlan.
     - [HUDI-842] Reduce parallelism to speed up the test.
     - [HUDI-842] Implementation of CLI commands for metadata operations and lookups.
     - [HUDI-842] Extend rollback metadata to include the files which have been appended to.
     - [HUDI-842] Support for rollbacks in MOR Table.
     - MarkerBasedRollbackStrategy needs to correctly provide the list of files for which rollback blocks were appended.
     - [HUDI-842] Added unit test for rollback of partial commits (inflight but not completed yet).
     - [HUDI-842] Handled the error case where metadata update succeeds but dataset commit fails.
     - [HUDI-842] Schema evolution strategy for Metadata Table. Each type of metadata saved (FilesystemMetadata, ColumnIndexMetadata, etc.) will be a separate field with default null. The type of the record will identify the valid field. This way, we can grow the schema when new types of information are saved, while still keeping it backward compatible.
     - [HUDI-842] Fix the non-partitioned case and speed up initial creation of the metadata table. Choose only 1 partition for the jsc as the number of records is low (hundreds to thousands). There is more overhead in creating a large number of partitions for the JavaRDD, and it slows down operations like WorkloadProfile.
    For the non-partitioned case, use "." as the name of the partition to prevent empty keys in HFile.
     - [HUDI-842] Reworked metrics publishing.
     - Code has been split into reader and writer sides. HoodieMetadata code is accessed via HoodieTable.metadata(), which returns the metadata instance for the table.
    Code is serializable to allow executors to use the functionality.
     - [RFC-15] Add metrics to track the time for each file system call.
     - [RFC-15] Added a distributed metrics registry for spark which can be used to collect metrics from executors. This helps create a stats dashboard which shows the metadata table improvements in real-time for production tables.
     - [HUDI-1321] Created HoodieMetadataConfig to specify configuration for the metadata table. This is safer than a full-fledged set of properties for the metadata table (like HoodieWriteConfig), which would make it burdensome to tune. With a limited configuration surface, we can control the performance of the metadata table closely (see the configuration sketch after this list).
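
    A rough sketch of enabling the metadata table on the write path, using the builders
    introduced in this commit (the base path below is a placeholder):

        // Enable the file-listing metadata table and attach it to a write config.
        // "/tmp/hudi_table" is a placeholder base path.
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)       // hoodie.metadata.enable
            .validate(false)    // hoodie.metadata.validate
            .build();
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")
            .withMetadataConfig(metadataConfig)
            .build();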
    
    [HUDI-1319][RFC-15] Adding interfaces for HoodieMetadata, HoodieMetadataWriter (apache#2266)
     - moved MetadataReader to HoodieBackedTableMetadata, under the HoodieTableMetadata interface (see the read-path sketch after this list)
     - moved MetadataWriter to HoodieBackedTableMetadataWriter, under the HoodieTableMetadataWriter interface
     - Pulled all the metrics into HoodieMetadataMetrics
     - Writer now wraps the metadata, instead of extending it
     - New enum for MetadataPartitionType
     - Streamlined code flow inside HoodieBackedTableMetadataWriter w.r.t initializing metadata state
     - [HUDI-1319] Make async operations work with metadata table (apache#2332)
     - Changes the syncing model to only move over completed instants on data timeline
     - Syncing happens postCommit and on writeClient initialization
     - Latest delta commit on the metadata table is sufficient as the watermark for data timeline archival
     - Cleaning/Compaction use a suffix to the last instant written to the metadata table, such that we keep the 1-1 mapping between data and metadata timelines.
     - Got rid of a lot of the complexity around checking for valid commits during open of base/log files
     - Tests now use local FS, to simulate more failure scenarios
     - Some failure scenarios exposed HUDI-1434, which is needed for MOR to work correctly
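
    A minimal read-path sketch, listing partitions and files through HoodieBackedTableMetadata
    (mirroring the calls MetadataCommand makes below; the base path, spillable map path and
    partition name are placeholders):

        // Open a metadata reader; the constructor arguments follow MetadataCommand's usage
        // below (conf, dataset base path, spillable map base path, boolean flags).
        HoodieBackedTableMetadata metadata =
            new HoodieBackedTableMetadata(hadoopConf, "/tmp/hudi_table", "/tmp", true, false, false);
        // List all partitions, then all files in one (placeholder) partition.
        List<String> partitions = metadata.getAllPartitionPaths();
        FileStatus[] files = metadata.getAllFilesInPartition(new Path("/tmp/hudi_table", "2020/01/01"));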
    
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 .../apache/hudi/cli/commands/MetadataCommand.java  | 226 ++++++
 .../apache/hudi/client/AbstractHoodieClient.java   |   5 +
 .../hudi/client/AbstractHoodieWriteClient.java     |  23 +-
 .../apache/hudi/client/AsyncCleanerService.java    |   7 +-
 .../apache/hudi/client/CompactionAdminClient.java  |   5 +
 .../apache/hudi/config/HoodieMetadataConfig.java   | 150 ++++
 .../apache/hudi/config/HoodieMetricsConfig.java    |   8 +
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  48 ++
 .../metadata/HoodieBackedTableMetadataWriter.java  | 647 +++++++++++++++++
 .../metadata/HoodieMetadataFileSystemView.java     |  52 ++
 .../hudi/metadata/HoodieTableMetadataWriter.java   |  43 ++
 .../java/org/apache/hudi/table/HoodieTable.java    |  33 +-
 .../hudi/table/HoodieTimelineArchiveLog.java       |  19 +
 .../hudi/table/action/clean/CleanPlanner.java      |   6 +-
 .../AbstractMarkerBasedRollbackStrategy.java       |  13 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   4 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |   8 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |  51 +-
 .../apache/hudi/metrics/DistributedRegistry.java   | 107 +++
 .../SparkHoodieBackedTableMetadataWriter.java      | 186 +++++
 .../apache/hudi/metadata/TestHoodieFsMetadata.java | 801 +++++++++++++++++++++
 hudi-common/pom.xml                                |   1 +
 hudi-common/src/main/avro/HoodieMetadata.avsc      |  56 ++
 .../src/main/avro/HoodieRollbackMetadata.avsc      |  10 +-
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  27 +-
 .../hudi/common/fs/HoodieWrapperFileSystem.java    | 297 +++++---
 .../common/fs/SizeAwareFSDataOutputStream.java     |  19 +-
 .../hudi/common/fs/TimedFSDataInputStream.java     |  79 ++
 .../metrics/{Registry.java => LocalRegistry.java}  |  64 +-
 .../org/apache/hudi/common/metrics/Registry.java   | 110 +--
 .../table/log/AbstractHoodieLogRecordScanner.java  |  16 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |   5 +-
 .../table/log/HoodieMergedLogRecordScanner.java    |  49 +-
 .../table/log/block/HoodieHFileDataBlock.java      |   3 +
 .../table/timeline/HoodieActiveTimeline.java       |  16 +-
 .../table/timeline/TimelineMetadataUtils.java      |  10 +-
 .../table/view/AbstractTableFileSystemView.java    |  12 +-
 .../common/table/view/HoodieMetadataException.java |  34 +
 .../hudi/exception/TableNotFoundException.java     |  14 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 447 ++++++++++++
 .../HoodieMetadataMergedLogRecordScanner.java      |  75 ++
 .../hudi/metadata/HoodieMetadataMetrics.java       | 147 ++++
 .../hudi/metadata/HoodieMetadataPayload.java       | 233 ++++++
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  93 +++
 .../hudi/metadata/MetadataPartitionType.java       |  33 +
 45 files changed, 4003 insertions(+), 289 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
new file mode 100644
index 0000000..68ff1d1
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieMetadataConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CLI commands to operate on the Metadata Table.
+ */
+@Component
+public class MetadataCommand implements CommandMarker {
+
+  private JavaSparkContext jsc;
+  private static String metadataBaseDirectory;
+
+  /**
+   * Sets the directory to store/read Metadata Table.
+   *
+   * This can be used to store the metadata table away from the dataset directory.
+   *  - Useful for testing as well as for use via the HUDI CLI, so that the actual dataset is not written to.
+   *  - Useful for testing Metadata Table performance and operations on existing datasets before enabling.
+   */
+  public static void setMetadataBaseDirectory(String metadataDir) {
+    ValidationUtils.checkState(metadataBaseDirectory == null,
+        "metadataBaseDirectory is already set to " + metadataBaseDirectory);
+    metadataBaseDirectory = metadataDir;
+  }
+
+  public static String getMetadataTableBasePath(String tableBasePath) {
+    if (metadataBaseDirectory != null) {
+      return metadataBaseDirectory;
+    }
+    return HoodieTableMetadata.getMetadataTableBasePath(tableBasePath);
+  }
+
+  @CliCommand(value = "metadata set", help = "Set options for Metadata Table")
+  public String set(@CliOption(key = {"metadataDir"},
+      help = "Directory to read/write metadata table (can be different from dataset)", unspecifiedDefaultValue = "")
+                    final String metadataDir) {
+    if (!metadataDir.isEmpty()) {
+      setMetadataBaseDirectory(metadataDir);
+    }
+
+    return "Ok";
+  }
+
+  @CliCommand(value = "metadata create", help = "Create the Metadata Table if it does not exist")
+  public String create() throws IOException {
+    HoodieCLI.getTableMetaClient();
+    Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
+    try {
+      FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
+      if (statuses.length > 0) {
+        throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") not empty.");
+      }
+    } catch (FileNotFoundException e) {
+      // Metadata directory does not exist yet
+      HoodieCLI.fs.mkdirs(metadataPath);
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    initJavaSparkContext();
+    SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
+    return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
+  }
+
+  @CliCommand(value = "metadata delete", help = "Remove the Metadata Table")
+  public String delete() throws Exception {
+    HoodieCLI.getTableMetaClient();
+    Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
+    try {
+      FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
+      if (statuses.length > 0) {
+        HoodieCLI.fs.delete(metadataPath, true);
+      }
+    } catch (FileNotFoundException e) {
+      // Metadata directory does not exist
+    }
+
+    return String.format("Removed Metdata Table from %s", metadataPath);
+  }
+
+  @CliCommand(value = "metadata init", help = "Update the metadata table from commits since the creation")
+  public String init(@CliOption(key = {"readonly"}, unspecifiedDefaultValue = "false",
+      help = "Open in read-only mode") final boolean readOnly) throws Exception {
+    HoodieCLI.getTableMetaClient();
+    Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
+    try {
+      HoodieCLI.fs.listStatus(metadataPath);
+    } catch (FileNotFoundException e) {
+      // Metadata directory does not exist
+      throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") does not exist.");
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    if (!readOnly) {
+      HoodieWriteConfig writeConfig = getWriteConfig();
+      initJavaSparkContext();
+      SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
+    }
+
+    String action = readOnly ? "Opened" : "Initialized";
+    return String.format(action + " Metadata Table in %s (duration=%.2fsec)", metadataPath, (timer.endTimer()) / 1000.0);
+  }
+
+  @CliCommand(value = "metadata stats", help = "Print stats about the metadata")
+  public String stats() throws IOException {
+    HoodieCLI.getTableMetaClient();
+    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, false, false);
+    Map<String, String> stats = metadata.stats();
+
+    StringBuffer out = new StringBuffer("\n");
+    out.append(String.format("Base path: %s\n", getMetadataTableBasePath(HoodieCLI.basePath)));
+    for (Map.Entry<String, String> entry : stats.entrySet()) {
+      out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
+    }
+
+    return out.toString();
+  }
+
+  @CliCommand(value = "metadata list-partitions", help = "Print a list of all partitions from the metadata")
+  public String listPartitions() throws IOException {
+    HoodieCLI.getTableMetaClient();
+    HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, false, false);
+
+    StringBuffer out = new StringBuffer("\n");
+    if (!metadata.enabled()) {
+      out.append("=== Metadata Table not initilized. Using file listing to get list of partitions. ===\n\n");
+    }
+
+    long t1 = System.currentTimeMillis();
+    List<String> partitions = metadata.getAllPartitionPaths();
+    long t2 = System.currentTimeMillis();
+
+    int[] count = {0};
+    partitions.stream().sorted((p1, p2) -> p2.compareTo(p1)).forEach(p -> {
+      out.append(p);
+      if (++count[0] % 15 == 0) {
+        out.append("\n");
+      } else {
+        out.append(", ");
+      }
+    });
+
+    out.append(String.format("\n\n=== List of partitions retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
+
+    return out.toString();
+  }
+
+  @CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata")
+  public String listFiles(
+      @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true)
+      final String partition) throws IOException {
+    HoodieCLI.getTableMetaClient();
+    HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, false, false);
+
+    StringBuffer out = new StringBuffer("\n");
+    if (!metaReader.enabled()) {
+      out.append("=== Metadata Table not initialized. Using file listing to get list of files in partition. ===\n\n");
+    }
+
+    long t1 = System.currentTimeMillis();
+    FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+    long t2 = System.currentTimeMillis();
+
+    Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
+      out.append("\t" + p.getPath().getName());
+      out.append("\n");
+    });
+
+    out.append(String.format("\n=== Files in partition retrieved in %.2fsec ===", (t2 - t1) / 1000.0));
+
+    return out.toString();
+  }
+
+  private HoodieWriteConfig getWriteConfig() {
+    return HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+  }
+
+  private void initJavaSparkContext() {
+    if (jsc == null) {
+      jsc = SparkUtil.initJavaSparkConf("HoodieCLI");
+    }
+  }
+}
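
Based on the @CliCommand annotations above, the new commands can be run from hudi-cli
roughly as follows (a sketch; the metadataDir and partition values are placeholders):

    metadata set --metadataDir /tmp/hudi_metadata
    metadata create
    metadata stats
    metadata list-partitions
    metadata list-files --partition 2020/01/01
    metadata delete
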
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index ba7db3e..818751d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -71,6 +71,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
     this.timelineServer = timelineServer;
     shouldStopTimelineServer = !timelineServer.isPresent();
     startEmbeddedServerView();
+    initWrapperFSMetrics();
   }
 
   /**
@@ -118,6 +119,10 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
     return config;
   }
 
+  protected void initWrapperFSMetrics() {
+    // no-op.
+  }
+
   protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
     return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
         config.getConsistencyGuardConfig(),
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 0149987..e12b3af 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -134,6 +134,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
     this.index = createIndex(writeConfig);
+    syncTableMetadata();
   }
 
   protected abstract HoodieIndex<T, I, K, O> createIndex(HoodieWriteConfig writeConfig);
@@ -220,6 +221,10 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
     }
   }
 
+  protected void syncTableMetadata() {
+    // no-op
+  }
+
   /**
    * Filter out HoodieRecords that already exists in the output folder. This is useful in deduplication.
    *
@@ -407,7 +412,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
       archiveLog.archiveIfRequired(context);
-      autoCleanOnCommit(instantTime);
+      autoCleanOnCommit();
+
+      syncTableMetadata();
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
@@ -434,9 +441,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
   /**
    * Handle auto clean during commit.
    *
-   * @param instantTime
    */
-  protected void autoCleanOnCommit(String instantTime) {
+  protected void autoCleanOnCommit() {
     if (config.isAutoClean()) {
       // Call clean to cleanup if there is anything to cleanup after the commit,
       if (config.isAsyncClean()) {
@@ -444,8 +450,9 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
         AsyncCleanerService.waitForCompletion(asyncCleanerService);
         LOG.info("Cleaner has finished");
       } else {
+        // Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
         LOG.info("Auto cleaning is enabled. Running cleaner now");
-        clean(instantTime);
+        clean();
       }
     }
   }
@@ -599,8 +606,14 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    * Provides a new commit time for a write operation (insert/update/delete).
    */
   public String startCommit() {
+    // NOTE : Need to ensure that rollback is done before a new commit is started
+    if (rollbackPending) {
+      // Only rollback pending commit/delta-commits. Do not touch compaction commits
+      rollbackPendingCommits();
+    }
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    startCommitWithTime(instantTime);
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
     return instantTime;
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
index e8016c9..c259acf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.async.HoodieAsyncService;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.log4j.LogManager;
@@ -52,11 +53,11 @@ class AsyncCleanerService extends HoodieAsyncService {
     }), executor);
   }
 
-  public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient,
-                                                                String instantTime) {
+  public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
     AsyncCleanerService asyncCleanerService = null;
     if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
-      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
+      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
       asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
       asyncCleanerService.start(null);
     } else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index a2ecb67..368c6b6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -481,6 +481,11 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     throw new HoodieException("FileGroupId " + fgId + " not in pending compaction");
   }
 
+  @Override
+  protected void initWrapperFSMetrics() {
+    // no-op
+  }
+
   /**
    * Holds Operation result for Renaming.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
new file mode 100644
index 0000000..54c4ac3
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Configurations used by the HUDI Metadata Table.
+ */
+@Immutable
+public class HoodieMetadataConfig extends DefaultHoodieConfig {
+
+  public static final String METADATA_PREFIX = "hoodie.metadata";
+
+  // Enable the internal Metadata Table which saves file listings
+  public static final String METADATA_ENABLE_PROP = METADATA_PREFIX + ".enable";
+  public static final boolean DEFAULT_METADATA_ENABLE = false;
+
+  // Validate contents of Metadata Table on each access against the actual filesystem
+  public static final String METADATA_VALIDATE_PROP = METADATA_PREFIX + ".validate";
+  public static final boolean DEFAULT_METADATA_VALIDATE = false;
+
+  // Parallelism for inserts
+  public static final String METADATA_INSERT_PARALLELISM_PROP = METADATA_PREFIX + ".insert.parallelism";
+  public static final int DEFAULT_METADATA_INSERT_PARALLELISM = 1;
+
+  // Async clean
+  public static final String METADATA_ASYNC_CLEAN_PROP = METADATA_PREFIX + ".clean.async";
+  public static final boolean DEFAULT_METADATA_ASYNC_CLEAN = false;
+
+  // Maximum delta commits before compaction occurs
+  public static final String METADATA_COMPACT_NUM_DELTA_COMMITS_PROP = METADATA_PREFIX + ".compact.max.delta.commits";
+  public static final int DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS = 24;
+
+  // Archival settings
+  public static final String MIN_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.min.commits";
+  public static final int DEFAULT_MIN_COMMITS_TO_KEEP = 20;
+  public static final String MAX_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.max.commits";
+  public static final int DEFAULT_MAX_COMMITS_TO_KEEP = 30;
+
+  // Cleaner commits retained
+  public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
+  public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
+
+  private HoodieMetadataConfig(Properties props) {
+    super(props);
+  }
+
+  public static HoodieMetadataConfig.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.props.load(reader);
+        return this;
+      }
+    }
+
+    public Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public Builder enable(boolean enable) {
+      props.setProperty(METADATA_ENABLE_PROP, String.valueOf(enable));
+      return this;
+    }
+
+    public Builder validate(boolean validate) {
+      props.setProperty(METADATA_VALIDATE_PROP, String.valueOf(validate));
+      return this;
+    }
+
+    public Builder withInsertParallelism(int parallelism) {
+      props.setProperty(METADATA_INSERT_PARALLELISM_PROP, String.valueOf(parallelism));
+      return this;
+    }
+
+    public Builder withAsyncClean(boolean asyncClean) {
+      props.setProperty(METADATA_ASYNC_CLEAN_PROP, String.valueOf(asyncClean));
+      return this;
+    }
+
+    public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
+      props.setProperty(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
+      return this;
+    }
+
+    public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
+      props.setProperty(MIN_COMMITS_TO_KEEP_PROP, String.valueOf(minToKeep));
+      props.setProperty(MAX_COMMITS_TO_KEEP_PROP, String.valueOf(maxToKeep));
+      return this;
+    }
+
+    public Builder retainCommits(int commitsRetained) {
+      props.setProperty(CLEANER_COMMITS_RETAINED_PROP, String.valueOf(commitsRetained));
+      return this;
+    }
+
+    public HoodieMetadataConfig build() {
+      HoodieMetadataConfig config = new HoodieMetadataConfig(props);
+      setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE_PROP), METADATA_ENABLE_PROP,
+          String.valueOf(DEFAULT_METADATA_ENABLE));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE_PROP), METADATA_VALIDATE_PROP,
+          String.valueOf(DEFAULT_METADATA_VALIDATE));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_INSERT_PARALLELISM_PROP), METADATA_INSERT_PARALLELISM_PROP,
+          String.valueOf(DEFAULT_METADATA_INSERT_PARALLELISM));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_ASYNC_CLEAN_PROP), METADATA_ASYNC_CLEAN_PROP,
+          String.valueOf(DEFAULT_METADATA_ASYNC_CLEAN));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP),
+          METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS));
+      setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED_PROP), CLEANER_COMMITS_RETAINED_PROP,
+          String.valueOf(DEFAULT_CLEANER_COMMITS_RETAINED));
+      setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP_PROP), MAX_COMMITS_TO_KEEP_PROP,
+          String.valueOf(DEFAULT_MAX_COMMITS_TO_KEEP));
+      setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP_PROP), MIN_COMMITS_TO_KEEP_PROP,
+          String.valueOf(DEFAULT_MIN_COMMITS_TO_KEEP));
+
+      return config;
+    }
+  }
+}
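
For reference, a sketch of the full set of knobs this builder exposes (the values shown
are the defaults defined above, passed explicitly for illustration):

    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder()
        .enable(false)                               // hoodie.metadata.enable
        .validate(false)                             // hoodie.metadata.validate
        .withInsertParallelism(1)                    // hoodie.metadata.insert.parallelism
        .withAsyncClean(false)                       // hoodie.metadata.clean.async
        .withMaxNumDeltaCommitsBeforeCompaction(24)  // hoodie.metadata.compact.max.delta.commits
        .archiveCommitsWith(20, 30)                  // hoodie.metadata.keep.min/max.commits
        .retainCommits(3)                            // hoodie.metadata.cleaner.commits.retained
        .build();
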
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
index 800c75f..b6cb6e5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
@@ -62,6 +62,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
   public static final String METRICS_REPORTER_CLASS = METRIC_PREFIX + ".reporter.class";
   public static final String DEFAULT_METRICS_REPORTER_CLASS = "";
 
+  // Enable metrics collection from executors
+  public static final String ENABLE_EXECUTOR_METRICS = METRIC_PREFIX + ".executor.enable";
+
   private HoodieMetricsConfig(Properties props) {
     super(props);
   }
@@ -126,6 +129,11 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withExecutorMetrics(boolean enable) {
+      props.setProperty(ENABLE_EXECUTOR_METRICS, String.valueOf(enable));
+      return this;
+    }
+
     public HoodieMetricsConfig build() {
       HoodieMetricsConfig config = new HoodieMetricsConfig(props);
       setDefaultOnCondition(props, !props.containsKey(METRICS_ON), METRICS_ON, String.valueOf(DEFAULT_METRICS_ON));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9122396..ae56454 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -689,6 +689,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.METRICS_ON));
   }
 
+  public boolean isExecutorMetricsEnabled() {
+    return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.ENABLE_EXECUTOR_METRICS, "false"));
+  }
+
   public MetricsReporterType getMetricsReporterType() {
     return MetricsReporterType.valueOf(props.getProperty(HoodieMetricsConfig.METRICS_REPORTER_TYPE));
   }
@@ -874,6 +878,41 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP));
   }
 
+  /**
+   * File listing metadata configs.
+   */
+  public boolean useFileListingMetadata() {
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP));
+  }
+
+  public boolean getFileListingMetadataVerify() {
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP));
+  }
+
+  public int getMetadataInsertParallelism() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_INSERT_PARALLELISM_PROP));
+  }
+
+  public int getMetadataCompactDeltaCommitMax() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_COMPACT_NUM_DELTA_COMMITS_PROP));
+  }
+
+  public boolean isMetadataAsyncClean() {
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ASYNC_CLEAN_PROP));
+  }
+
+  public int getMetadataMaxCommitsToKeep() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP_PROP));
+  }
+
+  public int getMetadataMinCommitsToKeep() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP_PROP));
+  }
+
+  public int getMetadataCleanerCommitsRetained() {
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED_PROP));
+  }
+
   public static class Builder {
 
     protected final Properties props = new Properties();
@@ -889,6 +928,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     private boolean isConsistencyGuardSet = false;
     private boolean isCallbackConfigSet = false;
     private boolean isPayloadConfigSet = false;
+    private boolean isMetadataConfigSet = false;
 
     public Builder withEngineType(EngineType engineType) {
       this.engineType = engineType;
@@ -1056,6 +1096,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withMetadataConfig(HoodieMetadataConfig metadataConfig) {
+      props.putAll(metadataConfig.getProps());
+      isMetadataConfigSet = true;
+      return this;
+    }
+
     public Builder withAutoCommit(boolean autoCommit) {
       props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
       return this;
@@ -1204,6 +1250,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
       setDefaultOnCondition(props, !isPayloadConfigSet,
           HoodiePayloadConfig.newBuilder().fromProperties(props).build());
+      setDefaultOnCondition(props, !isMetadataConfigSet,
+          HoodieMetadataConfig.newBuilder().fromProperties(props).build());
 
       setDefaultOnCondition(props, !props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION),
           EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
new file mode 100644
index 0000000..98d314d
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieMetadataConfig;
+import org.apache.hudi.config.HoodieMetricsConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetadataException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
+import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
+import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+
+/**
+ * Writer implementation backed by an internal hudi table. Partition and file listing are saved within an internal MOR table
+ * called Metadata Table. This table is created by listing files and partitions (first time)
+ * and kept in sync using the instants on the main dataset.
+ */
+public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWriter {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
+
+  protected HoodieWriteConfig metadataWriteConfig;
+  protected HoodieWriteConfig datasetWriteConfig;
+  protected String tableName;
+
+  protected HoodieBackedTableMetadata metadata;
+  protected HoodieTableMetaClient metaClient;
+  protected Option<HoodieMetadataMetrics> metrics;
+  protected boolean enabled;
+  protected SerializableConfiguration hadoopConf;
+  protected final transient HoodieEngineContext engineContext;
+
+  protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    this.datasetWriteConfig = writeConfig;
+    this.engineContext = engineContext;
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
+
+    if (writeConfig.useFileListingMetadata()) {
+      this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
+      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
+      enabled = true;
+
+      // Cleaning and compaction are controlled internally, as we don't expose this table outside
+      ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
+      ValidationUtils.checkArgument(!this.metadataWriteConfig.isInlineCompaction(), "Compaction is controlled internally for metadata table.");
+      // Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
+      ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
+      ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
+
+      initRegistry();
+      HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
+      initialize(engineContext, datasetMetaClient);
+      if (enabled) {
+        // (re) init the metadata for reading.
+        initTableMetadata();
+
+        // This is always called even in case the table was created for the first time. This is because
+        // initFromFilesystem() does file listing and hence may take a long time during which some new updates
+        // may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
+        // with the active timeline.
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        syncFromInstants(datasetMetaClient);
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, timer.endTimer()));
+      }
+    } else {
+      enabled = false;
+      this.metrics = Option.empty();
+    }
+  }
+
+  protected abstract void initRegistry();
+
+  /**
+   * Create a {@code HoodieWriteConfig} to use for the Metadata Table.
+   *
+   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   */
+  private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
+    int parallelism = writeConfig.getMetadataInsertParallelism();
+
+    // Create the write config for the metadata table by borrowing options from the main write config.
+    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+            .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
+            .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
+            .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
+            .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withAutoCommit(true)
+        .withAvroSchemaValidate(true)
+        .withEmbeddedTimelineServerEnabled(false)
+        .withAssumeDatePartitioning(false)
+        .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
+        .withSchema(HoodieMetadataRecord.getClassSchema().toString())
+        .forTable(tableName)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withAsyncClean(writeConfig.isMetadataAsyncClean())
+            // we will trigger cleaning manually, to control the instant times
+            .withAutoClean(false)
+            .withCleanerParallelism(parallelism)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
+            .archiveCommitsWith(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMetadataMaxCommitsToKeep())
+            // we will trigger compaction manually, to control the instant times
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
+        .withParallelism(parallelism, parallelism)
+        .withDeleteParallelism(parallelism)
+        .withRollbackParallelism(parallelism)
+        .withFinalizeWriteParallelism(parallelism);
+
+    if (writeConfig.isMetricsOn()) {
+      HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder()
+          .withReporterType(writeConfig.getMetricsReporterType().toString())
+          .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
+          .on(true);
+      switch (writeConfig.getMetricsReporterType()) {
+        case GRAPHITE:
+          metricsConfig.onGraphitePort(writeConfig.getGraphiteServerPort())
+              .toGraphiteHost(writeConfig.getGraphiteServerHost())
+              .usePrefix(writeConfig.getGraphiteMetricPrefix());
+          break;
+        case JMX:
+          metricsConfig.onJmxPort(writeConfig.getJmxPort())
+              .toJmxHost(writeConfig.getJmxHost());
+          break;
+        case DATADOG:
+          // TODO:
+          break;
+        case CONSOLE:
+        case INMEMORY:
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
+      }
+
+      builder.withMetricsConfig(metricsConfig.build());
+    }
+
+    return builder.build();
+  }
+
+  public HoodieWriteConfig getWriteConfig() {
+    return metadataWriteConfig;
+  }
+
+  public HoodieTableMetadata metadata() {
+    return metadata;
+  }
+
+  /**
+   * Initialize the metadata table if it does not exist. Update the metadata to bring it in sync with the file system.
+   *
+   * This can happen in two ways:
+   * 1. If the metadata table did not exist, then file and partition listing is used
+   * 2. If the metadata table exists, the instants from active timeline are read in order and changes applied
+   *
+   * The above logic has been chosen because it is faster to perform #1 at scale rather than read all the Instants
+   * which are large in size (AVRO or JSON encoded and not compressed) and incur considerable IO for de-serialization
+   * and decoding.
+   */
+  protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);
+
+  private void initTableMetadata() {
+    this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(), datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
+        datasetWriteConfig.useFileListingMetadata(), datasetWriteConfig.getFileListingMetadataVerify(), false,
+        datasetWriteConfig.shouldAssumeDatePartitioning());
+    this.metaClient = metadata.getMetaClient();
+  }
+
+  protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    boolean exists = datasetMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
+    if (!exists) {
+      // Initialize for the first time by listing partitions and files directly from the file system
+      bootstrapFromFilesystem(engineContext, datasetMetaClient);
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+    }
+  }
+
+  /**
+   * Initialize the Metadata Table by listing files and partitions from the file system.
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
+    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
+
+    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+    // Otherwise, we use the timestamp of the instant which does not have any non-completed instants before it.
+    Option<HoodieInstant> latestInstant = Option.empty();
+    boolean foundNonComplete = false;
+    for (HoodieInstant instant : datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList())) {
+      if (!instant.isCompleted()) {
+        foundNonComplete = true;
+      } else if (!foundNonComplete) {
+        latestInstant = Option.of(instant);
+      }
+    }
+
+    String createInstantTime = latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+    LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
+
+    HoodieTableMetaClient.initTableType(hadoopConf.get(), metadataWriteConfig.getBasePath(),
+        HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(),
+        HoodieFileFormat.HFILE.toString());
+    initTableMetadata();
+
+    // List all partitions in the basePath of the containing dataset
+    FileSystem fs = datasetMetaClient.getFs();
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetWriteConfig.getBasePath(), datasetWriteConfig.shouldAssumeDatePartitioning());
+    LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
+
+    // List all partitions in parallel and collect the files in them
+    int parallelism =  Math.max(partitions.size(), 1);
+    List<Pair<String, FileStatus[]>> partitionFileList = engineContext.map(partitions, partition -> {
+      FileSystem fsys = datasetMetaClient.getFs();
+      FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(datasetWriteConfig.getBasePath(), partition));
+      return Pair.of(partition, statuses);
+    }, parallelism);
+
+    // Create a HoodieCommitMetadata with writeStats for all discovered files
+    int[] stats = {0};
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    partitionFileList.forEach(t -> {
+      final String partition = t.getKey();
+      try {
+        if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
+          return;
+        }
+      } catch (IOException e) {
+        throw new HoodieMetadataException("Failed to check partition " + partition, e);
+      }
+
+      // Filter the statuses to only include files which were created before or on createInstantTime
+      Arrays.stream(t.getValue()).filter(status -> {
+        String filename = status.getPath().getName();
+        if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+          return false;
+        }
+        if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
+            createInstantTime)) {
+          return false;
+        }
+        return true;
+      }).forEach(status -> {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
+        writeStat.setPartitionPath(partition);
+        writeStat.setTotalWriteBytes(status.getLen());
+        commitMetadata.addWriteStat(partition, writeStat);
+        stats[0] += 1;
+      });
+
+      // If the partition has no files then create a writeStat with no file path
+      if (commitMetadata.getWriteStats(partition) == null) {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPartitionPath(partition);
+        commitMetadata.addWriteStat(partition, writeStat);
+      }
+    });
+
+    LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+    update(commitMetadata, createInstantTime);
+  }
+
+  /**
+   * Sync the Metadata Table from the instants created on the dataset.
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
+    ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it is not enabled");
+
+    try {
+      List<HoodieInstant> instantsToSync = metadata.findInstantsToSync(datasetMetaClient);
+      if (instantsToSync.isEmpty()) {
+        return;
+      }
+
+      LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);
+
+      // Read each instant in order and sync it to metadata table
+      final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
+      for (HoodieInstant instant : instantsToSync) {
+        LOG.info("Syncing instant " + instant + " to metadata table");
+        ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
+
+        switch (instant.getAction()) {
+          case HoodieTimeline.CLEAN_ACTION:
+            HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
+            update(cleanMetadata, instant.getTimestamp());
+            break;
+          case HoodieTimeline.DELTA_COMMIT_ACTION:
+          case HoodieTimeline.COMMIT_ACTION:
+          case HoodieTimeline.COMPACTION_ACTION:
+            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+                timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+            update(commitMetadata, instant.getTimestamp());
+            break;
+          case HoodieTimeline.ROLLBACK_ACTION:
+            HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+                timeline.getInstantDetails(instant).get());
+            update(rollbackMetadata, instant.getTimestamp());
+            break;
+          case HoodieTimeline.RESTORE_ACTION:
+            HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+                timeline.getInstantDetails(instant).get());
+            update(restoreMetadata, instant.getTimestamp());
+            break;
+          case HoodieTimeline.SAVEPOINT_ACTION:
+            // Nothing to be done here
+            break;
+          default:
+            throw new HoodieException("Unknown type of action " + instant.getAction());
+        }
+      }
+      // re-init the table metadata, for any future writes.
+      initTableMetadata();
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Unable to sync instants from data to metadata table.", ioe);
+    }
+  }
+
+  /**
+   * Update from {@code HoodieCommitMetadata}.
+   *
+   * @param commitMetadata {@code HoodieCommitMetadata}
+   * @param instantTime Timestamp at which the commit was performed
+   */
+  @Override
+  public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
+    if (!enabled) {
+      return;
+    }
+
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> allPartitions = new LinkedList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
+      final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
+      allPartitions.add(partition);
+
+      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+      writeStats.forEach(hoodieWriteStat -> {
+        String pathWithPartition = hoodieWriteStat.getPath();
+        if (pathWithPartition == null) {
+          throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+        }
+
+        int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
+        String filename = pathWithPartition.substring(offset);
+        ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
+        newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
+      });
+
+      // New files added to a partition
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
+          partition, Option.of(newFiles), Option.empty());
+      records.add(record);
+    });
+
+    // New partitions created
+    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
+    records.add(record);
+
+    LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+        + ". #partitions_updated=" + records.size());
+    commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
+  }
+
+  /**
+   * Update from {@code HoodieCleanerPlan}.
+   *
+   * @param cleanerPlan {@code HoodieCleanerPlan}
+   * @param instantTime Timestamp at which the clean plan was generated
+   */
+  @Override
+  public void update(HoodieCleanerPlan cleanerPlan, String instantTime) {
+    if (!enabled) {
+      return;
+    }
+
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileDeleteCount = {0};
+    cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
+      fileDeleteCount[0] += deletedPathInfo.size();
+
+      // Files deleted from a partition
+      List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName())
+          .collect(Collectors.toList());
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+          Option.of(deletedFilenames));
+      records.add(record);
+    });
+
+    LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileDeleteCount[0]);
+    commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
+  }
+
+  /**
+   * Update from {@code HoodieCleanMetadata}.
+   *
+   * @param cleanMetadata {@code HoodieCleanMetadata}
+   * @param instantTime Timestamp at which the clean was completed
+   */
+  @Override
+  public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
+    if (!enabled) {
+      return;
+    }
+
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileDeleteCount = {0};
+
+    cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
+      // Files deleted from a partition
+      List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+          Option.of(new ArrayList<>(deletedFiles)));
+
+      records.add(record);
+      fileDeleteCount[0] += deletedFiles.size();
+    });
+
+    LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileDeleteCount[0]);
+    commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
+  }
+
+  /**
+   * Update from {@code HoodieRestoreMetadata}.
+   *
+   * @param restoreMetadata {@code HoodieRestoreMetadata}
+   * @param instantTime Timestamp at which the restore was performed
+   */
+  @Override
+  public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
+    if (!enabled) {
+      return;
+    }
+
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
+      rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles));
+    });
+    commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore");
+  }
+
+  /**
+   * Update from {@code HoodieRollbackMetadata}.
+   *
+   * @param rollbackMetadata {@code HoodieRollbackMetadata}
+   * @param instantTime Timestamp at which the rollback was performed
+   */
+  @Override
+  public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
+    if (!enabled) {
+      return;
+    }
+
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+    processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles);
+    commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
+  }
+
+  /**
+   * Extracts information about the deleted and appended files from the {@code HoodieRollbackMetadata}.
+   *
+   * During a rollback, files may be deleted (COW, MOR) or rollback blocks may be appended (MOR only) to files. This
+   * function extracts these changes for each partition.
+   *
+   * @param rollbackMetadata {@code HoodieRollbackMetadata}
+   * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
+   * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
+   */
+  private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
+                                       Map<String, List<String>> partitionToDeletedFiles,
+                                       Map<String, Map<String, Long>> partitionToAppendedFiles) {
+    rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
+      final String partition = pm.getPartitionPath();
+
+      if (!pm.getSuccessDeleteFiles().isEmpty()) {
+        if (!partitionToDeletedFiles.containsKey(partition)) {
+          partitionToDeletedFiles.put(partition, new ArrayList<>());
+        }
+
+        // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
+        List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
+            .collect(Collectors.toList());
+        partitionToDeletedFiles.get(partition).addAll(deletedFiles);
+      }
+
+      if (!pm.getAppendFiles().isEmpty()) {
+        if (!partitionToAppendedFiles.containsKey(partition)) {
+          partitionToAppendedFiles.put(partition, new HashMap<>());
+        }
+
+        // Extract appended file name from the absolute paths saved in getAppendFiles()
+        pm.getAppendFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size,
+              (oldSize, newSize) -> oldSize + newSize);
+        });
+      }
+    });
+  }
+
+  /**
+   * Create records for the deleted and appended files and commit them to the metadata table.
+   *
+   * @param partitionToDeletedFiles {@code Map} of partitions to the files deleted from them
+   * @param partitionToAppendedFiles {@code Map} of partitions to the files appended to them and their sizes
+   * @param instantTime Timestamp at which the rollback or restore took place
+   * @param operation Type of the operation which caused the files to be deleted or appended
+   */
+  private void commitRollback(Map<String, List<String>> partitionToDeletedFiles,
+                              Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime,
+                              String operation) {
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileChangeCount = {0, 0}; // deletes, appends
+
+    partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
+      // A rollback deletes instants from the timeline. The instant being rolled back may not have been synced to the
+      // metadata table. Hence, the deleted files need to be checked against the metadata.
+      try {
+        FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new Path(metadata.getDatasetBasePath(), partition));
+        Set<String> currentFiles =
+            Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet());
+
+        int origCount = deletedFiles.size();
+        deletedFiles.removeIf(f -> !currentFiles.contains(f));
+        if (deletedFiles.size() != origCount) {
+          LOG.warn("Some files to be deleted as part of " + operation + " at " + instantTime + " were not found in the"
+              + " metadata for partition " + partition
+              + ". To delete=" + origCount + ", found=" + deletedFiles.size());
+        }
+
+        fileChangeCount[0] += deletedFiles.size();
+
+        Option<Map<String, Long>> filesAdded = Option.empty();
+        if (partitionToAppendedFiles.containsKey(partition)) {
+          filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
+        }
+
+        HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
+            Option.of(new ArrayList<>(deletedFiles)));
+        records.add(record);
+      } catch (IOException e) {
+        throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e);
+      }
+    });
+
+    partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
+      fileChangeCount[1] += appendedFileMap.size();
+
+      // Validate that no appended file has been deleted
+      ValidationUtils.checkState(
+          !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())),
+          "Rollback file cannot both be appended and deleted");
+
+      // New files added to a partition
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap),
+          Option.empty());
+      records.add(record);
+    });
+
+    LOG.info("Updating at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]);
+    commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
+  }
+
+  /**
+   * Commit the {@code HoodieRecord}s to the Metadata Table as a new delta-commit.
+   */
+  protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
+}
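
A caller-side sketch of how the writer above can be driven once an instant completes on the dataset.
The mirrorCommit helper is hypothetical and only for illustration; the update() overloads, the timeline
calls, and SparkHoodieBackedTableMetadataWriter.create() are the pieces introduced by this change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.client.common.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.metadata.HoodieTableMetadataWriter;
    import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;

    class MetadataSyncExample {
      // Hypothetical helper: mirror one completed commit from the dataset timeline into the metadata table.
      static void mirrorCommit(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context,
                               HoodieTableMetaClient datasetMetaClient, HoodieInstant completedCommit)
          throws java.io.IOException {
        HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(conf, writeConfig, context);
        HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
            timeline.getInstantDetails(completedCommit).get(), HoodieCommitMetadata.class);
        // Same dispatch that syncFromInstants() performs for COMMIT/DELTA_COMMIT/COMPACTION instants.
        writer.update(commitMetadata, completedCommit.getTimestamp());
      }
    }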
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
new file mode 100644
index 0000000..8c23ea8
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * {@code HoodieTableFileSystemView} implementation that retrieves partition listings from the Metadata Table.
+ */
+public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
+  private HoodieTable hoodieTable;
+
+  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTable table,
+                                      HoodieTimeline visibleActiveTimeline, boolean enableIncrementalTimelineSync) {
+    super(metaClient, visibleActiveTimeline, enableIncrementalTimelineSync);
+    this.hoodieTable = table;
+  }
+
+  /**
+   * Return all the files in the partition by reading from the Metadata Table.
+   *
+   * @param partitionPath The absolute path of the partition
+   * @throws IOException
+   */
+  @Override
+  protected FileStatus[] listPartition(Path partitionPath) throws IOException {
+    return hoodieTable.metadata().getAllFilesInPartition(partitionPath);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
new file mode 100644
index 0000000..fa1f464
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+import java.io.Serializable;
+
+/**
+ * Interface that supports updating metadata for a given table, as actions complete.
+ */
+public interface HoodieTableMetadataWriter extends Serializable {
+
+  void update(HoodieCommitMetadata commitMetadata, String instantTime);
+
+  void update(HoodieCleanerPlan cleanerPlan, String instantTime);
+
+  void update(HoodieCleanMetadata cleanMetadata, String instantTime);
+
+  void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
+
+  void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
+}
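
Engines that do not support the metadata table yet can satisfy this interface with a no-op
implementation. A minimal sketch, assuming nothing beyond the interface above (the class name
NoOpTableMetadataWriter is hypothetical, not part of this change):

    import org.apache.hudi.avro.model.HoodieCleanMetadata;
    import org.apache.hudi.avro.model.HoodieCleanerPlan;
    import org.apache.hudi.avro.model.HoodieRestoreMetadata;
    import org.apache.hudi.avro.model.HoodieRollbackMetadata;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.metadata.HoodieTableMetadataWriter;

    // Hypothetical no-op writer: every action completes without updating a metadata table.
    public class NoOpTableMetadataWriter implements HoodieTableMetadataWriter {
      @Override public void update(HoodieCommitMetadata commitMetadata, String instantTime) { /* no-op */ }
      @Override public void update(HoodieCleanerPlan cleanerPlan, String instantTime) { /* no-op */ }
      @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { /* no-op */ }
      @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { /* no-op */ }
      @Override public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { /* no-op */ }
    }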
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 6b7a7d2..684df39 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -49,6 +49,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
@@ -61,6 +62,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.log4j.LogManager;
@@ -94,6 +97,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
 
   private SerializableConfiguration hadoopConfiguration;
   private transient FileSystemViewManager viewManager;
+  private HoodieTableMetadata metadata;
 
   protected final TaskContextSupplier taskContextSupplier;
 
@@ -242,28 +246,41 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
    * Get the view of the file system for this table.
    */
   public TableFileSystemView getFileSystemView() {
-    return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
+    if (config.useFileListingMetadata()) {
+      return getFileSystemViewInternal(getCompletedCommitsTimeline());
+    } else {
+      return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
+    }
   }
 
   /**
    * Get the base file only view of the file system for this table.
    */
   public BaseFileOnlyView getBaseFileOnlyView() {
-    return getViewManager().getFileSystemView(metaClient);
+    return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
   }
 
   /**
    * Get the full view of the file system for this table.
    */
   public SliceView getSliceView() {
-    return getViewManager().getFileSystemView(metaClient);
+    return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
   }
 
   /**
    * Get complete view of the file system for this table with ability to force sync.
    */
   public SyncableFileSystemView getHoodieView() {
-    return getViewManager().getFileSystemView(metaClient);
+    return getFileSystemViewInternal(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
+  }
+
+  private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
+    if (config.useFileListingMetadata()) {
+      FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
+      return new HoodieMetadataFileSystemView(metaClient, this, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
+    } else {
+      return getViewManager().getFileSystemView(metaClient);
+    }
   }
 
   /**
@@ -640,4 +657,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   public boolean requireSortedRecords() {
     return getBaseFileFormat() == HoodieFileFormat.HFILE;
   }
+
+  public HoodieTableMetadata metadata() {
+    if (metadata == null) {
+      metadata = HoodieTableMetadata.create(hadoopConfiguration.get(), config.getBasePath(), config.getSpillableMapBasePath(),
+          config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(), config.shouldAssumeDatePartitioning());
+    }
+    return metadata;
+  }
 }
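
With the lazily-initialized metadata() accessor above, callers can replace recursive file-system
listings with metadata-table lookups, as CleanPlanner does below. A small usage sketch, assuming an
already-constructed HoodieTable named table and a surrounding method that declares throws IOException:

    // List partitions and their files through the metadata table instead of direct fs.listStatus() calls.
    List<String> partitions = table.metadata().getAllPartitionPaths();
    for (String partition : partitions) {
      FileStatus[] files = table.metadata().getAllFilesInPartition(
          new Path(table.getMetaClient().getBasePath(), partition));
      // ... consume the FileStatus entries as if they came from a direct listing
    }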
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index e24ae73..2a147f7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -88,6 +88,11 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
   private final HoodieTable<T, I, K, O> table;
   private final HoodieTableMetaClient metaClient;
 
+  /*
+  public HoodieTimelineArchiveLog(HoodieWriteConfig config, Configuration configuration) {
+    this(config, HoodieTable.create(config, configuration));
+  }*/
+
   public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     this.config = config;
     this.table = table;
@@ -195,6 +200,20 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
         .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
             HoodieInstant.getComparableAction(i.getAction()))));
 
+    // If the metadata table is enabled, do not archive instants which are more recent than the latest instant
+    // synced to the metadata table. This is required for metadata table sync.
+    if (config.useFileListingMetadata()) {
+      Option<String> lastSyncedInstantTime = table.metadata().getSyncedInstantTime();
+      if (lastSyncedInstantTime.isPresent()) {
+        LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
+        instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
+            lastSyncedInstantTime.get()));
+      } else {
+        LOG.info("Not archiving as there are no instants yet on the metadata table");
+        instants = Stream.empty();
+      }
+    }
+
     return instants.flatMap(hoodieInstant ->
         groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
             HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 4f9b2a2..31d433d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.clean;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
@@ -180,14 +179,13 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
   }
 
   /**
-   * Scan and list all paritions for cleaning.
+   * Scan and list all partitions for cleaning.
    * @return all partitions paths for the dataset.
    * @throws IOException
    */
   private List<String> getPartitionPathsForFullCleaning() throws IOException {
     // Go to brute force mode of scanning all partitions
-    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), hoodieTable.getMetaClient().getBasePath(),
-        config.shouldAssumeDatePartitioning());
+    return hoodieTable.metadata().getAllPartitionPaths();
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
index 657057f..d3c2557 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
@@ -37,6 +38,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Performs rollback using marker files generated during the write..
@@ -119,10 +121,17 @@ public abstract class AbstractMarkerBasedRollbackStrategy<T extends HoodieRecord
       }
     }
 
+    Map<FileStatus, Long> filesToNumBlocksRollback = Collections.emptyMap();
+    if (config.useFileListingMetadata()) {
+      // When metadata is enabled, the information of files appended to is required
+      filesToNumBlocksRollback = Collections.singletonMap(
+          table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+          1L);
+    }
+
     return HoodieRollbackStat.newBuilder()
         .withPartitionPath(partitionPath)
-        // we don't use this field per se. Avoiding the extra file status call.
-        .withRollbackBlockAppendResults(Collections.emptyMap())
+        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
         .build();
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3e8952e..e7fda7d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -107,7 +107,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -126,7 +126,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.INSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 2b5e607..7b10843 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -98,7 +98,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -113,7 +113,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<List<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -124,7 +124,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.INSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -139,7 +139,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<List<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index f7e7690..ec98155 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -41,6 +43,8 @@ import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndex;
+import org.apache.hudi.metrics.DistributedRegistry;
+import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -51,6 +55,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -136,7 +141,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -150,7 +155,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
     table.validateUpsertSchema();
     setOperationType(WriteOperationType.UPSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -161,7 +166,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insert(context,instantTime, records);
     return postWrite(result, instantTime, table);
   }
@@ -172,7 +177,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -188,7 +193,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_OVERWRITE);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records);
     return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
   }
@@ -205,7 +210,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
     return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
   }
@@ -221,7 +226,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -232,7 +237,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
     table.validateInsertSchema();
     setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
-    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -394,4 +399,34 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     }
     return table;
   }
+
+  @Override
+  public void syncTableMetadata() {
+    // Open up the metadata table again, for syncing
+    SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
+  }
+
+  @Override
+  protected void initWrapperFSMetrics() {
+    if (config.isMetricsOn()) {
+      Registry registry;
+      Registry registryMeta;
+      JavaSparkContext jsc = ((HoodieSparkEngineContext) context).getJavaSparkContext();
+
+      if (config.isExecutorMetricsEnabled()) {
+        // Create a distributed registry for HoodieWrapperFileSystem
+        registry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName(),
+            DistributedRegistry.class.getName());
+        ((DistributedRegistry)registry).register(jsc);
+        registryMeta = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder",
+            DistributedRegistry.class.getName());
+        ((DistributedRegistry)registryMeta).register(jsc);
+      } else {
+        registry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
+        registryMeta = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder");
+      }
+
+      HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
+    }
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
new file mode 100644
index 0000000..22b3afd
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.AccumulatorV2;
+
+/**
+ * Lightweight Metrics Registry to track Hudi events.
+ */
+public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<String, Long>>
+    implements Registry, Serializable {
+  private String name;
+  ConcurrentHashMap<String, Long> counters = new ConcurrentHashMap<>();
+
+  public DistributedRegistry(String name) {
+    this.name = name;
+  }
+
+  public void register(JavaSparkContext jsc) {
+    if (!isRegistered()) {
+      jsc.sc().register(this);
+    }
+  }
+
+  @Override
+  public void clear() {
+    counters.clear();
+  }
+
+  @Override
+  public void increment(String name) {
+    counters.merge(name, 1L, (oldValue, newValue) -> oldValue + newValue);
+  }
+
+  @Override
+  public void add(String name, long value) {
+    counters.merge(name, value, (oldValue, newValue) -> oldValue + newValue);
+  }
+
+  /**
+   * Get all Counter type metrics.
+   */
+  @Override
+  public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
+    HashMap<String, Long> countersMap = new HashMap<>();
+    counters.forEach((k, v) -> {
+      String key = prefixWithRegistryName ? name + "." + k : k;
+      countersMap.put(key, v);
+    });
+    return countersMap;
+  }
+
+  @Override
+  public void add(Map<String, Long> arg) {
+    arg.forEach((key, value) -> add(key, value));
+  }
+
+  @Override
+  public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
+    DistributedRegistry registry = new DistributedRegistry(name);
+    counters.forEach((key, value) -> registry.add(key, value));
+    return registry;
+  }
+
+  @Override
+  public boolean isZero() {
+    return counters.isEmpty();
+  }
+
+  @Override
+  public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) {
+    acc.value().forEach((key, value) -> add(key, value));
+  }
+
+  @Override
+  public void reset() {
+    counters.clear();
+  }
+
+  @Override
+  public Map<String, Long> value() {
+    return counters;
+  }
+}
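
A minimal driver-side sketch of wiring up the registry above; the metric names are illustrative,
while Registry.getRegistry() and register() are the same calls used by initWrapperFSMetrics() and
initRegistry() in this change:

    import java.util.Map;
    import org.apache.hudi.common.metrics.Registry;
    import org.apache.hudi.metrics.DistributedRegistry;
    import org.apache.spark.api.java.JavaSparkContext;

    class RegistryExample {
      static Map<String, Long> collect(JavaSparkContext jsc) {
        Registry registry = Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName());
        ((DistributedRegistry) registry).register(jsc);  // register the accumulator with the SparkContext
        registry.increment("files.listed");              // counter += 1
        registry.add("bytes.read", 1024L);               // counter += 1024
        return registry.getAllCounts(true);              // keys prefixed, e.g. "HoodieMetadata.files.listed"
      }
    }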
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java
new file mode 100644
index 0000000..950144b
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieMetadataMetrics;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
+
+  private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
+
+  public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
+    return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
+  }
+
+  SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    super(hadoopConf, writeConfig, engineContext);
+  }
+
+  @Override
+  protected void initRegistry() {
+    if (metadataWriteConfig.isMetricsOn()) {
+      Registry registry;
+      if (metadataWriteConfig.isExecutorMetricsEnabled()) {
+        registry = Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName());
+      } else {
+        registry = Registry.getRegistry("HoodieMetadata");
+      }
+      this.metrics = Option.of(new HoodieMetadataMetrics(registry));
+    } else {
+      this.metrics = Option.empty();
+    }
+  }
+
+  @Override
+  protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
+    try {
+      metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
+        if (registry instanceof DistributedRegistry) {
+          HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
+          ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext());
+        }
+      });
+
+      if (enabled) {
+        bootstrapIfNeeded(engineContext, datasetMetaClient);
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
+      enabled = false;
+    }
+  }
+
+  @Override
+  protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
+    ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
+    metadata.closeReaders();
+
+    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+
+    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
+      writeClient.startCommitWithTime(instantTime);
+      List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
+      statuses.forEach(writeStatus -> {
+        if (writeStatus.hasErrors()) {
+          throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
+        }
+      });
+      // Trigger compaction and cleaning, with suffixes based on the same instant time. This ensures that any future
+      // delta commits synced over will not have an instant time lower than the last completed instant on the
+      // metadata table.
+      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
+        writeClient.compact(instantTime + "001");
+      }
+      writeClient.clean(instantTime + "002");
+    }
+
+    // Update total size of the metadata and count of base/log files
+    metrics.ifPresent(m -> {
+      try {
+        Map<String, String> stats = m.getStats(false, metaClient, metadata);
+        m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
+            Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
+            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
+            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
+      } catch (HoodieIOException e) {
+        LOG.error("Could not publish metadata size metrics", e);
+      }
+    });
+  }
+
+  /**
+   * Tag each record with the location.
+   *
+   * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
+   * base file.
+   */
+  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
+    HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext);
+    TableFileSystemView.SliceView fsView = table.getSliceView();
+    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
+        .map(FileSlice::getBaseFile)
+        .filter(Option::isPresent)
+        .map(Option::get)
+        .collect(Collectors.toList());
+
+    // All the metadata fits within a single base file
+    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
+      if (baseFiles.size() > 1) {
+        throw new HoodieMetadataException("Multiple base files found in metadata partition");
+      }
+    }
+
+    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
+    String fileId;
+    String instantTime;
+    if (!baseFiles.isEmpty()) {
+      fileId = baseFiles.get(0).getFileId();
+      instantTime = baseFiles.get(0).getCommitTime();
+    } else {
+      // If there is a log file then we can assume that it has the data
+      List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
+          .map(FileSlice::getLatestLogFile)
+          .filter(Option::isPresent)
+          .map(Option::get)
+          .collect(Collectors.toList());
+      if (logFiles.isEmpty()) {
+        // No base and log files. All are new inserts
+        return jsc.parallelize(records, 1);
+      }
+
+      fileId = logFiles.get(0).getFileId();
+      instantTime = logFiles.get(0).getBaseCommitTime();
+    }
+
+    return jsc.parallelize(records, 1).map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId)));
+  }
+}
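
The instant-time suffixes used in commit() above keep the metadata table's compaction and clean
actions ordered strictly after the delta commit that triggered them. A worked example of the
convention (the instant value is illustrative):

    String instantTime = "20201230182955";              // delta commit written to the metadata table
    String compactionInstantTime = instantTime + "001"; // "20201230182955001": compaction scheduled and executed
    String cleanInstantTime = instantTime + "002";      // "20201230182955002": clean runs after compaction
    // Both suffixed instants sort after instantTime, so any delta commit synced later from the dataset
    // cannot have a timestamp lower than the last completed instant on the metadata table.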
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
new file mode 100644
index 0000000..e5b1b9f
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
@@ -0,0 +1,801 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieMetadataConfig;
+import org.apache.hudi.config.HoodieMetricsConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestHoodieFsMetadata extends HoodieClientTestHarness {
+  private static final Logger LOG = LogManager.getLogger(TestHoodieFsMetadata.class);
+
+  @TempDir
+  public java.nio.file.Path tempFolder;
+
+  private String metadataTableBasePath;
+
+  private HoodieTableType tableType;
+
+  public void init(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    initPath();
+    initSparkContexts("TestHoodieMetadata");
+    initFileSystem();
+    fs.mkdirs(new Path(basePath));
+    initMetaClient();
+    initTestDataGenerator();
+    metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
+
+  }
+
+  @AfterEach
+  public void clean() throws IOException {
+    cleanupResources();
+  }
+
+  /**
+   * Metadata Table should not be created unless it is enabled in config.
+   */
+  @Test
+  public void testDefaultNoMetadataTable() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Metadata table should not exist until created for the first time
+    assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
+    assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
+
+    // Metadata table is not created if disabled by config
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
+      client.startCommitWithTime("001");
+      assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
+      assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
+    }
+
+    // Metadata table created when enabled by config
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
+      client.startCommitWithTime("001");
+      assertTrue(fs.exists(new Path(metadataTableBasePath)));
+      validateMetadata(client);
+    }
+  }
+
+  /**
+   * Only valid partition directories are added to the metadata.
+   */
+  @Test
+  public void testOnlyValidPartitionsAdded() throws Exception {
+    // This test requires local file system
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Create an empty directory which is not a partition directory (lacks partition metadata)
+    final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition";
+    Files.createDirectories(Paths.get(basePath, nonPartitionDirectory));
+
+    // Create some commits
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    testTable.withPartitionMetaFiles("p1", "p2")
+        .addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
+        .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10)
+        .addInflightCommit("003").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      client.startCommitWithTime("005");
+
+      List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
+      assertFalse(partitions.contains(nonPartitionDirectory),
+          "Must not contain the non-partition " + nonPartitionDirectory);
+      assertTrue(partitions.contains("p1"), "Must contain partition p1");
+      assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+      FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
+      assertEquals(2, statuses.length);
+      statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
+      assertEquals(5, statuses.length);
+    }
+  }
+
+  /**
+   * Test that various table operations sync to the Metadata Table correctly.
+   */
+  //@ParameterizedTest
+  //@EnumSource(HoodieTableType.class)
+  //public void testTableOperations(HoodieTableType tableType) throws Exception {
+  @Test
+  public void testTableOperations() throws Exception {
+    //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+
+      // Write 1 (Bulk insert)
+      String newCommitTime = "001";
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Write 2 (inserts)
+      newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+      validateMetadata(client);
+
+      records = dataGen.generateInserts(newCommitTime, 20);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Write 3 (updates)
+      newCommitTime = "003";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Write 4 (updates and inserts)
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 10);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Compaction
+      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+        newCommitTime = "005";
+        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+        client.compact(newCommitTime);
+        validateMetadata(client);
+      }
+
+      // Write 5 (updates and inserts)
+      newCommitTime = "006";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 5);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Compaction
+      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+        newCommitTime = "007";
+        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+        client.compact(newCommitTime);
+        validateMetadata(client);
+      }
+
+      // Deletes
+      newCommitTime = "008";
+      records = dataGen.generateDeletes(newCommitTime, 10);
+      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
+      client.startCommitWithTime(newCommitTime);
+      client.delete(deleteKeys, newCommitTime);
+      validateMetadata(client);
+
+      // Clean
+      newCommitTime = "009";
+      client.clean(newCommitTime);
+      validateMetadata(client);
+
+      // Restore
+      client.restoreToInstant("006");
+      validateMetadata(client);
+    }
+  }
+
+  /**
+   * Test that rollbacks of various table operations sync to the Metadata Table correctly.
+   */
+  //@ParameterizedTest
+  //@EnumSource(HoodieTableType.class)
+  //public void testRollbackOperations(HoodieTableType tableType) throws Exception {
+  @Test
+  public void testRollbackOperations() throws Exception {
+    //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      // Write 1 (Bulk insert)
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Rollback of inserts
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateInserts(newCommitTime, 20);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+      client.rollback(newCommitTime);
+      client.syncTableMetadata();
+      validateMetadata(client);
+
+      // Rollback of updates
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+      client.rollback(newCommitTime);
+      client.syncTableMetadata();
+      validateMetadata(client);
+
+      // Rollback of updates and inserts
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 10);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+      client.rollback(newCommitTime);
+      client.syncTableMetadata();
+      validateMetadata(client);
+
+      // Rollback of Compaction
+      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+        client.compact(newCommitTime);
+        validateMetadata(client);
+      }
+
+      // Rollback of Deletes
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      records = dataGen.generateDeletes(newCommitTime, 10);
+      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
+      client.startCommitWithTime(newCommitTime);
+      writeStatuses = client.delete(deleteKeys, newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+      client.rollback(newCommitTime);
+      client.syncTableMetadata();
+      validateMetadata(client);
+
+      // Rollback of Clean
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.clean(newCommitTime);
+      validateMetadata(client);
+      client.rollback(newCommitTime);
+      client.syncTableMetadata();
+      validateMetadata(client);
+
+    }
+
+    // Rollback of partial commits
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) {
+      // Write updates and inserts
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.rollback(newCommitTime);
+      client.syncTableMetadata();
+      validateMetadata(client);
+    }
+
+    // Marker based rollback of partial commits
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) {
+      // Write updates and inserts
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.rollback(newCommitTime);
+      client.syncTableMetadata();
+      validateMetadata(client);
+    }
+
+  }
+
+  /**
+   * Test sync of table operations.
+   */
+  //@ParameterizedTest
+  //@EnumSource(HoodieTableType.class)
+  //public void testSync(HoodieTableType tableType) throws Exception {
+  @Test
+  public void testSync() throws Exception {
+    //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    String newCommitTime;
+    List<HoodieRecord> records;
+    List<WriteStatus> writeStatuses;
+
+    // Initial commits without metadata table enabled
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      records = dataGen.generateInserts(newCommitTime, 5);
+      client.startCommitWithTime(newCommitTime);
+      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      records = dataGen.generateInserts(newCommitTime, 5);
+      client.startCommitWithTime(newCommitTime);
+      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+    }
+
+    // Enable metadata table so it is initialized by listing from the file system
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      // inserts
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateInserts(newCommitTime, 5);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      validateMetadata(client);
+      assertTrue(metadata(client).isInSync());
+    }
+
+    // Various table operations without metadata table enabled
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
+      // updates
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUniqueUpdates(newCommitTime, 5);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      assertFalse(metadata(client).isInSync());
+
+      // updates and inserts
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 10);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      assertFalse(metadata(client).isInSync());
+
+      // Compaction
+      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+        client.compact(newCommitTime);
+        assertFalse(metadata(client).isInSync());
+      }
+
+      // Savepoint
+      String savepointInstant = newCommitTime;
+      if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
+        client.savepoint("hoodie", "metadata test");
+        assertFalse(metadata(client).isInSync());
+      }
+
+      // Deletes
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      records = dataGen.generateDeletes(newCommitTime, 5);
+      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
+      client.startCommitWithTime(newCommitTime);
+      client.delete(deleteKeys, newCommitTime);
+      assertFalse(metadata(client).isInSync());
+
+      // Clean
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.clean(newCommitTime);
+      assertFalse(metadata(client).isInSync());
+
+      // updates
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      assertFalse(metadata(client).isInSync());
+
+      client.restoreToInstant(savepointInstant);
+      assertFalse(metadata(client).isInSync());
+    }
+
+
+    // Enable metadata table and ensure it is synced
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+
+      validateMetadata(client);
+      assertTrue(metadata(client).isInSync());
+    }
+
+  }
+
+  /**
+   * Instants on Metadata Table should be archived as per config.
+   * Metadata Table should be automatically compacted as per config.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {false})
+  public void testCleaningArchivingAndCompaction(boolean asyncClean) throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    final int maxDeltaCommitsBeforeCompaction = 4;
+    HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+            .archiveCommitsWith(6, 8).retainCommits(1)
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3)
+            .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build())
+        .build();
+
+    List<HoodieRecord> records;
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
+      for (int i = 1; i < 10; ++i) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        if (i == 1) {
+          records = dataGen.generateInserts(newCommitTime, 5);
+        } else {
+          records = dataGen.generateUpdates(newCommitTime, 2);
+        }
+        client.startCommitWithTime(newCommitTime);
+        List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+        assertNoWriteErrors(writeStatuses);
+        validateMetadata(client);
+      }
+    }
+
+    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
+    HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline();
+    // check that there are 2 compactions.
+    assertEquals(2, metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants());
+    // check that cleaning has happened once after each compaction. There will be more instants on the timeline, since it is less aggressively archived
+    assertEquals(4, metadataTimeline.getCleanerTimeline().filterCompletedInstants().countInstants());
+    // ensure archiving has happened
+    List<HoodieInstant> instants = metadataTimeline.getCommitsAndCompactionTimeline()
+        .getInstants().collect(Collectors.toList());
+    Collections.reverse(instants);
+    long numDeltaCommits = instants.stream().filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)).count();
+    assertEquals(5, numDeltaCommits);
+  }
+
+  /**
+   * Test various error scenarios.
+   */
+  @Test
+  public void testErrorCases() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
+    // should be rolled back to last valid commit.
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateInserts(newCommitTime, 5);
+      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      // There is no way to simulate a failed commit on the main dataset, hence we simply delete the completed
+      // instant so that only the inflight one is left over.
+      String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
+      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
+          commitInstantFileName), false));
+    }
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
+      // Start the next commit which will rollback the previous one and also should update the metadata table by
+      // updating it with HoodieRollbackMetadata.
+      String newCommitTime = client.startCommit();
+
+      // Dangling commit but metadata should be valid at this time
+      validateMetadata(client);
+
+      // Next insert
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      // Post rollback commit and metadata should be valid
+      validateMetadata(client);
+    }
+  }
+
+  /**
+   * Test non-partitioned datasets.
+   */
+  @Test
+  public void testNonPartitioned() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""});
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      // Write 1 (Bulk insert)
+      String newCommitTime = "001";
+      List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(newCommitTime, 10);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      validateMetadata(client);
+
+      List<String> metadataPartitions = metadata(client).getAllPartitionPaths();
+      assertTrue(metadataPartitions.contains(""), "Must contain empty partition");
+    }
+  }
+
+  /**
+   * Test various metrics published by the metadata table.
+   */
+  @Test
+  public void testMetadataMetrics() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) {
+      // Write
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
+      assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
+      assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
+      assertEquals(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count"), 1L);
+      assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
+      assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
+      assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
+      assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count"));
+    }
+  }
+
+  /**
+   * Validate the metadata table's contents to ensure they match what is on the file system.
+   *
+   * @throws IOException if the file system listing fails
+   */
+  private void validateMetadata(SparkRDDWriteClient client) throws IOException {
+    HoodieWriteConfig config = client.getConfig();
+    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
+    assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
+    if (!config.useFileListingMetadata()) {
+      return;
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Validate write config for metadata table
+    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
+    assertFalse(metadataWriteConfig.useFileListingMetadata(), "Metadata table should not itself use a metadata table");
+    assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "Metadata verification should be disabled for the metadata table");
+
+    // Metadata table should be in sync with the dataset
+    assertTrue(metadata(client).isInSync());
+
+    // Partitions should match
+    List<String> fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath);
+    List<String> metadataPartitions = metadataWriter.metadata().getAllPartitionPaths();
+
+    Collections.sort(fsPartitions);
+    Collections.sort(metadataPartitions);
+
+    assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
+    assertTrue(fsPartitions.equals(metadataPartitions), "Partitions should match");
+
+    // Files within each partition should match
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieSparkTable.create(config, engineContext);
+    TableFileSystemView tableView = table.getHoodieView();
+    fsPartitions.forEach(partition -> {
+      try {
+        Path partitionPath;
+        if (partition.equals("")) {
+          // Should be the non-partitioned case
+          partitionPath = new Path(basePath);
+        } else {
+          partitionPath = new Path(basePath, partition);
+        }
+        FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
+        FileStatus[] metaStatuses = metadataWriter.metadata().getAllFilesInPartition(partitionPath);
+        List<String> fsFileNames = Arrays.stream(fsStatuses)
+            .map(s -> s.getPath().getName()).collect(Collectors.toList());
+        List<String> metadataFilenames = Arrays.stream(metaStatuses)
+            .map(s -> s.getPath().getName()).collect(Collectors.toList());
+        Collections.sort(fsFileNames);
+        Collections.sort(metadataFilenames);
+
+        // File sizes should be valid
+        Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
+
+        if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
+          LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
+          LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
+        }
+        assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
+        assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");
+
+        // FileSystemView should expose the same data
+        List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
+
+        fileGroups.forEach(g -> LogManager.getLogger(TestHoodieFsMetadata.class).info(g));
+        fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieFsMetadata.class).info(b)));
+        fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieFsMetadata.class).info(s)));
+
+        long numFiles = fileGroups.stream()
+            .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
+            .sum();
+        assertEquals(metadataFilenames.size(), numFiles);
+      } catch (IOException e) {
+        // Unexpected exception while listing; print it and fail the validation below
+        e.printStackTrace();
+        assertTrue(false, "Exception should not be raised: " + e);
+      }
+    });
+
+    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
+
+    // Metadata table should be in sync with the dataset
+    assertTrue(metadataWriter.metadata().isInSync());
+
+    // Metadata table is MOR
+    assertEquals(HoodieTableType.MERGE_ON_READ, metadataMetaClient.getTableType(), "Metadata Table should be MOR");
+
+    // Metadata table is HFile format
+    assertEquals(HoodieFileFormat.HFILE, metadataMetaClient.getTableConfig().getBaseFileFormat(),
+        "Metadata Table base file format should be HFile");
+
+    // Metadata table has a fixed number of partitions
+    // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters out all directories
+    // within the .hoodie folder.
+    List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(fs, HoodieTableMetadata.getMetadataTableBasePath(basePath),
+        false);
+    assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
+
+    // Metadata table should automatically compact and clean
+    // versions are +1 as auto-clean / compaction happens at the end of commits
+    int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
+    metadataTablePartitions.forEach(partition -> {
+      List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
+      assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).filter(Option::isPresent).count() <= 1, "Should have a single latest base file");
+      assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice");
+      assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to "
+          + numFileVersions + " but was " + latestSlices.size());
+    });
+
+    LOG.info("Validation time=" + timer.endTimer());
+  }
+
+  private HoodieBackedTableMetadataWriter metadataWriter(SparkRDDWriteClient client) {
+    return (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter
+        .create(hadoopConf, client.getConfig(), new HoodieSparkEngineContext(jsc));
+  }
+
+  private HoodieBackedTableMetadata metadata(SparkRDDWriteClient client) {
+    HoodieWriteConfig clientConfig = client.getConfig();
+    return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(),
+        clientConfig.useFileListingMetadata(), clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning());
+  }
+
+  // TODO: this can be moved to TestHarness after merge from master
+  private void assertNoWriteErrors(List<WriteStatus> statuses) {
+    // Verify there are no errors
+    for (WriteStatus status : statuses) {
+      assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId());
+    }
+  }
+
+  private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) {
+    return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build();
+  }
+
+  private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
+        .withAutoCommit(autoCommit).withAssumeDatePartitioning(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build())
+        .withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+        .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
+            .withExecutorMetrics(true).usePrefix("unit-test").build());
+  }
+
+  @Override
+  protected HoodieTableType getTableType() {
+    return tableType;
+  }
+}
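
For context, below is a minimal sketch (not part of this patch) of how a writer would opt in to the file-listing metadata table, mirroring the getWriteConfigBuilder(...) used in the test above; the class name, table name, and the basePath/avroSchema arguments are placeholders.

import org.apache.hudi.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class EnableMetadataTableExample {
  // Builds a write config with the internal file-listing metadata table enabled.
  // All arguments are placeholders supplied by the caller.
  public static HoodieWriteConfig buildConfig(String basePath, String avroSchema) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(avroSchema)
        .forTable("example-table")
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
        .build();
  }
}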
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 4fbc63d..f1e8e90 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -89,6 +89,7 @@
             <import>${basedir}/src/main/avro/HoodieClusteringStrategy.avsc</import>
             <import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
             <import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
+            <import>${basedir}/src/main/avro/HoodieMetadata.avsc</import>
           </imports>
         </configuration>
       </plugin>
diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc b/hudi-common/src/main/avro/HoodieMetadata.avsc
new file mode 100644
index 0000000..bf85587
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieMetadata.avsc
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+    "namespace": "org.apache.hudi.avro.model",
+    "type": "record",
+    "name": "HoodieMetadataRecord",
+    "doc": "A record saved within the Metadata Table",
+    "fields": [
+        {
+            "name": "key",
+            "type": "string"
+        },
+        {
+            "name": "type",
+            "doc": "Type of the metadata record",
+            "type": "int"
+        },
+        {   "name": "filesystemMetadata",
+            "doc": "Contains information about partitions and files within the dataset",
+            "type": ["null", {
+               "type": "map",
+               "values": {
+                    "type": "record",
+                    "name": "HoodieMetadataFileInfo",
+                    "fields": [
+                        {
+                            "name": "size",
+                            "type": "long",
+                            "doc": "Size of the file"
+                        },
+                        {
+                            "name": "isDeleted",
+                            "type": "boolean",
+                            "doc": "True if this file has been deleted"
+                        }
+                    ]
+                }
+            }]
+        }
+    ]
+}
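
To make the layout of these records concrete, here is a minimal sketch (not part of this patch) that builds one filesystemMetadata entry with Avro's generic API. It assumes the schema above is available as a local HoodieMetadata.avsc file; the key, type value and file name used are illustrative only.

import java.io.File;
import java.util.Collections;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class MetadataRecordExample {
  public static void main(String[] args) throws Exception {
    // Parse the schema shown above (the path is a placeholder).
    Schema recordSchema = new Schema.Parser().parse(new File("HoodieMetadata.avsc"));

    // filesystemMetadata is a union ["null", map<HoodieMetadataFileInfo>];
    // the map value schema is the second branch of that union.
    Schema fileInfoSchema = recordSchema.getField("filesystemMetadata")
        .schema().getTypes().get(1).getValueType();

    GenericRecord fileInfo = new GenericData.Record(fileInfoSchema);
    fileInfo.put("size", 1024L);
    fileInfo.put("isDeleted", false);

    GenericRecord record = new GenericData.Record(recordSchema);
    record.put("key", "2020/01/01");   // e.g. a partition path used as the record key
    record.put("type", 1);             // numeric id identifying the metadata record type
    record.put("filesystemMetadata", Collections.singletonMap("some-file.parquet", fileInfo));
    System.out.println(record);
  }
}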
diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
index a6bd4c2..069881e 100644
--- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
@@ -30,7 +30,15 @@
         "fields": [
             {"name": "partitionPath", "type": "string"},
             {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
-            {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
+            {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}},
+            {"name": "appendFiles", "type": {
+                "type": "map",
+                "doc": "Files to which append blocks were written",
+                "values": {
+                    "type": "long",
+                    "doc": "Size of this file in bytes"
+                }
+            }}
         ]
      }
      }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 0ce5573..94d05b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -46,6 +46,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import java.util.List;
 import java.util.Map.Entry;
@@ -193,8 +195,17 @@ public class FSUtils {
   /**
    * Obtain all the partition paths, that are present in this table, denoted by presence of
    * {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}.
+   *
+   * If the basePathStr is a subdirectory of the .hoodie folder, then we assume that the partitions of an internal
+   * table (a hoodie table within the .hoodie directory) are to be obtained.
+   *
+   * @param fs FileSystem instance
+   * @param basePathStr base directory
    */
   public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException {
+    // If the basePathStr is a folder within the .hoodie directory then we are listing partitions within an
+    // internal table.
+    final boolean isMetadataTable = basePathStr.contains(HoodieTableMetaClient.METAFOLDER_NAME);
     final Path basePath = new Path(basePathStr);
     final List<String> partitions = new ArrayList<>();
     processFiles(fs, basePathStr, (locatedFileStatus) -> {
@@ -203,7 +214,7 @@ public class FSUtils {
         partitions.add(getRelativePartitionPath(basePath, filePath.getParent()));
       }
       return true;
-    }, true);
+    }, !isMetadataTable);
     return partitions;
   }
 
@@ -385,6 +396,20 @@ public class FSUtils {
   }
 
   /**
+   * Get the file statuses of all the base and log files in the given partition path.
+   */
+  public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partitionPath) throws IOException {
+    final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values())
+        .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
+    final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension();
+
+    return Arrays.stream(fs.listStatus(partitionPath, path -> {
+      String extension = FSUtils.getFileExtension(path.getName());
+      return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);
+    })).filter(FileStatus::isFile).toArray(FileStatus[]::new);
+  }
+
+  /**
    * Get the latest log file written from the list of log files passed in.
    */
   public static Option<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) {
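
A small usage sketch for the new getAllDataFilesInPartition helper (not part of this patch); the Hadoop configuration and the partition path are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;

public class ListPartitionFilesExample {
  public static void main(String[] args) throws Exception {
    // Placeholder path to one partition of a Hudi table on the local file system.
    Path partitionPath = new Path("file:///tmp/hudi_table/2020/01/01");
    FileSystem fs = partitionPath.getFileSystem(new Configuration());

    // Returns only base files and log files, skipping partition metadata and other files.
    FileStatus[] dataFiles = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
    for (FileStatus status : dataFiles) {
      System.out.println(status.getPath().getName() + " : " + status.getLen() + " bytes");
    }
  }
}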
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index c3f6189..1faaad5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.common.fs;
 
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -65,15 +67,56 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
 
-  private enum MetricName {
-    create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles
+  protected enum MetricName {
+    create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
   }
 
+  private static Registry METRICS_REGISTRY_DATA;
+  private static Registry METRICS_REGISTRY_META;
+
+  public static void setMetricsRegistry(Registry registry, Registry registryMeta) {
+    METRICS_REGISTRY_DATA = registry;
+    METRICS_REGISTRY_META = registryMeta;
+  }
+
+
   private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new ConcurrentHashMap<>();
   private FileSystem fileSystem;
   private URI uri;
   private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
-  private Registry metricsRegistry = Registry.getRegistry(this.getClass().getSimpleName());
+
+  @FunctionalInterface
+  public interface CheckedFunction<R> {
+    R get() throws IOException;
+  }
+
+  private static Registry getMetricRegistryForPath(Path p) {
+    return ((p != null) && (p.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)))
+        ? METRICS_REGISTRY_META : METRICS_REGISTRY_DATA;
+  }
+
+  protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, CheckedFunction<R> func) throws IOException {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    R res = func.get();
+
+    Registry registry = getMetricRegistryForPath(p);
+    if (registry != null) {
+      registry.increment(metricName);
+      registry.add(metricName + ".totalDuration", timer.endTimer());
+    }
+
+    return res;
+  }
+
+  protected static <R> R executeFuncWithTimeAndByteMetrics(String metricName, Path p, long byteCount,
+                                                           CheckedFunction<R> func) throws IOException {
+    Registry registry = getMetricRegistryForPath(p);
+    if (registry != null) {
+      registry.add(metricName + ".totalBytes", byteCount);
+    }
+
+    return executeFuncWithTimeMetrics(metricName, p, func);
+  }
 
   public HoodieWrapperFileSystem() {}
 
@@ -140,16 +183,17 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    return fileSystem.open(convertToDefaultPath(f), bufferSize);
+    return wrapInputStream(f, fileSystem.open(convertToDefaultPath(f), bufferSize));
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
-      short replication, long blockSize, Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    final Path translatedPath = convertToDefaultPath(f);
-    return wrapOutputStream(f,
-        fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress));
+                                   short replication, long blockSize, Progressable progress) throws IOException {
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      final Path translatedPath = convertToDefaultPath(f);
+      return wrapOutputStream(f,
+          fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress));
+    });
   }
 
   private FSDataOutputStream wrapOutputStream(final Path path, FSDataOutputStream fsDataOutputStream)
@@ -164,79 +208,97 @@ public class HoodieWrapperFileSystem extends FileSystem {
     return os;
   }
 
+  private FSDataInputStream wrapInputStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException {
+    if (fsDataInputStream instanceof TimedFSDataInputStream) {
+      return fsDataInputStream;
+    }
+    return new TimedFSDataInputStream(path, fsDataInputStream);
+  }
+
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, short replication) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
       throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize,
-      Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress));
+                                   Progressable progress) throws IOException {
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
-      short replication, long blockSize, Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize, progress));
+                                   short replication, long blockSize, Progressable progress) throws IOException {
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
-      short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication,
-        blockSize, progress, checksumOpt));
+                                   short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException {
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), permission, flags, bufferSize, replication,
+          blockSize, progress, checksumOpt));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
       throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize));
+    });
   }
 
   @Override
@@ -246,50 +308,53 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
-    this.metricsRegistry.increment(MetricName.rename.name());
-    try {
-      consistencyGuard.waitTillFileAppears(convertToDefaultPath(src));
-    } catch (TimeoutException e) {
-      throw new HoodieException("Timed out waiting for " + src + " to appear", e);
-    }
-
-    boolean success = fileSystem.rename(convertToDefaultPath(src), convertToDefaultPath(dst));
-
-    if (success) {
+    return executeFuncWithTimeMetrics(MetricName.rename.name(), src, () -> {
       try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(src));
       } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + dst + " to appear", e);
+        throw new HoodieException("Timed out waiting for " + src + " to appear", e);
       }
 
-      try {
-        consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + src + " to disappear", e);
+      boolean success = fileSystem.rename(convertToDefaultPath(src), convertToDefaultPath(dst));
+
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + dst + " to appear", e);
+        }
+
+        try {
+          consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + src + " to disappear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
-    this.metricsRegistry.increment(MetricName.delete.name());
-    boolean success = fileSystem.delete(convertToDefaultPath(f), recursive);
-
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileDisappears(f);
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + f + " to disappear", e);
+    return executeFuncWithTimeMetrics(MetricName.delete.name(), f, () -> {
+      boolean success = fileSystem.delete(convertToDefaultPath(f), recursive);
+
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileDisappears(f);
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + f + " to disappear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertToDefaultPath(f));
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), f, () -> {
+      return fileSystem.listStatus(convertToDefaultPath(f));
+    });
   }
 
   @Override
@@ -304,27 +369,29 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    this.metricsRegistry.increment(MetricName.mkdirs.name());
-    boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission);
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for directory " + f + " to appear", e);
+    return executeFuncWithTimeMetrics(MetricName.mkdirs.name(), f, () -> {
+      boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission);
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for directory " + f + " to appear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.getFileStatus.name());
-    try {
-      consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-    } catch (TimeoutException e) {
-      // pass
-    }
-    return fileSystem.getFileStatus(convertToDefaultPath(f));
+    return executeFuncWithTimeMetrics(MetricName.getFileStatus.name(), f, () -> {
+      try {
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+      } catch (TimeoutException e) {
+        // pass
+      }
+      return fileSystem.getFileStatus(convertToDefaultPath(f));
+    });
   }
 
   @Override
@@ -389,12 +456,12 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f) throws IOException {
-    return fileSystem.open(convertToDefaultPath(f));
+    return wrapInputStream(f, fileSystem.open(convertToDefaultPath(f)));
   }
 
   @Override
   public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, short replication,
-      long blockSize, Progressable progress) throws IOException {
+                                               long blockSize, Progressable progress) throws IOException {
     Path p = convertToDefaultPath(f);
     return wrapOutputStream(p,
         fileSystem.createNonRecursive(p, overwrite, bufferSize, replication, blockSize, progress));
@@ -402,7 +469,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize,
-      short replication, long blockSize, Progressable progress) throws IOException {
+                                               short replication, long blockSize, Progressable progress) throws IOException {
     Path p = convertToDefaultPath(f);
     return wrapOutputStream(p,
         fileSystem.createNonRecursive(p, permission, overwrite, bufferSize, replication, blockSize, progress));
@@ -410,7 +477,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, EnumSet<CreateFlag> flags,
-      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+                                               int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
     Path p = convertToDefaultPath(f);
     return wrapOutputStream(p,
         fileSystem.createNonRecursive(p, permission, flags, bufferSize, replication, blockSize, progress));
@@ -462,8 +529,9 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean delete(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.delete.name());
-    return delete(f, true);
+    return executeFuncWithTimeMetrics(MetricName.delete.name(), f, () -> {
+      return delete(f, true);
+    });
   }
 
   @Override
@@ -508,32 +576,37 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertToDefaultPath(f), filter);
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), f, () -> {
+      return fileSystem.listStatus(convertToDefaultPath(f), filter);
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path[] files) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertDefaults(files));
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), files.length > 0 ? files[0] : null, () -> {
+      return fileSystem.listStatus(convertDefaults(files));
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertDefaults(files), filter);
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), files.length > 0 ? files[0] : null, () -> {
+      return fileSystem.listStatus(convertDefaults(files), filter);
+    });
   }
 
   @Override
   public FileStatus[] globStatus(Path pathPattern) throws IOException {
-    this.metricsRegistry.increment(MetricName.globStatus.name());
-    return fileSystem.globStatus(convertToDefaultPath(pathPattern));
+    return executeFuncWithTimeMetrics(MetricName.globStatus.name(), pathPattern, () -> {
+      return fileSystem.globStatus(convertToDefaultPath(pathPattern));
+    });
   }
 
   @Override
   public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
-    this.metricsRegistry.increment(MetricName.globStatus.name());
-    return fileSystem.globStatus(convertToDefaultPath(pathPattern), filter);
+    return executeFuncWithTimeMetrics(MetricName.globStatus.name(), pathPattern, () -> {
+      return fileSystem.globStatus(convertToDefaultPath(pathPattern), filter);
+    });
   }
 
   @Override
@@ -543,8 +616,9 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
-    this.metricsRegistry.increment(MetricName.listFiles.name());
-    return fileSystem.listFiles(convertToDefaultPath(f), recursive);
+    return executeFuncWithTimeMetrics(MetricName.listFiles.name(), f, () -> {
+      return fileSystem.listFiles(convertToDefaultPath(f), recursive);
+    });
   }
 
   @Override
@@ -554,16 +628,17 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean mkdirs(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.mkdirs.name());
-    boolean success = fileSystem.mkdirs(convertToDefaultPath(f));
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for directory " + f + " to appear", e);
+    return executeFuncWithTimeMetrics(MetricName.mkdirs.name(), f, () -> {
+      boolean success = fileSystem.mkdirs(convertToDefaultPath(f));
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for directory " + f + " to appear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
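The hunks above all follow one shape: the raw FileSystem call moves into a lambda that is handed to a timing helper, which bumps the per-call counter and records the elapsed time before returning the result. Below is a minimal standalone sketch of that pattern, offered only as illustration; the names here (TimedCallSketch, CheckedSupplier, the println standing in for a registry) are hypothetical, while the actual helper in this commit is executeFuncWithTimeMetrics on HoodieWrapperFileSystem, reporting into the metrics Registry reworked further down.

    import java.io.IOException;

    // Illustrative sketch of the wrap-and-time pattern; not the Hudi implementation.
    final class TimedCallSketch {

      @FunctionalInterface
      interface CheckedSupplier<R> {
        R get() throws IOException;
      }

      static <R> R executeWithTimeMetric(String metricName, CheckedSupplier<R> fn) throws IOException {
        long startNanos = System.nanoTime();
        try {
          return fn.get();
        } finally {
          long durationMs = (System.nanoTime() - startNanos) / 1_000_000;
          // The real helper increments a <metric>.count counter and adds the elapsed
          // time to a <metric>.totalDuration counter in a metrics Registry.
          System.out.println(metricName + ": +1 call, +" + durationMs + " ms");
        }
      }
    }

With such a helper, a call like getFileStatus reduces to executeWithTimeMetric("getFileStatus", () -> fileSystem.getFileStatus(p)), which is exactly the shape of the diffs above.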
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
index 0b70bed..6869be8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
@@ -43,7 +43,7 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
   private final ConsistencyGuard consistencyGuard;
 
   public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out, ConsistencyGuard consistencyGuard,
-      Runnable closeCallback) throws IOException {
+                                     Runnable closeCallback) throws IOException {
     super(out, null);
     this.path = path;
     this.closeCallback = closeCallback;
@@ -52,14 +52,22 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
 
   @Override
   public synchronized void write(byte[] b, int off, int len) throws IOException {
-    bytesWritten.addAndGet(len);
-    super.write(b, off, len);
+    HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.write.name(), path,
+        len, () -> {
+          bytesWritten.addAndGet(len);
+          super.write(b, off, len);
+          return null;
+        });
   }
 
   @Override
   public void write(byte[] b) throws IOException {
-    bytesWritten.addAndGet(b.length);
-    super.write(b);
+    HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.write.name(), path,
+        b.length, () -> {
+          bytesWritten.addAndGet(b.length);
+          super.write(b);
+          return null;
+        });
   }
 
   @Override
@@ -76,5 +84,4 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
   public long getBytesWritten() {
     return bytesWritten.get();
   }
-
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
new file mode 100644
index 0000000..eca8ec3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+/**
+ * Wrapper over <code>FSDataInputStream</code> that also times the operations.
+ */
+public class TimedFSDataInputStream extends FSDataInputStream {
+
+  // Path
+  private final Path path;
+
+  public TimedFSDataInputStream(Path path, FSDataInputStream in) {
+    super(in);
+    this.path = path;
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, 0, () -> super.read(buf));
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, length, () -> super.read(position, buffer, offset, length));
+  }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
+      throws IOException, UnsupportedOperationException {
+    return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, maxLength, () -> super.read(bufferPool, maxLength, opts));
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, buffer.length, () -> {
+          super.readFully(position, buffer);
+          return null;
+        });
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, length, () -> {
+          super.readFully(position, buffer, offset, length);
+          return null;
+        });
+  }
+}
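TimedFSDataInputStream is a thin decorator: each read variant is routed through executeFuncWithTimeAndByteMetrics so both call counts and byte counts accumulate under MetricName.read. A hedged usage sketch follows; the path and buffer size are illustrative placeholders, not values from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.fs.TimedFSDataInputStream;

    public class TimedReadSketch {
      public static void main(String[] args) throws Exception {
        // Any FSDataInputStream can be wrapped; the path here is made up.
        Path path = new Path("/tmp/hudi/sample.log");
        FileSystem fs = path.getFileSystem(new Configuration());
        try (FSDataInputStream in = new TimedFSDataInputStream(path, fs.open(path))) {
          byte[] buffer = new byte[4096];
          // Counted as one MetricName.read call plus the number of bytes requested.
          in.readFully(0, buffer, 0, buffer.length);
        }
      }
    }

HoodieLogFileReader (below) wraps its buffered input stream the same way, which is how log-block reads show up in the new metrics.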
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
similarity index 58%
copy from hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
copy to hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
index 169e8bc..4fdf9f6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
@@ -22,76 +22,36 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Registry that tracks metrics local to a single JVM process.
  */
-public class Registry {
+public class LocalRegistry implements Registry {
   ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
-
-  private static ConcurrentHashMap<String, Registry> registryMap = new ConcurrentHashMap<>();
+  private final String name;
 
-  private Registry(String name) {
+  public LocalRegistry(String name) {
     this.name = name;
   }
 
-  /**
-   * Get (or create) the registry for a provided name.
-   */
-  public static synchronized Registry getRegistry(String registryName) {
-    if (!registryMap.containsKey(registryName)) {
-      registryMap.put(registryName, new Registry(registryName));
-    }
-    return registryMap.get(registryName);
-  }
-
-  /**
-   * Get all registered metrics.
-   * @param flush clean all metrics as part of this operation.
-   * @param prefixWithRegistryName prefix each metric name with the registry name.
-   * @return
-   */
-  public static synchronized Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
-    HashMap<String, Long> allMetrics = new HashMap<>();
-    registryMap.forEach((registryName, registry) -> {
-      allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
-      if (flush) {
-        registry.clear();
-      }
-    });
-    return allMetrics;
-  }
-
+  @Override
   public void clear() {
     counters.clear();
   }
 
+  @Override
   public void increment(String name) {
     getCounter(name).increment();
   }
 
+  @Override
   public void add(String name, long value) {
     getCounter(name).add(value);
   }
 
-  private synchronized Counter getCounter(String name) {
-    if (!counters.containsKey(name)) {
-      counters.put(name, new Counter());
-    }
-    return counters.get(name);
-  }
-
-  /**
-   * Get all Counter type metrics.
-   */
-  public Map<String, Long> getAllCounts() {
-    return getAllCounts(false);
-  }
-
   /**
    * Get all Counter type metrics.
    */
+  @Override
   public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
     HashMap<String, Long> countersMap = new HashMap<>();
     counters.forEach((k, v) -> {
@@ -101,4 +61,10 @@ public class Registry {
     return countersMap;
   }
 
-}
\ No newline at end of file
+  private synchronized Counter getCounter(String name) {
+    if (!counters.containsKey(name)) {
+      counters.put(name, new Counter());
+    }
+    return counters.get(name);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
index 169e8bc..4ac1e61 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
@@ -18,87 +18,99 @@
 
 package org.apache.hudi.common.metrics;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hudi.common.util.ReflectionUtils;
+
 
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Interface which defines a lightweight Metrics Registry to track Hudi events.
  */
-public class Registry {
-  ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
+public interface Registry extends Serializable {
 
-  private static ConcurrentHashMap<String, Registry> registryMap = new ConcurrentHashMap<>();
+  ConcurrentHashMap<String, Registry> REGISTRY_MAP = new ConcurrentHashMap<>();
 
-  private Registry(String name) {
-    this.name = name;
+  /**
+   * Get (or create) the registry for a provided name.
+   *
+   * This function creates a {@code LocalRegistry}.
+   *
+   * @param registryName Name of the registry
+   */
+  static Registry getRegistry(String registryName) {
+    return getRegistry(registryName, LocalRegistry.class.getName());
   }
 
   /**
-   * Get (or create) the registry for a provided name.
+   * Get (or create) the registry for a provided name and given class.
+   *
+   * @param registryName Name of the registry.
+   * @param clazz The fully qualified name of the registry class to create.
    */
-  public static synchronized Registry getRegistry(String registryName) {
-    if (!registryMap.containsKey(registryName)) {
-      registryMap.put(registryName, new Registry(registryName));
+  static Registry getRegistry(String registryName, String clazz) {
+    synchronized (Registry.class) {
+      if (!REGISTRY_MAP.containsKey(registryName)) {
+        Registry registry = (Registry)ReflectionUtils.loadClass(clazz, registryName);
+        REGISTRY_MAP.put(registryName, registry);
+      }
+      return REGISTRY_MAP.get(registryName);
     }
-    return registryMap.get(registryName);
   }
 
   /**
    * Get all registered metrics.
-   * @param flush clean all metrics as part of this operation.
+   *
+   * @param flush clear all metrics after this operation.
    * @param prefixWithRegistryName prefix each metric name with the registry name.
    * @return
    */
-  public static synchronized Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
-    HashMap<String, Long> allMetrics = new HashMap<>();
-    registryMap.forEach((registryName, registry) -> {
-      allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
-      if (flush) {
-        registry.clear();
-      }
-    });
-    return allMetrics;
-  }
-
-  public void clear() {
-    counters.clear();
+  static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
+    synchronized (Registry.class) {
+      HashMap<String, Long> allMetrics = new HashMap<>();
+      REGISTRY_MAP.forEach((registryName, registry) -> {
+        allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
+        if (flush) {
+          registry.clear();
+        }
+      });
+      return allMetrics;
+    }
   }
 
-  public void increment(String name) {
-    getCounter(name).increment();
-  }
+  /**
+   * Clear all metrics.
+   */
+  void clear();
 
-  public void add(String name, long value) {
-    getCounter(name).add(value);
-  }
+  /**
+   * Increment the metric.
+   *
+   * @param name Name of the metric to increment.
+   */
+  void increment(String name);
 
-  private synchronized Counter getCounter(String name) {
-    if (!counters.containsKey(name)) {
-      counters.put(name, new Counter());
-    }
-    return counters.get(name);
-  }
+  /**
+   * Add value to the metric.
+   *
+   * @param name Name of the metric.
+   * @param value The value to add to the metrics.
+   */
+  void add(String name, long value);
 
   /**
    * Get all Counter type metrics.
    */
-  public Map<String, Long> getAllCounts() {
+  default Map<String, Long> getAllCounts() {
     return getAllCounts(false);
   }
 
   /**
    * Get all Counter type metrics.
+   *
+   * @param prefixWithRegistryName If true, the names of all metrics are prefixed with name of this registry.
    */
-  public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
-    HashMap<String, Long> countersMap = new HashMap<>();
-    counters.forEach((k, v) -> {
-      String key = prefixWithRegistryName ? name + "." + k : k;
-      countersMap.put(key, v.getValue());
-    });
-    return countersMap;
-  }
-
-}
\ No newline at end of file
+  Map<String, Long> getAllCounts(boolean prefixWithRegistryName);
+}
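With Registry now an interface, the static entry points stay put but callers can ask for a specific implementation by fully qualified class name; that is what lets executors report through a distributed registry while LocalRegistry keeps serving single-JVM callers. A short usage sketch, with metric names chosen only for illustration:

    import java.util.Map;

    import org.apache.hudi.common.metrics.LocalRegistry;
    import org.apache.hudi.common.metrics.Registry;

    public class RegistrySketch {
      public static void main(String[] args) {
        // Default factory: a LocalRegistry holding per-JVM counters.
        Registry registry = Registry.getRegistry("HoodieWrapperFileSystem");
        registry.increment("getFileStatus.count");
        registry.add("getFileStatus.totalDuration", 12L);

        // An explicit implementation can be requested by class name; here we just
        // ask for LocalRegistry again, but a Spark-side registry would be wired the same way.
        Registry metadataRegistry = Registry.getRegistry("HoodieMetadata", LocalRegistry.class.getName());
        metadataRegistry.increment("scan.count");

        // Collect (and flush) everything registered in this JVM, prefixed with the registry name.
        Map<String, Long> all = Registry.getAllMetrics(true, true);
        all.forEach((name, value) -> System.out.println(name + " => " + value));
      }
    }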
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 4ae709e..6fb0a05 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -80,7 +80,7 @@ public abstract class AbstractHoodieLogRecordScanner {
   // Merge strategy to use when combining records from log
   private final String payloadClassFQN;
   // Log File Paths
-  private final List<String> logFilePaths;
+  protected final List<String> logFilePaths;
   // Read Lazily flag
   private final boolean readBlocksLazily;
   // Reverse reader - Not implemented yet (NA -> Why do we need ?)
@@ -148,7 +148,8 @@ public abstract class AbstractHoodieLogRecordScanner {
         switch (r.getBlockType()) {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath());
+            LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+                + r.getLogBlockHeader().get(INSTANT_TIME));
             if (isNewInstantBlock(r) && !readBlocksLazily) {
               // If this is an avro data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
@@ -202,8 +203,7 @@ public abstract class AbstractHoodieLogRecordScanner {
                     LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
-                  } else if (lastBlock.getBlockType() != CORRUPT_BLOCK
-                      && targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
+                  } else if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
                     // rollback last data block or delete block
                     LOG.info("Rolling back the last log block read in " + logFile.getPath());
                     currentInstantLogBlocks.pop();
@@ -278,12 +278,14 @@ public abstract class AbstractHoodieLogRecordScanner {
     List<IndexedRecord> recs = dataBlock.getRecords();
     totalLogRecords.addAndGet(recs.size());
     for (IndexedRecord rec : recs) {
-      HoodieRecord<? extends HoodieRecordPayload> hoodieRecord =
-          SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
-      processNextRecord(hoodieRecord);
+      processNextRecord(createHoodieRecord(rec));
     }
   }
 
+  protected HoodieRecord<?> createHoodieRecord(IndexedRecord rec) {
+    return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
+  }
+
   /**
    * Process next record.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 14d523a..5bd43ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.TimedFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -73,8 +74,8 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
       boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
     if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize));
+      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
       // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
       // need to wrap in another BufferedFSInputStream the make bufferSize work?
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 18f2167..e99859e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -57,39 +57,56 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
   private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
 
   // Final map of compacted/merged records
-  private final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
+  protected final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
 
   // count of merged records in log
   private long numMergedRecordsInLog;
+  private long maxMemorySizeInBytes;
 
   // Stores the total time taken to perform reading and merging of log blocks
-  private final long totalTimeTakenToReadAndMergeBlocks;
+  private long totalTimeTakenToReadAndMergeBlocks;
   // A timer for calculating elapsed time in millis
   public final HoodieTimer timer = new HoodieTimer();
 
   @SuppressWarnings("unchecked")
   public HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
-      String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, boolean reverseReader,
-      int bufferSize, String spillableMapBasePath) {
+                                      String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
+                                      boolean reverseReader, int bufferSize, String spillableMapBasePath) {
+    this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily,
+        reverseReader, bufferSize, spillableMapBasePath, true);
+  }
+
+  @SuppressWarnings("unchecked")
+  public HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
+                                      String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
+                                      boolean reverseReader, int bufferSize, String spillableMapBasePath, boolean autoScan) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize);
     try {
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
           new HoodieRecordSizeEstimator(readerSchema));
-      // Do the scan and merge
-      timer.startTimer();
-      scan();
-      this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
-      this.numMergedRecordsInLog = records.size();
-      LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
-      LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
-      LOG.info(
-          "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
-      LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
-      LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
     } catch (IOException e) {
-      throw new HoodieIOException("IOException when reading log file ", e);
+      throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
     }
+
+    if (autoScan) {
+      performScan();
+    }
+  }
+
+  protected void performScan() {
+    // Do the scan and merge
+    timer.startTimer();
+    scan();
+    this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
+    this.numMergedRecordsInLog = records.size();
+    LOG.info("Number of log files scanned => " + logFilePaths.size());
+    LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
+    LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
+    LOG.info(
+        "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
+    LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
+    LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 61d9b7f..6d2682a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieHFileReader;
 import org.apache.log4j.LogManager;
@@ -118,6 +119,8 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
         recordKey = record.get(keyField.pos()).toString();
       }
       byte[] recordBytes = HoodieAvroUtils.indexedRecordToBytes(record);
+      ValidationUtils.checkState(!sortedRecordsMap.containsKey(recordKey),
+          "Writing multiple records with same key not supported for " + this.getClass().getName());
       sortedRecordsMap.put(recordKey, recordBytes);
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 918d568..fcb4fd9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -62,12 +62,16 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");
 
   public static final Set<String> VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(
-      COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
-      INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION,
-      INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
-      INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
-      REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
-
+      COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION,
+      DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION,
+      SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
+      CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
+      INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
+      INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
+      ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
+      REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION
+  ));
+  
   private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
   protected HoodieTableMetaClient metaClient;
   private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 640d489..32e60c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -27,6 +27,8 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.fs.FileStatus;
+
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -68,8 +70,10 @@ public class TimelineMetadataUtils {
     Map<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder = new HashMap<>();
     int totalDeleted = 0;
     for (HoodieRollbackStat stat : rollbackStats) {
+      Map<String, Long> appendFiles = stat.getCommandBlocksCount().keySet().stream()
+          .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
       HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(),
-          stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles());
+          stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), appendFiles);
       partitionMetadataBuilder.put(stat.getPartitionPath(), metadata);
       totalDeleted += stat.getSuccessDeleteFiles().size();
     }
@@ -146,6 +150,10 @@ public class TimelineMetadataUtils {
     return deserializeAvroMetadata(bytes, HoodieRollbackMetadata.class);
   }
 
+  public static HoodieRestoreMetadata deserializeHoodieRestoreMetadata(byte[] bytes) throws IOException {
+    return deserializeAvroMetadata(bytes, HoodieRestoreMetadata.class);
+  }
+
   public static HoodieSavepointMetadata deserializeHoodieSavepointMetadata(byte[] bytes) throws IOException {
     return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 1dd6b00..65e9231 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -276,7 +276,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
           Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr);
           FSUtils.createPathIfNotExists(metaClient.getFs(), partitionPath);
           long beginLsTs = System.currentTimeMillis();
-          FileStatus[] statuses = metaClient.getFs().listStatus(partitionPath);
+          FileStatus[] statuses = listPartition(partitionPath);
           long endLsTs = System.currentTimeMillis();
           LOG.info("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken ="
               + (endLsTs - beginLsTs));
@@ -298,6 +298,16 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
   }
 
   /**
+   * Return all the files from the partition.
+   *
+   * @param partitionPath The absolute path of the partition
+   * @throws IOException
+   */
+  protected FileStatus[] listPartition(Path partitionPath) throws IOException {
+    return metaClient.getFs().listStatus(partitionPath);
+  }
+
+  /**
    * Helper to convert file-status to base-files.
    *
    * @param statuses List of File-Status
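The new listPartition() hook is the seam the metadata table plugs into: a view subclass can override it so a partition's FileStatus[] comes from HoodieTableMetadata rather than a direct listStatus call. The sketch below is hypothetical wiring (the class name and constructor are not from this commit) and only illustrates the override point.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
    import org.apache.hudi.metadata.HoodieTableMetadata;

    // Hypothetical subclass: partition listings are answered by the metadata table.
    class MetadataBackedFileSystemView extends HoodieTableFileSystemView {
      private final HoodieTableMetadata tableMetadata;

      MetadataBackedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline,
                                   HoodieTableMetadata tableMetadata) {
        super(metaClient, timeline);
        this.tableMetadata = tableMetadata;
      }

      @Override
      protected FileStatus[] listPartition(Path partitionPath) throws IOException {
        // HoodieBackedTableMetadata (added below) falls back to a direct listing
        // when the metadata table is disabled or unreadable.
        return tableMetadata.getAllFilesInPartition(partitionPath);
      }
    }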
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetadataException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetadataException.java
new file mode 100644
index 0000000..132a9f8
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieMetadataException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * <p>
+ * Exception thrown for table metadata related failures.
+ * </p>
+ */
+public class HoodieMetadataException extends HoodieException {
+  public HoodieMetadataException(String msg, Exception t) {
+    super(msg, t);
+  }
+
+  public HoodieMetadataException(String msg) {
+    super(msg);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/TableNotFoundException.java b/hudi-common/src/main/java/org/apache/hudi/exception/TableNotFoundException.java
index 7666e90..2febe51 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/TableNotFoundException.java
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/TableNotFoundException.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.exception;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 /**
@@ -39,16 +41,14 @@ public class TableNotFoundException extends HoodieException {
   }
 
   public static void checkTableValidity(FileSystem fs, Path basePathDir, Path metaPathDir) {
-    // Check if the base path is found
+    // Check if the base and meta paths are found
     try {
-      if (!fs.exists(basePathDir) || !fs.isDirectory(basePathDir)) {
-        throw new TableNotFoundException(basePathDir.toString());
-      }
-      // Check if the meta path is found
-      if (!fs.exists(metaPathDir) || !fs.isDirectory(metaPathDir)) {
+      // Since metaPath is within the basePath, it is enough to check the metaPath exists
+      FileStatus status = fs.getFileStatus(metaPathDir);
+      if (!status.isDirectory()) {
         throw new TableNotFoundException(metaPathDir.toString());
       }
-    } catch (IllegalArgumentException e) {
+    } catch (FileNotFoundException | IllegalArgumentException e) {
       // if the base path is file:///, then we have a IllegalArgumentException
       throw new TableNotFoundException(metaPathDir.toString());
     } catch (IOException e) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
new file mode 100644
index 0000000..cdff41c
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SpillableMapUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Table metadata provided by an internal DFS backed Hudi metadata table.
+ *
+ * If the metadata table does not exist, RPC calls are used to retrieve file listings from the file system.
+ * No updates are applied to the table and it is not synced.
+ */
+public class HoodieBackedTableMetadata implements HoodieTableMetadata {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
+  private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  private static final int BUFFER_SIZE = 10 * 1024 * 1024;
+
+  private final SerializableConfiguration hadoopConf;
+  private final String datasetBasePath;
+  private final String metadataBasePath;
+  private final Option<HoodieMetadataMetrics> metrics;
+  private HoodieTableMetaClient metaClient;
+
+  private boolean enabled;
+  private final boolean validateLookups;
+  private final boolean assumeDatePartitioning;
+  // Directory used for Spillable Map when merging records
+  private final String spillableMapDirectory;
+
+  // Readers for the base and log file which store the metadata
+  private transient HoodieFileReader<GenericRecord> basefileReader;
+  private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
+
+  public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
+                                   boolean enabled, boolean validateLookups, boolean assumeDatePartitioning) {
+    this(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, false, assumeDatePartitioning);
+  }
+
+  public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
+                                   boolean enabled, boolean validateLookups, boolean enableMetrics,
+                                   boolean assumeDatePartitioning) {
+    this.hadoopConf = new SerializableConfiguration(conf);
+    this.datasetBasePath = datasetBasePath;
+    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
+    this.validateLookups = validateLookups;
+    this.spillableMapDirectory = spillableMapDirectory;
+    this.enabled = enabled;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+
+    if (enabled) {
+      try {
+        this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
+      } catch (TableNotFoundException e) {
+        LOG.error("Metadata table was not found at path " + metadataBasePath);
+        this.enabled = false;
+      } catch (Exception e) {
+        LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
+        this.enabled = false;
+      }
+    } else {
+      LOG.info("Metadata table is disabled.");
+    }
+
+    if (enableMetrics) {
+      this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
+    } else {
+      this.metrics = Option.empty();
+    }
+  }
+
+  /**
+   * Return the list of partitions in the dataset.
+   *
+   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of
+   * partitions is retrieved directly from the underlying {@code FileSystem}.
+   *
+ * On any error retrieving the listing from the metadata, this falls back to direct file system listings.
+   *
+   */
+  @Override
+  public List<String> getAllPartitionPaths()
+      throws IOException {
+    if (enabled) {
+      try {
+        return fetchAllPartitionPaths();
+      } catch (Exception e) {
+        LOG.error("Failed to retrieve list of partition from metadata", e);
+      }
+    }
+
+    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
+    return FSUtils.getAllPartitionPaths(fs, datasetBasePath, assumeDatePartitioning);
+  }
+
+  /**
+   * Return the list of files in a partition.
+   *
+ * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the file
+ * listing is retrieved directly from the underlying {@code FileSystem}.
+   *
+ * On any error retrieving the listing from the metadata, this falls back to direct file system listings.
+   *
+   * @param partitionPath The absolute path of the partition to list
+   */
+  @Override
+  public FileStatus[] getAllFilesInPartition(Path partitionPath)
+      throws IOException {
+    if (enabled) {
+      try {
+        return fetchAllFilesInPartition(partitionPath);
+      } catch (Exception e) {
+        LOG.error("Failed to retrive files in partition " + partitionPath + " from metadata", e);
+      }
+    }
+
+    return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath);
+  }
+
+  /**
+   * Returns a list of all partitions.
+   */
+  protected List<String> fetchAllPartitionPaths() throws IOException {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
+
+    List<String> partitions = Collections.emptyList();
+    if (hoodieRecord.isPresent()) {
+      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+        throw new HoodieMetadataException("Metadata partition list record is inconsistent: "
+            + hoodieRecord.get().getData());
+      }
+
+      partitions = hoodieRecord.get().getData().getFilenames();
+      // Partition-less tables have a single empty partition
+      if (partitions.contains(NON_PARTITIONED_NAME)) {
+        partitions.remove(NON_PARTITIONED_NAME);
+        partitions.add("");
+      }
+    }
+
+    if (validateLookups) {
+      // Validate the Metadata Table data by listing the partitions from the file system
+      timer.startTimer();
+      List<String> actualPartitions  = FSUtils.getAllPartitionPaths(metaClient.getFs(), datasetBasePath, false);
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
+
+      Collections.sort(actualPartitions);
+      Collections.sort(partitions);
+      if (!actualPartitions.equals(partitions)) {
+        LOG.error("Validation of metadata partition list failed. Lists do not match.");
+        LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray()));
+        LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray()));
+
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
+      }
+
+      // Return the direct listing as it should be correct
+      partitions = actualPartitions;
+    }
+
+    LOG.info("Listed partitions from metadata: #partitions=" + partitions.size());
+    return partitions;
+  }
+
+  /**
+   * Return all the files from the partition.
+   *
+   * @param partitionPath The absolute path of the partition
+   */
+  FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
+    String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath);
+    if (partitionName.isEmpty()) {
+      partitionName = NON_PARTITIONED_NAME;
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName);
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
+
+    FileStatus[] statuses = {};
+    if (hoodieRecord.isPresent()) {
+      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+        throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: "
+            + hoodieRecord.get().getData());
+      }
+      statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath);
+    }
+
+    if (validateLookups) {
+      // Validate the Metadata Table data by listing the files from the file system
+      timer.startTimer();
+
+      // Ignore partition metadata file
+      FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
+          p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
+
+      List<String> directFilenames = Arrays.stream(directStatuses)
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
+
+      List<String> metadataFilenames = Arrays.stream(statuses)
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
+
+      if (!metadataFilenames.equals(directFilenames)) {
+        LOG.error("Validation of metadata file listing for partition " + partitionName + " failed.");
+        LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray()));
+        LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray()));
+
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
+      }
+
+      // Return the direct listing as it should be correct
+      statuses = directStatuses;
+    }
+
+    LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
+    return statuses;
+  }
+
+  /**
+   * Retrieve the merged {@code HoodieRecord} mapped to the given key.
+   *
+   * @param key The key of the record
+   */
+  private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException {
+    openBaseAndLogFiles();
+
+    // Retrieve record from base file
+    HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
+    if (basefileReader != null) {
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
+      if (baseRecord.isPresent()) {
+        hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+            metaClient.getTableConfig().getPayloadClass());
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+      }
+    }
+
+    // Retrieve record from log file
+    Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
+    if (logHoodieRecord.isPresent()) {
+      if (hoodieRecord != null) {
+        // Merge the payloads
+        HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
+        hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
+      } else {
+        hoodieRecord = logHoodieRecord.get();
+      }
+    }
+
+    return Option.ofNullable(hoodieRecord);
+  }
+
+  /**
+   * Open readers to the base and log files.
+   */
+  private synchronized void openBaseAndLogFiles() throws IOException {
+    if (logRecordScanner != null) {
+      // Already opened
+      return;
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+
+    // Metadata is in sync till the latest completed instant on the dataset
+    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+    String latestInstantTime = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
+        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+    // Find the latest file slice
+    HoodieTimeline timeline = metaClient.reloadActiveTimeline();
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+    List<FileSlice> latestSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
+    ValidationUtils.checkArgument(latestSlices.size() == 1);
+
+    // If the base file is present then create a reader
+    Option<HoodieBaseFile> basefile = latestSlices.get(0).getBaseFile();
+    if (basefile.isPresent()) {
+      String basefilePath = basefile.get().getPath();
+      basefileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+      LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime());
+    }
+
+    // Open the log record scanner using the log files from the latest file slice
+    List<String> logFilePaths = latestSlices.get(0).getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+        .map(o -> o.getPath().toString())
+        .collect(Collectors.toList());
+
+    Option<HoodieInstant> lastInstant = timeline.filterCompletedInstants().lastInstant();
+    String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+    // Load the schema
+    Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+
+    logRecordScanner =
+        new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath,
+            logFilePaths, schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE,
+            spillableMapDirectory, null);
+
+    LOG.info("Opened metadata log files from " + logFilePaths + " at instant " + latestInstantTime
+        + "(dataset instant=" + latestInstantTime + ", metadata instant=" + latestMetaInstantTimestamp + ")");
+
+    metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
+  }
+
+  public void closeReaders() {
+    if (basefileReader != null) {
+      basefileReader.close();
+      basefileReader = null;
+    }
+    logRecordScanner = null;
+  }
+
+  /**
+   * Return {@code True} if all Instants from the dataset have been synced with the Metadata Table.
+   */
+  @Override
+  public boolean isInSync() {
+    return enabled && findInstantsToSync().isEmpty();
+  }
+
+  private List<HoodieInstant> findInstantsToSync() {
+    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+    return findInstantsToSync(datasetMetaClient);
+  }
+
+  /**
+   * Return an ordered list of instants which have not been synced to the Metadata Table.
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  protected List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient) {
+    HoodieActiveTimeline metaTimeline = metaClient.reloadActiveTimeline();
+
+    // All instants on the data timeline, which are greater than the last instant on metadata timeline
+    // are candidates for sync.
+    Option<HoodieInstant> latestMetadataInstant = metaTimeline.filterCompletedInstants().lastInstant();
+    ValidationUtils.checkArgument(latestMetadataInstant.isPresent(),
+        "At least one completed instant should exist on the metadata table, before syncing.");
+    String latestMetadataInstantTime = latestMetadataInstant.get().getTimestamp();
+    HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE);
+    Option<HoodieInstant> earliestIncompleteInstant = candidateTimeline.filterInflightsAndRequested().firstInstant();
+
+    if (earliestIncompleteInstant.isPresent()) {
+      return candidateTimeline.filterCompletedInstants()
+          .findInstantsBefore(earliestIncompleteInstant.get().getTimestamp())
+          .getInstants().collect(Collectors.toList());
+    } else {
+      return candidateTimeline.filterCompletedInstants()
+          .getInstants().collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Return the timestamp of the latest instant synced to the metadata table (its last completed deltacommit).
+   */
+  @Override
+  public Option<String> getSyncedInstantTime() {
+    if (!enabled) {
+      return Option.empty();
+    }
+
+    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
+    return timeline.getDeltaCommitTimeline().filterCompletedInstants()
+        .lastInstant().map(HoodieInstant::getTimestamp);
+  }
+
+  public boolean enabled() {
+    return enabled;
+  }
+
+  public SerializableConfiguration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public String getDatasetBasePath() {
+    return datasetBasePath;
+  }
+
+  public HoodieTableMetaClient getMetaClient() {
+    return metaClient;
+  }
+
+  public Map<String, String> stats() {
+    return metrics.map(m -> m.getStats(true, metaClient, this)).orElse(new HashMap<>());
+  }
+}
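Putting the reader side together, here is a hedged usage sketch of HoodieBackedTableMetadata; the base path and spillable-map directory are placeholders, and enabled=true with validateLookups=false means listings come straight from the metadata table, with automatic fallback to direct file system listings on any error.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.metadata.HoodieBackedTableMetadata;

    public class MetadataLookupSketch {
      public static void main(String[] args) throws Exception {
        HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
            new Configuration(), "/data/hudi/trips", "/tmp/spillable",
            true /* enabled */, false /* validateLookups */, false /* assumeDatePartitioning */);

        List<String> partitions = metadata.getAllPartitionPaths();
        FileStatus[] files = metadata.getAllFilesInPartition(new Path("/data/hudi/trips/2020/12/30"));
        System.out.println(partitions.size() + " partitions, " + files.length + " files in 2020/12/30");

        // Release the base file reader and log scanner once lookups are done.
        metadata.closeReaders();
      }
    }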
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
new file mode 100644
index 0000000..ae471dc
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * A {@code HoodieMergedLogRecordScanner} implementation which only merges records matching the provided keys. This is
+ * useful in limiting memory usage when only a small subset of update records is to be read.
+ */
+public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordScanner {
+  // Set of all record keys that are to be read in memory
+  private Set<String> mergeKeyFilter;
+
+  public HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
+                                              Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize,
+                                              String spillableMapBasePath, Set<String> mergeKeyFilter) {
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
+        spillableMapBasePath, false);
+    this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : Collections.emptySet();
+
+    performScan();
+  }
+
+  @Override
+  protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
+    if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(hoodieRecord.getRecordKey())) {
+      super.processNextRecord(hoodieRecord);
+    }
+  }
+
+  @Override
+  protected void processNextDeletedKey(HoodieKey hoodieKey) {
+    if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(hoodieKey.getRecordKey())) {
+      super.processNextDeletedKey(hoodieKey);
+    }
+  }
+
+  /**
+   * Retrieve a record given its key.
+   *
+   * @param key Key of the record to retrieve
+   * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
+   */
+  public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key) {
+    return Option.ofNullable((HoodieRecord) records.get(key));
+  }
+}
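
For illustration only, a hedged sketch of driving this scanner to merge a single key out of the metadata log files; the schema file, log file path, memory/buffer sizes and spillable directory are assumptions made up for the example:

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordScanner;
import org.apache.hudi.metadata.HoodieMetadataPayload;

public class ScanSinglePartitionKey {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    String metadataBasePath = "/tmp/hoodie/test_table/.hoodie/metadata";   // hypothetical
    // Schema of HoodieMetadataRecord (HoodieMetadata.avsc); loaded however the caller prefers.
    Schema schema = new Schema.Parser().parse(new File("/tmp/HoodieMetadata.avsc"));

    // Only the "2020/01/01" key is merged into memory; all other log records are skipped.
    HoodieMetadataMergedLogRecordScanner scanner = new HoodieMetadataMergedLogRecordScanner(
        fs, metadataBasePath,
        Arrays.asList(metadataBasePath + "/files/.files-0000_001.log.1"),  // hypothetical log file
        schema, "001", 1024 * 1024L, 16 * 1024, "/tmp/spillable",
        Collections.singleton("2020/01/01"));

    Option<HoodieRecord<HoodieMetadataPayload>> rec = scanner.getRecordByKey("2020/01/01");
    if (rec.isPresent()) {
      System.out.println(rec.get().getData().getFilenames());
    }
  }
}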
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
new file mode 100644
index 0000000..2bd773b
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieMetadataMetrics implements Serializable {
+
+  // Metric names
+  public static final String LOOKUP_PARTITIONS_STR = "lookup_partitions";
+  public static final String LOOKUP_FILES_STR = "lookup_files";
+  public static final String VALIDATE_PARTITIONS_STR = "validate_partitions";
+  public static final String VALIDATE_FILES_STR = "validate_files";
+  public static final String VALIDATE_ERRORS_STR = "validate_errors";
+  public static final String SCAN_STR = "scan";
+  public static final String BASEFILE_READ_STR = "basefile_read";
+  public static final String INITIALIZE_STR = "initialize";
+  public static final String SYNC_STR = "sync";
+
+  // Stats names
+  public static final String STAT_TOTAL_BASE_FILE_SIZE = "totalBaseFileSizeInBytes";
+  public static final String STAT_TOTAL_LOG_FILE_SIZE = "totalLogFileSizeInBytes";
+  public static final String STAT_COUNT_BASE_FILES = "baseFileCount";
+  public static final String STAT_COUNT_LOG_FILES = "logFileCount";
+  public static final String STAT_COUNT_PARTITION = "partitionCount";
+  public static final String STAT_IN_SYNC = "isInSync";
+  public static final String STAT_LAST_COMPACTION_TIMESTAMP = "lastCompactionTimestamp";
+
+  private static final Logger LOG = LogManager.getLogger(HoodieMetadataMetrics.class);
+
+  private final Registry metricsRegistry;
+
+  public HoodieMetadataMetrics(Registry metricsRegistry) {
+    this.metricsRegistry = metricsRegistry;
+  }
+
+  public Map<String, String> getStats(boolean detailed, HoodieTableMetaClient metaClient, HoodieTableMetadata metadata) {
+    try {
+      metaClient.reloadActiveTimeline();
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+      return getStats(fsView, detailed, metadata);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Unable to get metadata stats.", ioe);
+    }
+  }
+
+  private Map<String, String> getStats(HoodieTableFileSystemView fsView, boolean detailed, HoodieTableMetadata tableMetadata) throws IOException {
+    Map<String, String> stats = new HashMap<>();
+
+    // Total size of the metadata and count of base/log files
+    long totalBaseFileSizeInBytes = 0;
+    long totalLogFileSizeInBytes = 0;
+    int baseFileCount = 0;
+    int logFileCount = 0;
+    List<FileSlice> latestSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
+
+    for (FileSlice slice : latestSlices) {
+      if (slice.getBaseFile().isPresent()) {
+        totalBaseFileSizeInBytes += slice.getBaseFile().get().getFileStatus().getLen();
+        ++baseFileCount;
+      }
+      Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
+      while (it.hasNext()) {
+        totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
+        ++logFileCount;
+      }
+    }
+
+    stats.put(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE, String.valueOf(totalBaseFileSizeInBytes));
+    stats.put(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE, String.valueOf(totalLogFileSizeInBytes));
+    stats.put(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES, String.valueOf(baseFileCount));
+    stats.put(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES, String.valueOf(logFileCount));
+
+    if (detailed) {
+      stats.put(HoodieMetadataMetrics.STAT_COUNT_PARTITION, String.valueOf(tableMetadata.getAllPartitionPaths().size()));
+      stats.put(HoodieMetadataMetrics.STAT_IN_SYNC, String.valueOf(tableMetadata.isInSync()));
+    }
+
+    return stats;
+  }
+
+  protected void updateMetrics(String action, long durationInMs) {
+    if (metricsRegistry == null) {
+      return;
+    }
+
+    // Update sum of duration and total for count
+    String countKey = action + ".count";
+    String durationKey = action + ".totalDuration";
+    metricsRegistry.add(countKey, 1);
+    metricsRegistry.add(durationKey, durationInMs);
+
+    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)", durationKey, durationInMs, countKey));
+  }
+
+  public void updateMetrics(long totalBaseFileSizeInBytes, long totalLogFileSizeInBytes, int baseFileCount,
+                            int logFileCount) {
+    if (metricsRegistry == null) {
+      return;
+    }
+
+    // Update sizes and count for metadata table's data files
+    metricsRegistry.add("basefile.size", totalBaseFileSizeInBytes);
+    metricsRegistry.add("logfile.size", totalLogFileSizeInBytes);
+    metricsRegistry.add("basefile.count", baseFileCount);
+    metricsRegistry.add("logfile.count", logFileCount);
+
+    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, logfile.size=%d, basefile.count=%d, "
+        + "logfile.count=%d)", totalBaseFileSizeInBytes, totalLogFileSizeInBytes, baseFileCount, logFileCount));
+  }
+
+  public Registry registry() {
+    return metricsRegistry;
+  }
+}
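
For illustration only, a small sketch of publishing the size metrics defined above. Registry.getRegistry(...) is assumed to be the hudi-common registry lookup, and the sizes/counts are made up. Timed actions follow the "<action>.count" / "<action>.totalDuration" convention, so an average latency can be derived by dividing totalDuration by count.

import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.metadata.HoodieMetadataMetrics;

public class PublishMetadataSizeMetrics {
  public static void main(String[] args) {
    // Assumed hudi-common registry lookup; any Registry implementation would do.
    Registry registry = Registry.getRegistry("HoodieMetadata");
    HoodieMetadataMetrics metrics = new HoodieMetadataMetrics(registry);

    // Publishes basefile.size, logfile.size, basefile.count and logfile.count counters.
    metrics.updateMetrics(10 * 1024 * 1024L, 512 * 1024L, 3, 7);
  }
}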
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
new file mode 100644
index 0000000..0886436
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+
+/**
+ * This is a payload which saves information about a single entry in the Metadata Table.
+ *
+ * The type of the entry is determined by the "type" saved within the record. The following types of entries are saved:
+ *
+ *   1. List of partitions: There is a single such record
+ *         key="__all_partitions__"
+ *
+ *   2. List of files in a Partition: There is one such record for each partition
+ *         key=Partition name
+ *
+ *  During compaction on the table, the deletions are merged with additions and hence pruned.
+ *
+ * Metadata Table records are saved with the schema defined in HoodieMetadata.avsc. This class encapsulates the
+ * HoodieMetadataRecord for ease of operations.
+ */
+public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadataPayload> {
+  // Type of the record
+  // This can be an enum in the schema but Avro 1.8 has a bug - https://issues.apache.org/jira/browse/AVRO-1810
+  private static final int PARTITION_LIST = 1;
+  private static final int FILE_LIST = 2;
+
+  private String key = null;
+  private int type = 0;
+  private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
+
+  public HoodieMetadataPayload(Option<GenericRecord> record) {
+    if (record.isPresent()) {
+      // This can be simplified using SpecificData.deepcopy once this bug is fixed
+      // https://issues.apache.org/jira/browse/AVRO-1811
+      key = record.get().get("key").toString();
+      type = (int) record.get().get("type");
+      if (record.get().get("filesystemMetadata") != null) {
+        filesystemMetadata = (Map<String, HoodieMetadataFileInfo>) record.get().get("filesystemMetadata");
+        filesystemMetadata.keySet().forEach(k -> {
+          GenericRecord v = filesystemMetadata.get(k);
+          filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long)v.get("size"), (Boolean)v.get("isDeleted")));
+        });
+      }
+    }
+  }
+
+  private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFileInfo> filesystemMetadata) {
+    this.key = key;
+    this.type = type;
+    this.filesystemMetadata = filesystemMetadata;
+  }
+
+  /**
+   * Create and return a {@code HoodieMetadataPayload} to save list of partitions.
+   *
+   * @param partitions The list of partitions
+   */
+  public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) {
+    Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
+    partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L,  false)));
+
+    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    return new HoodieRecord<>(key, payload);
+  }
+
+  /**
+   * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition.
+   *
+   * @param partition The name of the partition
+   * @param filesAdded Mapping of files to their sizes for files which have been added to this partition
+   * @param filesDeleted List of files which have been deleted from this partition
+   */
+  public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition,
+                                                                               Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+    Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
+    filesAdded.ifPresent(
+        m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false))));
+    filesDeleted.ifPresent(
+        m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L,  true))));
+
+    HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath());
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    return new HoodieRecord<>(key, payload);
+  }
+
+  @Override
+  public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
+    ValidationUtils.checkArgument(previousRecord.type == type,
+        "Cannot combine " + previousRecord.type  + " with " + type);
+
+    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+
+    switch (type) {
+      case PARTITION_LIST:
+      case FILE_LIST:
+        combinedFileInfo = combineFilesystemMetadata(previousRecord);
+        break;
+      default:
+        throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type);
+    }
+
+    return new HoodieMetadataPayload(key, type, combinedFileInfo);
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRecord, Schema schema) throws IOException {
+    HoodieMetadataPayload anotherPayload = new HoodieMetadataPayload(Option.of((GenericRecord)oldRecord));
+    HoodieRecordPayload combinedPayload = preCombine(anotherPayload);
+    return combinedPayload.getInsertValue(schema);
+  }
+
+  @Override
+  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
+    if (key == null) {
+      return Option.empty();
+    }
+
+    HoodieMetadataRecord record = new HoodieMetadataRecord(key, type, filesystemMetadata);
+    return Option.of(record);
+  }
+
+  /**
+   * Returns the list of filenames added as part of this record.
+   */
+  public List<String> getFilenames() {
+    return filterFileInfoEntries(false).map(e -> e.getKey()).sorted().collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the list of filenames deleted as part of this record.
+   */
+  public List<String> getDeletions() {
+    return filterFileInfoEntries(true).map(e -> e.getKey()).sorted().collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the files added as part of this record.
+   */
+  public FileStatus[] getFileStatuses(Path partitionPath) {
+    return filterFileInfoEntries(false)
+        .map(e -> new FileStatus(e.getValue().getSize(), false, 0, 0, 0, 0, null, null, null,
+            new Path(partitionPath, e.getKey())))
+        .toArray(FileStatus[]::new);
+  }
+
+  private Stream<Map.Entry<String, HoodieMetadataFileInfo>> filterFileInfoEntries(boolean isDeleted) {
+    if (filesystemMetadata == null) {
+      return Stream.empty();
+    }
+
+    return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
+  }
+
+  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+    Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+    if (previousRecord.filesystemMetadata != null) {
+      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
+    }
+
+    if (filesystemMetadata != null) {
+      filesystemMetadata.forEach((filename, fileInfo) -> {
+        // If the filename wasn't present in the previous record, carry it forward as-is
+        if (!combinedFileInfo.containsKey(filename)) {
+          combinedFileInfo.put(filename, fileInfo);
+        } else {
+          if (fileInfo.getIsDeleted()) {
+            // file deletion
+            combinedFileInfo.remove(filename);
+          } else {
+            // file appends.
+            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
+              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
+            });
+          }
+        }
+      });
+    }
+
+    return combinedFileInfo;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("HoodieMetadataPayload {");
+    sb.append("key=").append(key).append(", ");
+    sb.append("type=").append(type).append(", ");
+    sb.append("creations=").append(Arrays.toString(getFilenames().toArray())).append(", ");
+    sb.append("deletions=").append(Arrays.toString(getDeletions().toArray())).append(", ");
+    sb.append('}');
+    return sb.toString();
+  }
+}
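
To make the merge semantics concrete, a hedged sketch (file names and sizes are invented): an older record adding two files is combined with a newer record that appends to one file and deletes the other, so appended sizes are summed and the deleted file is pruned.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieMetadataPayload;

public class PayloadMergeExample {
  public static void main(String[] args) {
    // Older state of the partition: two files with their sizes.
    Map<String, Long> added = new HashMap<>();
    added.put("f1.parquet", 100L);
    added.put("f2.parquet", 200L);
    HoodieRecord<HoodieMetadataPayload> older =
        HoodieMetadataPayload.createPartitionFilesRecord("2020/01/01", Option.of(added), Option.empty());

    // Newer delta: f1 grew by 50 bytes (log append), f2 was deleted.
    Map<String, Long> appended = Collections.singletonMap("f1.parquet", 50L);
    HoodieRecord<HoodieMetadataPayload> newer =
        HoodieMetadataPayload.createPartitionFilesRecord("2020/01/01",
            Option.of(appended), Option.of(Arrays.asList("f2.parquet")));

    // preCombine keeps f1 with a summed size of 150 and drops f2 entirely.
    HoodieMetadataPayload merged = newer.getData().preCombine(older.getData());
    System.out.println(merged.getFilenames());   // [f1.parquet]
    System.out.println(merged.getDeletions());   // []
  }
}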
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
new file mode 100644
index 0000000..acb29f7
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface that supports querying various pieces of metadata about a hudi table.
+ */
+public interface HoodieTableMetadata extends Serializable {
+
+  // Table name suffix
+  String METADATA_TABLE_NAME_SUFFIX = "_metadata";
+  /**
+   * Timestamp for the commit made when the base dataset has no commits yet. This is earlier than even
+   * {@link org.apache.hudi.common.table.timeline.HoodieTimeline#INIT_INSTANT_TS}, such that the metadata table
+   * can be prepped even before bootstrap is done.
+   */
+  String SOLO_COMMIT_TIMESTAMP = "0000000000000";
+  // Key for the record which saves list of all partitions
+  String RECORDKEY_PARTITION_LIST = "__all_partitions__";
+  // The partition name used for non-partitioned tables
+  String NON_PARTITIONED_NAME = ".";
+
+  // Base path of the Metadata Table relative to the dataset (.hoodie/metadata)
+  static final String METADATA_TABLE_REL_PATH = HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "metadata";
+
+  /**
+   * Return the base path of the Metadata Table.
+   *
+   * @param tableBasePath The base path of the dataset
+   */
+  static String getMetadataTableBasePath(String tableBasePath) {
+    return tableBasePath + Path.SEPARATOR + METADATA_TABLE_REL_PATH;
+  }
+
+  /**
+   * Returns {@code true} if the given base path is that of a Metadata Table.
+   *
+   * @param basePath The base path to check
+   */
+  static boolean isMetadataTable(String basePath) {
+    return basePath.endsWith(METADATA_TABLE_REL_PATH);
+  }
+
+  static HoodieTableMetadata create(Configuration conf, String datasetBasePath, String spillableMapPath, boolean useFileListingFromMetadata,
+                                    boolean verifyListings, boolean enableMetrics, boolean shouldAssumeDatePartitioning) {
+    return new HoodieBackedTableMetadata(conf, datasetBasePath, spillableMapPath, useFileListingFromMetadata, verifyListings,
+        enableMetrics, shouldAssumeDatePartitioning);
+  }
+
+  /**
+   * Fetch all the files at the given partition path, per the latest snapshot of the metadata.
+   */
+  FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException;
+
+  /**
+   * Fetch list of all partition paths, per the latest snapshot of the metadata.
+   */
+  List<String> getAllPartitionPaths() throws IOException;
+
+  /**
+   * Get the instant time up to which the metadata is synced w.r.t. the data timeline.
+   */
+  Option<String> getSyncedInstantTime();
+
+  boolean isInSync();
+}
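
For illustration only, a sketch of the reader-side flow this interface enables, listing partitions and their files from the metadata table; the base path, spillable directory, and flag choices are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.metadata.HoodieTableMetadata;

public class ListFilesViaMetadata {
  public static void main(String[] args) throws Exception {
    String basePath = "/tmp/hoodie/test_table";  // hypothetical dataset base path
    HoodieTableMetadata metadata = HoodieTableMetadata.create(
        new Configuration(), basePath, "/tmp/spillable",
        true,    // useFileListingFromMetadata: serve listings from the metadata table
        true,    // verifyListings: cross-check results against the file system
        false,   // enableMetrics
        false);  // shouldAssumeDatePartitioning

    for (String partition : metadata.getAllPartitionPaths()) {
      FileStatus[] files = metadata.getAllFilesInPartition(new Path(basePath, partition));
      System.out.println(partition + " -> " + files.length + " files");
    }
  }
}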
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
new file mode 100644
index 0000000..0436de7
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+public enum MetadataPartitionType {
+  FILES("files");
+
+  private final String partitionPath;
+
+  MetadataPartitionType(String partitionPath) {
+    this.partitionPath = partitionPath;
+  }
+
+  public String partitionPath() {
+    return partitionPath;
+  }
+}
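
As a small illustrative note, the single FILES partition maps onto the metadata table layout as follows (base path hypothetical):

import org.apache.hadoop.fs.Path;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;

public class MetadataPartitionPathExample {
  public static void main(String[] args) {
    String basePath = "/tmp/hoodie/test_table";  // hypothetical dataset base path
    // Resolves to <basePath>/.hoodie/metadata/files, the single partition used for file listings.
    Path filesPartition = new Path(
        HoodieTableMetadata.getMetadataTableBasePath(basePath),
        MetadataPartitionType.FILES.partitionPath());
    System.out.println(filesPartition);
  }
}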