You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/04 16:00:10 UTC

[hudi] 04/06: [HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (apache#2342)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4b94529aafd45b608c6d1ab13376216144b934f6
Author: Ryan Pifer <ry...@amazon.com>
AuthorDate: Thu Dec 31 08:57:13 2020 -0800

    [HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (apache#2342)
    
    [RFC-15] Fix partition key in metadata table when bootstrapping from file system (apache#2387)
    
    Co-authored-by: Ryan Pifer <ry...@amazon.com>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 256 ++---------------
 .../java/org/apache/hudi/table/HoodieTable.java    |   2 +-
 .../hudi/client/TestCompactionAdminClient.java     |   6 +
 ...Metadata.java => TestHoodieBackedMetadata.java} | 152 ++++++++--
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |   6 +
 .../hudi/testutils/HoodieClientTestHarness.java    |   7 +-
 ...edTableMetadata.java => BaseTableMetadata.java} | 272 ++++--------------
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 227 +--------------
 .../HoodieMetadataMergedInstantRecordScanner.java  | 115 ++++++++
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 311 +++++++++++++++++++++
 .../apache/hudi/functional/TestCOWDataSource.scala |   4 +-
 11 files changed, 677 insertions(+), 681 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ed24980..823e70c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -39,9 +39,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
-import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -49,7 +47,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 
@@ -61,18 +58,14 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
-import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 
 /**
@@ -211,7 +204,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     return metadataWriteConfig;
   }
 
-  public HoodieTableMetadata metadata() {
+  public HoodieBackedTableMetadata metadata() {
     return metadata;
   }
 
@@ -340,7 +333,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
         if (p.getRight().length > filesInDir.size()) {
           // Is a partition. Add all data files to result.
-          partitionToFileStatus.put(p.getLeft().getName(), filesInDir);
+          String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), p.getLeft());
+          partitionToFileStatus.put(partitionName, filesInDir);
         } else {
           // Add sub-dirs to the queue
           pathsToList.addAll(Arrays.stream(p.getRight())
@@ -374,35 +368,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
       for (HoodieInstant instant : instantsToSync) {
         LOG.info("Syncing instant " + instant + " to metadata table");
-        ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
-
-        switch (instant.getAction()) {
-          case HoodieTimeline.CLEAN_ACTION:
-            HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
-            update(cleanMetadata, instant.getTimestamp());
-            break;
-          case HoodieTimeline.DELTA_COMMIT_ACTION:
-          case HoodieTimeline.COMMIT_ACTION:
-          case HoodieTimeline.COMPACTION_ACTION:
-            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-                timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-            update(commitMetadata, instant.getTimestamp());
-            break;
-          case HoodieTimeline.ROLLBACK_ACTION:
-            HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
-                timeline.getInstantDetails(instant).get());
-            update(rollbackMetadata, instant.getTimestamp());
-            break;
-          case HoodieTimeline.RESTORE_ACTION:
-            HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
-                timeline.getInstantDetails(instant).get());
-            update(restoreMetadata, instant.getTimestamp());
-            break;
-          case HoodieTimeline.SAVEPOINT_ACTION:
-            // Nothing to be done here
-            break;
-          default:
-            throw new HoodieException("Unknown type of action " + instant.getAction());
+
+        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, metadata.getSyncedInstantTime());
+        if (records.isPresent()) {
+          commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
         }
       }
       // re-init the table metadata, for any future writes.
@@ -420,44 +389,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   @Override
   public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
-    if (!enabled) {
-      return;
+    if (enabled) {
+      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime);
+      commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
     }
-
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> allPartitions = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
-      final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
-      allPartitions.add(partition);
-
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
-          return;
-        }
-
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
-        String filename = pathWithPartition.substring(offset);
-        ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
-        newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
-      });
-
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-          partition, Option.of(newFiles), Option.empty());
-      records.add(record);
-    });
-
-    // New partitions created
-    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
-    records.add(record);
-
-    LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
-        + ". #partitions_updated=" + records.size());
-    commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
   }
 
   /**
@@ -468,26 +403,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   @Override
   public void update(HoodieCleanerPlan cleanerPlan, String instantTime) {
-    if (!enabled) {
-      return;
+    if (enabled) {
+      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanerPlan, instantTime);
+      commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
     }
-
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileDeleteCount = {0};
-    cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
-      fileDeleteCount[0] += deletedPathInfo.size();
-
-      // Files deleted from a partition
-      List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName())
-          .collect(Collectors.toList());
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
-          Option.of(deletedFilenames));
-      records.add(record);
-    });
-
-    LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileDeleteCount[0]);
-    commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
   }
 
   /**
@@ -498,26 +417,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   @Override
   public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
-    if (!enabled) {
-      return;
+    if (enabled) {
+      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime);
+      commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
     }
-
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileDeleteCount = {0};
-
-    cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
-      // Files deleted from a partition
-      List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
-          Option.of(new ArrayList<>(deletedFiles)));
-
-      records.add(record);
-      fileDeleteCount[0] += deletedFiles.size();
-    });
-
-    LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileDeleteCount[0]);
-    commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
   }
 
   /**
@@ -528,16 +431,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   @Override
   public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
-    if (!enabled) {
-      return;
+    if (enabled) {
+      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, metadata.getSyncedInstantTime());
+      commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
     }
-
-    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
-    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
-      rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles));
-    });
-    commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore");
   }
 
   /**
@@ -548,119 +445,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
-    if (!enabled) {
-      return;
+    if (enabled) {
+      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, metadata.getSyncedInstantTime());
+      commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
     }
-
-    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
-    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-    processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles);
-    commitRollback(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
-  }
-
-  /**
-   * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
-   *
-   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
-   * function will extract this change file for each partition.
-   *
-   * @param rollbackMetadata {@code HoodieRollbackMetadata}
-   * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
-   * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
-   */
-  private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
-                                       Map<String, List<String>> partitionToDeletedFiles,
-                                       Map<String, Map<String, Long>> partitionToAppendedFiles) {
-    rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
-      final String partition = pm.getPartitionPath();
-
-      if (!pm.getSuccessDeleteFiles().isEmpty()) {
-        if (!partitionToDeletedFiles.containsKey(partition)) {
-          partitionToDeletedFiles.put(partition, new ArrayList<>());
-        }
-
-        // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
-        List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
-            .collect(Collectors.toList());
-        partitionToDeletedFiles.get(partition).addAll(deletedFiles);
-      }
-
-      if (!pm.getAppendFiles().isEmpty()) {
-        if (!partitionToAppendedFiles.containsKey(partition)) {
-          partitionToAppendedFiles.put(partition, new HashMap<>());
-        }
-
-        // Extract appended file name from the absolute paths saved in getAppendFiles()
-        pm.getAppendFiles().forEach((path, size) -> {
-          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
-            return size + oldSize;
-          });
-        });
-      }
-    });
-  }
-
-  /**
-   * Create file delete records and commit.
-   *
-   * @param partitionToDeletedFiles {@code Map} of partitions and the deleted files
-   * @param instantTime Timestamp at which the deletes took place
-   * @param operation Type of the operation which caused the files to be deleted
-   */
-  private void commitRollback(Map<String, List<String>> partitionToDeletedFiles,
-                              Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime,
-                              String operation) {
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileChangeCount = {0, 0}; // deletes, appends
-
-    partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
-      // Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the
-      // metadata table. Hence, the deleted filed need to be checked against the metadata.
-      try {
-        FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new Path(metadata.getDatasetBasePath(), partition));
-        Set<String> currentFiles =
-            Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet());
-
-        int origCount = deletedFiles.size();
-        deletedFiles.removeIf(f -> !currentFiles.contains(f));
-        if (deletedFiles.size() != origCount) {
-          LOG.warn("Some Files to be deleted as part of " + operation + " at " + instantTime + " were not found in the "
-              + " metadata for partition " + partition
-              + ". To delete = " + origCount + ", found=" + deletedFiles.size());
-        }
-
-        fileChangeCount[0] += deletedFiles.size();
-
-        Option<Map<String, Long>> filesAdded = Option.empty();
-        if (partitionToAppendedFiles.containsKey(partition)) {
-          filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
-        }
-
-        HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
-            Option.of(new ArrayList<>(deletedFiles)));
-        records.add(record);
-      } catch (IOException e) {
-        throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e);
-      }
-    });
-
-    partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
-      fileChangeCount[1] += appendedFileMap.size();
-
-      // Validate that no appended file has been deleted
-      ValidationUtils.checkState(
-          !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())),
-          "Rollback file cannot both be appended and deleted");
-
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap),
-          Option.empty());
-      records.add(record);
-    });
-
-    LOG.info("Updating at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]);
-    commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index b268512..d56e6e7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -277,7 +277,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
     if (config.useFileListingMetadata()) {
       FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
-      return new HoodieMetadataFileSystemView(metaClient, this.metadata, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
+      return new HoodieMetadataFileSystemView(metaClient, this.metadata(), timeline, viewConfig.isIncrementalTimelineSyncEnabled());
     } else {
       return getViewManager().getFileSystemView(metaClient);
     }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 03328dd..e59a950 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -37,6 +37,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -70,6 +71,11 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
     client = new CompactionAdminClient(context, basePath);
   }
 
+  @AfterEach
+  public void cleanUp() throws Exception {
+    cleanupResources();
+  }
+
   @Test
   public void testUnscheduleCompactionPlan() throws Exception {
     int numEntriesPerInstant = 10;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
similarity index 87%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index b9c3511..313eda2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -79,8 +79,8 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-public class TestHoodieFsMetadata extends HoodieClientTestHarness {
-  private static final Logger LOG = LogManager.getLogger(TestHoodieFsMetadata.class);
+public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
+  private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
 
   @TempDir
   public java.nio.file.Path tempFolder;
@@ -95,7 +95,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
     initSparkContexts("TestHoodieMetadata");
     initFileSystem();
     fs.mkdirs(new Path(basePath));
-    initMetaClient();
+    initMetaClient(tableType);
     initTestDataGenerator();
     metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
 
@@ -371,7 +371,41 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
       client.syncTableMetadata();
       validateMetadata(client);
     }
+  }
+
+  /**
+   * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op
+   * occurs to metadata.
+   * @throws Exception
+   */
+  @Test
+  public void testRollbackUnsyncedCommit() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      // Initialize table with metadata
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+    }
+
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
+      // Commit with metadata disabled
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.rollback(newCommitTime);
+    }
 
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) {
+      validateMetadata(client);
+    }
   }
 
   /**
@@ -637,14 +671,93 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
   }
 
   /**
+   * Test when reading from metadata table which is out of sync with dataset that results are still consistent.
+   */
+  //  @ParameterizedTest
+  //  @EnumSource(HoodieTableType.class)
+  @Test
+  public void testMetadataOutOfSync() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true));
+
+    // Enable metadata so table is initialized
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      // Perform Bulk Insert
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+    }
+
+    // Perform commit operations with metadata disabled
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
+      // Perform Insert
+      String newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      // Perform Upsert
+      newCommitTime = "003";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
+      client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      // Compaction
+      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+        newCommitTime = "004";
+        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+        client.compact(newCommitTime);
+      }
+    }
+
+    assertFalse(metadata(unsyncedClient).isInSync());
+    validateMetadata(unsyncedClient);
+
+    // Perform clean operation with metadata disabled
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
+      // One more commit needed to trigger clean so upsert and compact
+      String newCommitTime = "005";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 20);
+      client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+        newCommitTime = "006";
+        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+        client.compact(newCommitTime);
+      }
+
+      // Clean
+      newCommitTime = "007";
+      client.clean(newCommitTime);
+    }
+
+    assertFalse(metadata(unsyncedClient).isInSync());
+    validateMetadata(unsyncedClient);
+
+    // Perform restore with metadata disabled
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
+      client.restoreToInstant("004");
+    }
+
+    assertFalse(metadata(unsyncedClient).isInSync());
+    validateMetadata(unsyncedClient);
+  }
+
+
+  /**
    * Validate the metadata tables contents to ensure it matches what is on the file system.
    *
    * @throws IOException
    */
   private void validateMetadata(SparkRDDWriteClient client) throws IOException {
     HoodieWriteConfig config = client.getConfig();
-    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
-    assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
+
+    HoodieBackedTableMetadata tableMetadata = metadata(client);
+    assertNotNull(tableMetadata, "MetadataReader should have been initialized");
     if (!config.useFileListingMetadata()) {
       return;
     }
@@ -652,17 +765,9 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
     HoodieTimer timer = new HoodieTimer().startTimer();
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
-    // Validate write config for metadata table
-    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
-    assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table");
-    assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");
-
-    // Metadata table should be in sync with the dataset
-    assertTrue(metadata(client).isInSync());
-
     // Partitions should match
     List<String> fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath);
-    List<String> metadataPartitions = metadataWriter.metadata().getAllPartitionPaths();
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
 
     Collections.sort(fsPartitions);
     Collections.sort(metadataPartitions);
@@ -684,7 +789,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
           partitionPath = new Path(basePath, partition);
         }
         FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
-        FileStatus[] metaStatuses = metadataWriter.metadata().getAllFilesInPartition(partitionPath);
+        FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
         List<String> fsFileNames = Arrays.stream(fsStatuses)
             .map(s -> s.getPath().getName()).collect(Collectors.toList());
         List<String> metadataFilenames = Arrays.stream(metaStatuses)
@@ -705,9 +810,9 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
         // FileSystemView should expose the same data
         List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
 
-        fileGroups.forEach(g -> LogManager.getLogger(TestHoodieFsMetadata.class).info(g));
-        fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieFsMetadata.class).info(b)));
-        fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieFsMetadata.class).info(s)));
+        fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
+        fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
+        fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(s)));
 
         long numFiles = fileGroups.stream()
             .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
@@ -720,10 +825,17 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
       }
     });
 
-    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
+    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
+    assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
+
+    // Validate write config for metadata table
+    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
+    assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table");
+    assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");
 
     // Metadata table should be in sync with the dataset
-    assertTrue(metadataWriter.metadata().isInSync());
+    assertTrue(metadata(client).isInSync());
+    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
 
     // Metadata table is MOR
     assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index b8e02b9..6a292f5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -91,6 +92,11 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     initDFSMetaClient();
   }
 
+  @AfterEach
+  public void cleanUp() throws Exception {
+    cleanupResources();
+  }
+
   @Test
   public void testLeftOverUpdatedPropFileCleanup() throws IOException {
     testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 9fa1c47..e6523af 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -204,6 +205,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
    * @throws IOException
    */
   protected void initMetaClient() throws IOException {
+    initMetaClient(getTableType());
+  }
+
+  protected void initMetaClient(HoodieTableType tableType) throws IOException {
     if (basePath == null) {
       throw new IllegalStateException("The base path has not been initialized.");
     }
@@ -212,7 +217,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
       throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    metaClient = HoodieTestUtils.init(context.getHadoopConf().get(), basePath, getTableType());
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
similarity index 50%
copy from hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
copy to hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 4858e6e..f62d9d8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,105 +19,64 @@
 
 package org.apache.hudi.metadata;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SpillableMapUtils;
-import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-/**
- * Table metadata provided by an internal DFS backed Hudi metadata table.
- *
- * If the metadata table does not exist, RPC calls are used to retrieve file listings from the file system.
- * No updates are applied to the table and it is not synced.
- */
-public class HoodieBackedTableMetadata implements HoodieTableMetadata {
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
-  private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
-  private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  private static final int BUFFER_SIZE = 10 * 1024 * 1024;
+public abstract class BaseTableMetadata implements HoodieTableMetadata {
 
-  private final SerializableConfiguration hadoopConf;
-  private final String datasetBasePath;
-  private final String metadataBasePath;
-  private final Option<HoodieMetadataMetrics> metrics;
-  private HoodieTableMetaClient metaClient;
+  private static final Logger LOG = LogManager.getLogger(BaseTableMetadata.class);
+
+  static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  static final int BUFFER_SIZE = 10 * 1024 * 1024;
+
+  protected final SerializableConfiguration hadoopConf;
+  protected final String datasetBasePath;
+  protected boolean enabled;
+  protected final Option<HoodieMetadataMetrics> metrics;
 
-  private boolean enabled;
   private final boolean validateLookups;
   private final boolean assumeDatePartitioning;
-  // Directory used for Spillable Map when merging records
-  private final String spillableMapDirectory;
 
-  // Readers for the base and log file which store the metadata
-  private transient HoodieFileReader<GenericRecord> baseFileReader;
-  private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
-
-  public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
-                                   boolean enabled, boolean validateLookups, boolean assumeDatePartitioning) {
-    this(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, false, assumeDatePartitioning);
-  }
+  // Directory used for Spillable Map when merging records
+  protected final String spillableMapDirectory;
+  private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner;
 
-  public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
-                                   boolean enabled, boolean validateLookups, boolean enableMetrics,
-                                   boolean assumeDatePartitioning) {
-    this.hadoopConf = new SerializableConfiguration(conf);
+  protected BaseTableMetadata(Configuration hadoopConf, String datasetBasePath, String spillableMapDirectory,
+                              boolean enabled, boolean validateLookups, boolean enableMetrics,
+                              boolean assumeDatePartitioning) {
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
     this.datasetBasePath = datasetBasePath;
-    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
-    this.validateLookups = validateLookups;
     this.spillableMapDirectory = spillableMapDirectory;
+
     this.enabled = enabled;
+    this.validateLookups = validateLookups;
     this.assumeDatePartitioning = assumeDatePartitioning;
 
-    if (enabled) {
-      try {
-        this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
-      } catch (TableNotFoundException e) {
-        LOG.warn("Metadata table was not found at path " + metadataBasePath);
-        this.enabled = false;
-      } catch (Exception e) {
-        LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
-        this.enabled = false;
-      }
-    } else {
-      LOG.info("Metadata table is disabled.");
-    }
-
     if (enableMetrics) {
       this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
     } else {
@@ -134,8 +94,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
    *
    */
   @Override
-  public List<String> getAllPartitionPaths()
-      throws IOException {
+  public List<String> getAllPartitionPaths() throws IOException {
     if (enabled) {
       try {
         return fetchAllPartitionPaths();
@@ -163,7 +122,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
       try {
         return fetchAllFilesInPartition(partitionPath);
       } catch (Exception e) {
-        LOG.error("Failed to retrive files in partition " + partitionPath + " from metadata", e);
+        LOG.error("Failed to retrieve files in partition " + partitionPath + " from metadata", e);
       }
     }
 
@@ -247,6 +206,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
       timer.startTimer();
 
       // Ignore partition metadata file
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
       FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
           p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
       metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
@@ -281,165 +241,53 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
    * @param key The key of the record
    */
   private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException {
-    openBaseAndLogFiles();
-
-    // Retrieve record from base file
-    HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-    if (baseFileReader != null) {
-      HoodieTimer timer = new HoodieTimer().startTimer();
-      Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
-      if (baseRecord.isPresent()) {
-        hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
-            metaClient.getTableConfig().getPayloadClass());
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
-      }
-    }
 
-    // Retrieve record from log file
-    Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
-    if (logHoodieRecord.isPresent()) {
-      if (hoodieRecord != null) {
-        // Merge the payloads
-        HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
-        hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
+    Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord;
+    openTimelineScanner();
+
+    Option<HoodieRecord<HoodieMetadataPayload>> metadataHoodieRecord = getRecordByKeyFromMetadata(key);
+    // Retrieve record from unsynced timeline instants
+    Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key);
+    if (timelineHoodieRecord.isPresent()) {
+      if (metadataHoodieRecord.isPresent()) {
+        HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData());
+        mergedRecord = Option.of(new HoodieRecord(metadataHoodieRecord.get().getKey(), mergedPayload));
       } else {
-        hoodieRecord = logHoodieRecord.get();
+        mergedRecord = timelineHoodieRecord;
       }
+    } else {
+      mergedRecord = metadataHoodieRecord;
     }
-
-    return Option.ofNullable(hoodieRecord);
+    return mergedRecord;
   }
 
-  /**
-   * Open readers to the base and log files.
-   */
-  private synchronized void openBaseAndLogFiles() throws IOException {
-    if (logRecordScanner != null) {
+  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException;
+
+  private void openTimelineScanner() throws IOException {
+    if (timelineRecordScanner != null) {
       // Already opened
       return;
     }
 
-    HoodieTimer timer = new HoodieTimer().startTimer();
-
-    // Metadata is in sync till the latest completed instant on the dataset
     HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    String latestInstantTime = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
-        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-    // Find the latest file slice
-    HoodieTimeline timeline = metaClient.reloadActiveTimeline();
-    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
-    List<FileSlice> latestSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
-    ValidationUtils.checkArgument(latestSlices.size() == 1);
-
-    // If the base file is present then create a reader
-    Option<HoodieBaseFile> basefile = latestSlices.get(0).getBaseFile();
-    if (basefile.isPresent()) {
-      String basefilePath = basefile.get().getPath();
-      baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
-      LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime());
-    }
-
-    // Open the log record scanner using the log files from the latest file slice
-    List<String> logFilePaths = latestSlices.get(0).getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
-        .map(o -> o.getPath().toString())
-        .collect(Collectors.toList());
-
-    Option<HoodieInstant> lastInstant = timeline.filterCompletedInstants().lastInstant();
-    String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-    // Load the schema
+    List<HoodieInstant> unsyncedInstants = findInstantsToSync(datasetMetaClient);
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-
-    logRecordScanner =
-        new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath,
-            logFilePaths, schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE,
-            spillableMapDirectory, null);
-
-    LOG.info("Opened metadata log files from " + logFilePaths + " at instant " + latestInstantTime
-        + "(dataset instant=" + latestInstantTime + ", metadata instant=" + latestMetaInstantTimestamp + ")");
-
-    metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
-  }
-
-  public void closeReaders() {
-    if (baseFileReader != null) {
-      baseFileReader.close();
-      baseFileReader = null;
-    }
-    logRecordScanner = null;
+    timelineRecordScanner =
+        new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null);
   }
 
-  /**
-   * Return {@code True} if all Instants from the dataset have been synced with the Metadata Table.
-   */
-  @Override
-  public boolean isInSync() {
-    return enabled && findInstantsToSync().isEmpty();
-  }
-
-  private List<HoodieInstant> findInstantsToSync() {
+  protected List<HoodieInstant> findInstantsToSync() {
     HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
     return findInstantsToSync(datasetMetaClient);
   }
 
-  /**
-   * Return an ordered list of instants which have not been synced to the Metadata Table.
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
-   */
-  protected List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient) {
-    HoodieActiveTimeline metaTimeline = metaClient.reloadActiveTimeline();
-
-    // All instants on the data timeline, which are greater than the last instant on metadata timeline
-    // are candidates for sync.
-    Option<HoodieInstant> latestMetadataInstant = metaTimeline.filterCompletedInstants().lastInstant();
-    ValidationUtils.checkArgument(latestMetadataInstant.isPresent(),
-        "At least one completed instant should exist on the metadata table, before syncing.");
-    String latestMetadataInstantTime = latestMetadataInstant.get().getTimestamp();
-    HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE);
-    Option<HoodieInstant> earliestIncompleteInstant = candidateTimeline.filterInflightsAndRequested().firstInstant();
-
-    if (earliestIncompleteInstant.isPresent()) {
-      return candidateTimeline.filterCompletedInstants()
-          .findInstantsBefore(earliestIncompleteInstant.get().getTimestamp())
-          .getInstants().collect(Collectors.toList());
-    } else {
-      return candidateTimeline.filterCompletedInstants()
-          .getInstants().collect(Collectors.toList());
-    }
-  }
-
-  /**
-   * Return the timestamp of the latest compaction instant.
-   */
-  @Override
-  public Option<String> getSyncedInstantTime() {
-    if (!enabled) {
-      return Option.empty();
-    }
+  protected abstract List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient);
 
-    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
-    return timeline.getDeltaCommitTimeline().filterCompletedInstants()
-        .lastInstant().map(HoodieInstant::getTimestamp);
-  }
-
-  public boolean enabled() {
-    return enabled;
-  }
-
-  public SerializableConfiguration getHadoopConf() {
-    return hadoopConf;
-  }
-
-  public String getDatasetBasePath() {
-    return datasetBasePath;
-  }
-
-  public HoodieTableMetaClient getMetaClient() {
-    return metaClient;
+  public boolean isInSync() {
+    return enabled && findInstantsToSync().isEmpty();
   }
 
-  public Map<String, String> stats() {
-    return metrics.map(m -> m.getStats(true, metaClient, this)).orElse(new HashMap<>());
+  protected void closeReaders() {
+    timelineRecordScanner = null;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 4858e6e..65c3244 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,28 +18,12 @@
 
 package org.apache.hudi.metadata;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -52,37 +36,36 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Table metadata provided by an internal DFS backed Hudi metadata table.
  *
  * If the metadata table does not exist, RPC calls are used to retrieve file listings from the file system.
  * No updates are applied to the table and it is not synced.
  */
-public class HoodieBackedTableMetadata implements HoodieTableMetadata {
+public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
   private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
-  private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  private static final int BUFFER_SIZE = 10 * 1024 * 1024;
 
-  private final SerializableConfiguration hadoopConf;
-  private final String datasetBasePath;
   private final String metadataBasePath;
-  private final Option<HoodieMetadataMetrics> metrics;
   private HoodieTableMetaClient metaClient;
 
-  private boolean enabled;
-  private final boolean validateLookups;
-  private final boolean assumeDatePartitioning;
-  // Directory used for Spillable Map when merging records
-  private final String spillableMapDirectory;
-
   // Readers for the base and log file which store the metadata
   private transient HoodieFileReader<GenericRecord> baseFileReader;
   private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
@@ -95,14 +78,8 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
   public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
                                    boolean enabled, boolean validateLookups, boolean enableMetrics,
                                    boolean assumeDatePartitioning) {
-    this.hadoopConf = new SerializableConfiguration(conf);
-    this.datasetBasePath = datasetBasePath;
+    super(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, enableMetrics, assumeDatePartitioning);
     this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
-    this.validateLookups = validateLookups;
-    this.spillableMapDirectory = spillableMapDirectory;
-    this.enabled = enabled;
-    this.assumeDatePartitioning = assumeDatePartitioning;
-
     if (enabled) {
       try {
         this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
@@ -116,171 +93,10 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
     } else {
       LOG.info("Metadata table is disabled.");
     }
-
-    if (enableMetrics) {
-      this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
-    } else {
-      this.metrics = Option.empty();
-    }
-  }
-
-  /**
-   * Return the list of partitions in the dataset.
-   *
-   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of
-   * partitions is retrieved directly from the underlying {@code FileSystem}.
-   *
-   * On any errors retrieving the listing from the metadata, defaults to using the file system listings.
-   *
-   */
-  @Override
-  public List<String> getAllPartitionPaths()
-      throws IOException {
-    if (enabled) {
-      try {
-        return fetchAllPartitionPaths();
-      } catch (Exception e) {
-        LOG.error("Failed to retrieve list of partition from metadata", e);
-      }
-    }
-    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
   }
 
-  /**
-   * Return the list of files in a partition.
-   *
-   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of
-   * partitions is retrieved directly from the underlying {@code FileSystem}.
-   *
-   * On any errors retrieving the listing from the metadata, defaults to using the file system listings.
-   *
-   * @param partitionPath The absolute path of the partition to list
-   */
   @Override
-  public FileStatus[] getAllFilesInPartition(Path partitionPath)
-      throws IOException {
-    if (enabled) {
-      try {
-        return fetchAllFilesInPartition(partitionPath);
-      } catch (Exception e) {
-        LOG.error("Failed to retrive files in partition " + partitionPath + " from metadata", e);
-      }
-    }
-
-    return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath);
-  }
-
-  /**
-   * Returns a list of all partitions.
-   */
-  protected List<String> fetchAllPartitionPaths() throws IOException {
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
-    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
-
-    List<String> partitions = Collections.emptyList();
-    if (hoodieRecord.isPresent()) {
-      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
-        throw new HoodieMetadataException("Metadata partition list record is inconsistent: "
-            + hoodieRecord.get().getData());
-      }
-
-      partitions = hoodieRecord.get().getData().getFilenames();
-      // Partition-less tables have a single empty partition
-      if (partitions.contains(NON_PARTITIONED_NAME)) {
-        partitions.remove(NON_PARTITIONED_NAME);
-        partitions.add("");
-      }
-    }
-
-    if (validateLookups) {
-      // Validate the Metadata Table data by listing the partitions from the file system
-      timer.startTimer();
-      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
-      List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
-
-      Collections.sort(actualPartitions);
-      Collections.sort(partitions);
-      if (!actualPartitions.equals(partitions)) {
-        LOG.error("Validation of metadata partition list failed. Lists do not match.");
-        LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray()));
-        LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray()));
-
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
-      }
-
-      // Return the direct listing as it should be correct
-      partitions = actualPartitions;
-    }
-
-    LOG.info("Listed partitions from metadata: #partitions=" + partitions.size());
-    return partitions;
-  }
-
-  /**
-   * Return all the files from the partition.
-   *
-   * @param partitionPath The absolute path of the partition
-   */
-  FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
-    String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath);
-    if (partitionName.isEmpty()) {
-      partitionName = NON_PARTITIONED_NAME;
-    }
-
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName);
-    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
-
-    FileStatus[] statuses = {};
-    if (hoodieRecord.isPresent()) {
-      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
-        throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: "
-            + hoodieRecord.get().getData());
-      }
-      statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath);
-    }
-
-    if (validateLookups) {
-      // Validate the Metadata Table data by listing the partitions from the file system
-      timer.startTimer();
-
-      // Ignore partition metadata file
-      FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
-          p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
-
-      List<String> directFilenames = Arrays.stream(directStatuses)
-          .map(s -> s.getPath().getName()).sorted()
-          .collect(Collectors.toList());
-
-      List<String> metadataFilenames = Arrays.stream(statuses)
-          .map(s -> s.getPath().getName()).sorted()
-          .collect(Collectors.toList());
-
-      if (!metadataFilenames.equals(directFilenames)) {
-        LOG.error("Validation of metadata file listing for partition " + partitionName + " failed.");
-        LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray()));
-        LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray()));
-
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
-      }
-
-      // Return the direct listing as it should be correct
-      statuses = directStatuses;
-    }
-
-    LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
-    return statuses;
-  }
-
-  /**
-   * Retrieve the merged {@code HoodieRecord} mapped to the given key.
-   *
-   * @param key The key of the record
-   */
-  private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException {
+  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException {
     openBaseAndLogFiles();
 
     // Retrieve record from base file
@@ -313,7 +129,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
   /**
    * Open readers to the base and log files.
    */
-  private synchronized void openBaseAndLogFiles() throws IOException {
+  protected synchronized void openBaseAndLogFiles() throws IOException {
     if (logRecordScanner != null) {
       // Already opened
       return;
@@ -371,19 +187,6 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
   }
 
   /**
-   * Return {@code True} if all Instants from the dataset have been synced with the Metadata Table.
-   */
-  @Override
-  public boolean isInSync() {
-    return enabled && findInstantsToSync().isEmpty();
-  }
-
-  private List<HoodieInstant> findInstantsToSync() {
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    return findInstantsToSync(datasetMetaClient);
-  }
-
-  /**
    * Return an ordered list of instants which have not been synced to the Metadata Table.
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
new file mode 100644
index 0000000..1dcd322
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Provides functionality to convert timeline instants to table metadata records and then merge by key. Specify
+ *  a filter to limit keys that are merged and stored in memory.
+ */
+public class HoodieMetadataMergedInstantRecordScanner {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedInstantRecordScanner.class);
+
+  HoodieTableMetaClient metaClient;
+  private List<HoodieInstant> instants;
+  private Option<String> lastSyncTs;
+  private Set<String> mergeKeyFilter;
+  protected final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
+
+  public HoodieMetadataMergedInstantRecordScanner(HoodieTableMetaClient metaClient, List<HoodieInstant> instants,
+                                                  Option<String> lastSyncTs, Schema readerSchema, Long maxMemorySizeInBytes,
+                                                  String spillableMapBasePath, Set<String> mergeKeyFilter) throws IOException {
+    this.metaClient = metaClient;
+    this.instants = instants;
+    this.lastSyncTs = lastSyncTs;
+    this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : Collections.emptySet();
+    this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
+        new HoodieRecordSizeEstimator(readerSchema));
+
+    scan();
+  }
+
+  /**
+   * Converts instants in scanner to metadata table records and processes each record.
+   *
+   * @param
+   * @throws IOException
+   */
+  private void scan() {
+    for (HoodieInstant instant : instants) {
+      try {
+        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, instant, lastSyncTs);
+        if (records.isPresent()) {
+          records.get().forEach(record -> processNextRecord(record));
+        }
+      } catch (Exception e) {
+        LOG.error(String.format("Got exception when processing timeline instant %s", instant.getTimestamp()), e);
+        throw new HoodieException(String.format("Got exception when processing timeline instant %s", instant.getTimestamp()), e);
+      }
+    }
+  }
+
+  /**
+   * Process metadata table record by merging with existing record if it is a part of the key filter.
+   *
+   * @param hoodieRecord
+   */
+  private void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) {
+    String key = hoodieRecord.getRecordKey();
+    if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(key)) {
+      if (records.containsKey(key)) {
+        // Merge and store the merged record
+        HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
+        records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
+      } else {
+        // Put the record as is
+        records.put(key, hoodieRecord);
+      }
+    }
+  }
+
+  /**
+   * Retrieve merged hoodie record for given key.
+   *
+   * @param key of the record to retrieve
+   * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
+   */
+  public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key) {
+    return Option.ofNullable((HoodieRecord) records.get(key));
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
new file mode 100644
index 0000000..3017e82
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
+
+/**
+ * A utility to convert timeline information to metadata table records.
+ */
+public class HoodieTableMetadataUtil {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class);
+
+  /**
+   * Converts a timeline instant to metadata table records.
+   *
+   * @param datasetMetaClient The meta client associated with the timeline instant
+   * @param instant to fetch and convert to metadata table records
+   * @return a list of metadata table records
+   * @throws IOException
+   */
+  public static Option<List<HoodieRecord>> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option<String> lastSyncTs) throws IOException {
+    HoodieTimeline timeline = datasetMetaClient.getActiveTimeline();
+    Option<List<HoodieRecord>> records = Option.empty();
+    ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
+
+    switch (instant.getAction()) {
+      case HoodieTimeline.CLEAN_ACTION:
+        HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
+        records = Option.of(convertMetadataToRecords(cleanMetadata, instant.getTimestamp()));
+        break;
+      case HoodieTimeline.DELTA_COMMIT_ACTION:
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.COMPACTION_ACTION:
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+            timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+        records = Option.of(convertMetadataToRecords(commitMetadata, instant.getTimestamp()));
+        break;
+      case HoodieTimeline.ROLLBACK_ACTION:
+        HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+            timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(rollbackMetadata, instant.getTimestamp(), lastSyncTs));
+        break;
+      case HoodieTimeline.RESTORE_ACTION:
+        HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+            timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(restoreMetadata, instant.getTimestamp(), lastSyncTs));
+        break;
+      case HoodieTimeline.SAVEPOINT_ACTION:
+        // Nothing to be done here
+        break;
+      default:
+        throw new HoodieException("Unknown type of action " + instant.getAction());
+    }
+
+    return records;
+  }
+
+  /**
+   * Finds all new files/partitions created as part of commit and creates metadata table records for them.
+   *
+   * @param commitMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> allPartitions = new LinkedList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
+      final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
+      allPartitions.add(partition);
+
+      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+      writeStats.forEach(hoodieWriteStat -> {
+        String pathWithPartition = hoodieWriteStat.getPath();
+        if (pathWithPartition == null) {
+          // Empty partition
+          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+          return;
+        }
+
+        int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
+        String filename = pathWithPartition.substring(offset);
+        ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
+        newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
+      });
+
+      // New files added to a partition
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
+          partition, Option.of(newFiles), Option.empty());
+      records.add(record);
+    });
+
+    // New partitions created
+    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
+    records.add(record);
+
+    LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+        + ". #partitions_updated=" + records.size());
+    return records;
+  }
+
+  /**
+   * Finds all files that will be deleted as part of a planned clean and creates metadata table records for them.
+   *
+   * @param cleanerPlan from timeline to convert
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanerPlan cleanerPlan, String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+
+    int[] fileDeleteCount = {0};
+    cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
+      fileDeleteCount[0] += deletedPathInfo.size();
+
+      // Files deleted from a partition
+      List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName())
+          .collect(Collectors.toList());
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+          Option.of(deletedFilenames));
+      records.add(record);
+    });
+
+    LOG.info("Found at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileDeleteCount[0]);
+    return records;
+  }
+
+  /**
+   * Finds all files that were deleted as part of a clean and creates metadata table records for them.
+   *
+   * @param cleanMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileDeleteCount = {0};
+
+    cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
+      // Files deleted from a partition
+      List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+          Option.of(new ArrayList<>(deletedFiles)));
+
+      records.add(record);
+      fileDeleteCount[0] += deletedFiles.size();
+    });
+
+    LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileDeleteCount[0]);
+    return records;
+  }
+
+  /**
+   * Aggregates all files deleted and appended to from all rollbacks associated with a restore operation then
+   * creates metadata table records for them.
+   *
+   * @param restoreMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
+      rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs));
+    });
+
+    return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore");
+  }
+
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String instantTime, Option<String> lastSyncTs) {
+
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+    processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs);
+    return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
+  }
+
+  /**
+   * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
+   *
+   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
+   * function will extract this change file for each partition.
+   *
+   * @param rollbackMetadata {@code HoodieRollbackMetadata}
+   * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
+   * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
+   */
+  private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
+                                              Map<String, List<String>> partitionToDeletedFiles,
+                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                              Option<String> lastSyncTs) {
+
+    rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
+      // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata
+      if (lastSyncTs.isPresent() && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get())) {
+        return;
+      }
+
+      final String partition = pm.getPartitionPath();
+      if (!pm.getSuccessDeleteFiles().isEmpty()) {
+        if (!partitionToDeletedFiles.containsKey(partition)) {
+          partitionToDeletedFiles.put(partition, new ArrayList<>());
+        }
+
+        // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
+        List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
+            .collect(Collectors.toList());
+        partitionToDeletedFiles.get(partition).addAll(deletedFiles);
+      }
+
+      if (!pm.getAppendFiles().isEmpty()) {
+        if (!partitionToAppendedFiles.containsKey(partition)) {
+          partitionToAppendedFiles.put(partition, new HashMap<>());
+        }
+
+        // Extract appended file name from the absolute paths saved in getAppendFiles()
+        pm.getAppendFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+            return size + oldSize;
+          });
+        });
+      }
+    });
+  }
+
+  private static List<HoodieRecord> convertFilesToRecords(Map<String, List<String>> partitionToDeletedFiles,
+                                                          Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime,
+                                                          String operation) {
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileChangeCount = {0, 0}; // deletes, appends
+
+    partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
+      fileChangeCount[0] += deletedFiles.size();
+
+      Option<Map<String, Long>> filesAdded = Option.empty();
+      if (partitionToAppendedFiles.containsKey(partition)) {
+        filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
+      }
+
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
+          Option.of(new ArrayList<>(deletedFiles)));
+      records.add(record);
+    });
+
+    partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
+      fileChangeCount[1] += appendedFileMap.size();
+
+      // Validate that no appended file has been deleted
+      ValidationUtils.checkState(
+          !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())),
+          "Rollback file cannot both be appended and deleted");
+
+      // New files added to a partition
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap),
+          Option.empty());
+      records.add(record);
+    });
+
+    LOG.info("Found at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
+        + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]);
+
+    return records;
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index c3843cc..f315a26 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -86,7 +86,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
+  //TODO(metadata): Needs HUDI-1459 to be fixed
+  //@ValueSource(booleans = Array(true, false))
+  @ValueSource(booleans = Array(false))
   def testCopyOnWriteStorage(isMetadataEnabled: Boolean) {
     // Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList