Posted to commits@hudi.apache.org by vi...@apache.org on 2021/08/14 04:23:49 UTC

[hudi] branch master updated: [HUDI-2119] Ensure the rolled-back instant was previously synced to the Metadata Table when syncing a Rollback Instant. (#3210)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eed440  [HUDI-2119] Ensure the rolled-back instant was previously synced to the Metadata Table when syncing a Rollback Instant. (#3210)
8eed440 is described below

commit 8eed4406949992086fe36b8da0029de98f443588
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Aug 13 21:23:34 2021 -0700

    [HUDI-2119] Ensure the rolled-back instant was previously synced to the Metadata Table when syncing a Rollback Instant. (#3210)
    
    * [HUDI-2119] Ensure the rolled-back instant was previously synced to the Metadata Table when syncing a Rollback Instant.
    
    If the rolled-back instant was synced to the Metadata Table, a corresponding deltacommit with the same timestamp should have been created on the Metadata Table timeline. To ensure we can always perform this check, Metadata Table instants should not be archived while their corresponding instants are still present on the dataset timeline. But guaranteeing this outright would require keeping a large number of instants on the metadata table.
    
    In this change, the metadata table keeps at least as many instants as the main dataset keeps. If the instant being rolled back precedes the start of the metadata table timeline, the code throws an exception and the metadata table has to be re-bootstrapped. This should be a very rare occurrence, arising only when the dataset is being repaired by rolling back multiple commits or restoring to a much older time. (Both safeguards are sketched just after this commit message.)
    
    * Fixed checkstyle
    
    * Improvements from review comments.
    
    Fixed checkstyle
    Replaced explicit null check with Option.ofNullable
    Removed redundant function getSyncedInstantTime
    
    * Renamed getSyncedInstantTime and getSyncedInstantTimeForReader.
    
    The term "synced" was confusing, so these were renamed to getUpdateTime() and getReaderTime().
    
    * Removed getReaderTime from the public interface; it is only needed for testing, where the same information can be accessed without exposing it publicly.
    
    * Fix compilation error
    
    * Reverting changes to HoodieMetadataFileSystemView
    
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
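
    A condensed sketch of the two safeguards described above, lifted from the hunks below (the class and method names here are illustrative, not part of the patch):

        import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
        import org.apache.hudi.common.table.timeline.HoodieInstant;
        import org.apache.hudi.common.table.timeline.HoodieTimeline;
        import org.apache.hudi.config.HoodieWriteConfig;
        import org.apache.hudi.exception.HoodieMetadataException;

        class RollbackSyncSafeguards {
          // Safeguard 1: never let the metadata table archive more aggressively
          // than the dataset, so its deltacommits outlive the dataset instants
          // they correspond to.
          static int[] commitsToKeep(HoodieWriteConfig writeConfig) {
            int min = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
            int max = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());
            return new int[] {min, max};
          }

          // Safeguard 2: a synced instant leaves a deltacommit with the same
          // timestamp on the metadata table timeline; if that deltacommit has been
          // archived, the rollback can no longer be verified and the metadata
          // table must be re-bootstrapped.
          static void ensureRollbackCanBeSynced(HoodieActiveTimeline metadataTableTimeline, String instantToRollback) {
            HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback);
            if (metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
              throw new HoodieMetadataException(String.format(
                  "The instant %s required to sync rollback of %s has been archived", syncedInstant, instantToRollback));
            }
          }
        }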
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  18 +-
 .../hudi/metadata/HoodieTableMetadataWriter.java   |   6 -
 .../hudi/table/HoodieTimelineArchiveLog.java       |   2 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |  18 --
 .../SparkHoodieBackedTableMetadataWriter.java      |  19 --
 .../functional/TestHoodieBackedMetadata.java       | 213 +++++++++++++++++----
 .../apache/hudi/metadata/BaseTableMetadata.java    |  35 ++--
 .../metadata/FileSystemBackedTableMetadata.java    |   2 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  19 ++
 .../metadata/HoodieMetadataFileSystemView.java     |  10 +-
 .../apache/hudi/metadata/HoodieTableMetadata.java  |   9 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  52 +++--
 .../hudi/metadata/TimelineMergedTableMetadata.java |  21 +-
 13 files changed, 295 insertions(+), 129 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 70682c2..65c51f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -137,6 +137,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
     int parallelism = writeConfig.getMetadataInsertParallelism();
 
+    int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep());
+    int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep());
+
     // Create the write config for the metadata table by borrowing options from the main write config.
     HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
@@ -162,7 +165,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
             .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
-            .archiveCommitsWith(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMetadataMaxCommitsToKeep())
+            .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
             // we will trigger compaction manually, to control the instant times
             .withInlineCompaction(false)
             .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
@@ -416,7 +419,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       for (HoodieInstant instant : instantsToSync) {
         LOG.info("Syncing instant " + instant + " to metadata table");
 
-        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime());
+        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient,
+            metaClient.getActiveTimeline(), instant, metadata.getUpdateTime());
         if (records.isPresent()) {
           commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
         }
@@ -478,7 +482,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   @Override
   public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
     if (enabled) {
-      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, metadata.getSyncedInstantTime());
+      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metaClient.getActiveTimeline(),
+          restoreMetadata, instantTime, metadata.getUpdateTime());
       commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
     }
   }
@@ -492,7 +497,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
     if (enabled) {
-      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, metadata.getSyncedInstantTime());
+      List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metaClient.getActiveTimeline(),
+          rollbackMetadata, instantTime, metadata.getUpdateTime());
       commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
     }
   }
@@ -504,6 +510,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     }
   }
 
+  public HoodieBackedTableMetadata getMetadataReader() {
+    return metadata;
+  }
+
   /**
    * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index 1b02a3b..02c5b9e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -23,7 +23,6 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.util.Option;
 
 import java.io.Serializable;
 
@@ -41,9 +40,4 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
   void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
 
   void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
-
-  /**
-   * Return the timestamp of the latest instant synced to the metadata table.
-   */
-  Option<String> getLatestSyncedInstantTime();
 }
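
With getLatestSyncedInstantTime() removed from the writer interface above, callers obtain the last synced time through the metadata reader instead. A minimal usage sketch mirroring the updated tests further below (hadoopConf, writeConfig and context are assumed to be in scope):

    HoodieBackedTableMetadataWriter writer = (HoodieBackedTableMetadataWriter)
        SparkHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context);
    // Timestamp of the latest dataset instant synced to the metadata table.
    Option<String> lastUpdateTime = writer.getMetadataReader().getUpdateTime();
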
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index b9e7dd4..c63a683 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -205,7 +205,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
     if (config.isMetadataTableEnabled()) {
       try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),
           config.getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) {
-        Option<String> lastSyncedInstantTime = tableMetadata.getSyncedInstantTime();
+        Option<String> lastSyncedInstantTime = tableMetadata.getUpdateTime();
 
         if (lastSyncedInstantTime.isPresent()) {
           LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 2981130..458af1a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -126,23 +125,6 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   }
 
   /**
-   * Return the timestamp of the latest instant synced.
-   * <p>
-   * To sync a instant on dataset, we create a corresponding delta-commit on the metadata table. So return the latest
-   * delta-commit.
-   */
-  @Override
-  public Option<String> getLatestSyncedInstantTime() {
-    if (!enabled) {
-      return Option.empty();
-    }
-
-    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
-    return timeline.getDeltaCommitTimeline().filterCompletedInstants()
-        .lastInstant().map(HoodieInstant::getTimestamp);
-  }
-
-  /**
    * Tag each record with the location.
    * <p>
    * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index c014e8b..7c12a9e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -29,8 +29,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -135,23 +133,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   }
 
   /**
-   * Return the timestamp of the latest instant synced.
-   *
-   * To sync a instant on dataset, we create a corresponding delta-commit on the metadata table. So return the latest
-   * delta-commit.
-   */
-  @Override
-  public Option<String> getLatestSyncedInstantTime() {
-    if (!enabled) {
-      return Option.empty();
-    }
-
-    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
-    return timeline.getDeltaCommitTimeline().filterCompletedInstants()
-        .lastInstant().map(HoodieInstant::getTimestamp);
-  }
-
-  /**
    * Tag each record with the location.
    *
    * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 22110d5..e78c2c8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -18,6 +18,24 @@
 
 package org.apache.hudi.client.functional;
 
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -50,9 +68,11 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieMetadataMetrics;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -61,9 +81,6 @@ import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -76,22 +93,6 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 @Tag("functional")
 public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
 
@@ -515,6 +516,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       assertNoWriteErrors(writeStatuses);
       validateMetadata(client);
     }
+
     String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
       // Commit with metadata disabled
@@ -530,6 +532,144 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       client.syncTableMetadata();
       validateMetadata(client);
     }
+
+    // If an unsynced commit is automatically rolled back during the next commit, the rollback commit gets a timestamp
+    // greater than the new commit which is started. Ensure that in this case the rollback is not processed,
+    // as the earlier failed commit was never committed.
+    //
+    //  Dataset:   C1        C2         C3.inflight[failed]   C4   R5[rolls back C3]
+    //  Metadata:  C1.delta  C2.delta
+    //
+    // When R5 completes, C3.xxx will be deleted. When C4 completes, C4 and R5 will be committed to Metadata Table in
+    // that order. R5 should be ignored as C3 was never committed to the metadata table.
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) {
+      // Metadata disabled and no auto-commit
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      // Not committed so left in inflight state
+      client.syncTableMetadata();
+      assertTrue(metadata(client).isInSync());
+      validateMetadata(client);
+    }
+
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) {
+      // Metadata enabled
+      // The previous commit will be rolled back automatically
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      assertTrue(metadata(client).isInSync());
+      validateMetadata(client);
+    }
+
+    // In this scenario an async operation is started and completes around the same time as the failed commit.
+    // The rest of the reasoning is the same as in the test above.
+    //  C4.clean was an asynchronous clean started along with C3. The clean completed but C3 commit failed.
+    //
+    //  Dataset:   C1        C2         C3.inflight[failed]  C4.clean     C5   R6[rolls back C3]
+    //  Metadata:  C1.delta  C2.delta
+    //
+    // When R6 completes, C3.xxx will be deleted. When C5 completes, C4, C5 and R6 will be committed to Metadata Table
+    // in that order. R6 should be ignored as C3 was never committed to the metadata table.
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) {
+      // Metadata disabled and no auto-commit
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      // Not committed so left in inflight state
+      client.clean();
+      client.syncTableMetadata();
+      assertTrue(metadata(client).isInSync());
+      validateMetadata(client);
+    }
+
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) {
+      // Metadata enabled
+      // The previous commit will be rolled back automatically
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      assertTrue(metadata(client).isInSync());
+      validateMetadata(client);
+    }
+  }
+
+  /**
+   * Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table
+   * timeline.
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testManualRollbacks(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Setting to archive more aggressively on the Metadata Table than the Dataset
+    final int maxDeltaCommitsBeforeCompaction = 4;
+    final int minArchiveCommitsMetadata = 2;
+    final int minArchiveCommitsDataset = 4;
+    HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+            .archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1)
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
+            .retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
+      // Initialize table with metadata
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Perform multiple commits
+      for (int i = 1; i < 10; ++i) {
+        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        if (i == 1) {
+          records = dataGen.generateInserts(newCommitTime, 5);
+        } else {
+          records = dataGen.generateUpdates(newCommitTime, 2);
+        }
+        client.startCommitWithTime(newCommitTime);
+        writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+        assertNoWriteErrors(writeStatuses);
+      }
+
+      // We can only roll back those commits whose deltacommits have not been archived yet.
+      int numRollbacks = 0;
+      boolean exceptionRaised = false;
+
+      List<HoodieInstant> allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants()
+          .collect(Collectors.toList());
+      for (HoodieInstant instantToRollback : allInstants) {
+        try {
+          client.rollback(instantToRollback.getTimestamp());
+          client.syncTableMetadata();
+          ++numRollbacks;
+        } catch (HoodieMetadataException e) {
+          exceptionRaised = true;
+          break;
+        }
+      }
+
+      assertTrue(exceptionRaised, "Rollback of archived instants should fail");
+      // Since each rollback also creates a deltacommit, we can only support rolling back half of the original
+      // instants present before rollback started.
+      assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2,
+          "Rollbacks of non archived instants should work");
+    }
   }
 
   /**
@@ -657,12 +797,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       // Table should sync only before the inflightActionTimestamp
       HoodieBackedTableMetadataWriter writer =
           (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getLatestSyncedInstantTime().get(), beforeInflightActionTimestamp);
+      assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp);
 
       // Reader should sync to all the completed instants
       HoodieTableMetadata metadata  = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
           client.getConfig().getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue());
-      assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
+      assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime);
 
       // Remove the inflight instance holding back table sync
       fs.delete(inflightCleanPath, false);
@@ -670,12 +810,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
 
       writer =
           (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
-      assertEquals(writer.getLatestSyncedInstantTime().get(), newCommitTime);
+      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
 
       // Reader should sync to all the completed instants
       metadata  = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
           client.getConfig().getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue());
-      assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
+      assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
     }
 
     // Enable metadata table and ensure it is synced
@@ -693,7 +833,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   }
 
   /**
-   * Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config.
+   * Instants on the Metadata Table should be archived as per config, but we always keep at least as many instants
+   * as on the dataset. The Metadata Table should be automatically compacted as per config.
    */
   @Test
   public void testCleaningArchivingAndCompaction() throws Exception {
@@ -701,12 +842,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     final int maxDeltaCommitsBeforeCompaction = 4;
+    final int minArchiveLimit = 4;
+    final int maxArchiveLimit = 6;
     HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
-            .archiveCommitsWith(6, 8).retainCommits(1)
+            .archiveCommitsWith(minArchiveLimit - 2, maxArchiveLimit - 2).retainCommits(1)
             .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
-        // don't archive the data timeline at all.
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(Integer.MAX_VALUE - 1, Integer.MAX_VALUE)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveLimit, maxArchiveLimit)
             .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(true).build())
         .build();
 
@@ -736,6 +878,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
     // ensure archiving has happened
     long numDataCompletedInstants = datasetMetaClient.getActiveTimeline().filterCompletedInstants().countInstants();
     long numDeltaCommits = metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants();
+    assertTrue(numDeltaCommits >= minArchiveLimit);
     assertTrue(numDeltaCommits < numDataCompletedInstants, "Must have less delta commits than total completed instants on data timeline.");
   }
 
@@ -1030,14 +1173,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
         // File sizes should be valid
         Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
 
-        // Block sizes should be valid
-        Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
-        List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
-        Collections.sort(fsBlockSizes);
-        List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
-        Collections.sort(metadataBlockSizes);
-        assertEquals(fsBlockSizes, metadataBlockSizes);
-
         if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
           LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
           LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
@@ -1054,6 +1189,14 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
           }
         }
 
+        // Block sizes should be valid
+        Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
+        List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
+        Collections.sort(fsBlockSizes);
+        List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
+        Collections.sort(metadataBlockSizes);
+        assertEquals(fsBlockSizes, metadataBlockSizes);
+
         assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
         assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 16f85fb..44850b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -64,7 +65,6 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
   protected final HoodieMetadataConfig metadataConfig;
   // Directory used for Spillable Map when merging records
   protected final String spillableMapDirectory;
-  private String syncedInstantTime;
 
   protected boolean enabled;
   private TimelineMergedTableMetadata timelineMergedMetadata;
@@ -84,9 +84,6 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     } else {
       this.metrics = Option.empty();
     }
-    if (enabled) {
-      openTimelineScanner();
-    }
   }
 
   /**
@@ -298,27 +295,12 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
 
   protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key);
 
-  private void openTimelineScanner() {
+  protected void openTimelineScanner(HoodieActiveTimeline metadataTableTimeline) {
     if (timelineMergedMetadata == null) {
       List<HoodieInstant> unSyncedInstants = findInstantsToSyncForReader();
       timelineMergedMetadata =
-          new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), null);
-
-      syncedInstantTime = unSyncedInstants.isEmpty() ? getLatestDatasetInstantTime()
-          : unSyncedInstants.get(unSyncedInstants.size() - 1).getTimestamp();
-    }
-  }
-
-  /**
-   * Return the timestamp of the latest synced instant.
-   */
-  @Override
-  public Option<String> getSyncedInstantTime() {
-    if (!enabled) {
-      return Option.empty();
+          new TimelineMergedTableMetadata(datasetMetaClient, metadataTableTimeline, unSyncedInstants, getUpdateTime(), null);
     }
-
-    return Option.ofNullable(syncedInstantTime);
   }
 
   /**
@@ -344,8 +326,19 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());
   }
 
+  public HoodieMetadataConfig getMetadataConfig() {
+    return metadataConfig;
+  }
+
   protected String getLatestDatasetInstantTime() {
     return datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
         .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
   }
+
+  public Option<String> getReaderTime() {
+    if (timelineMergedMetadata == null) {
+      return Option.empty();
+    }
+    return timelineMergedMetadata.getSyncedInstantTime();
+  }
 }
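
The new getReaderTime() above reports the last instant merged by the reader's timeline scanner, and is empty until openTimelineScanner() has run. A short sketch of reading it, following the updated tests (context, metadataConfig and basePath are assumed to be in scope; the cast is needed because getReaderTime() is deliberately not on the HoodieTableMetadata interface):

    HoodieTableMetadata metadata = HoodieTableMetadata.create(context, metadataConfig,
        basePath, FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue());
    // Latest dataset instant the reader has merged from the timeline, if any.
    Option<String> readerTime = ((HoodieBackedTableMetadata) metadata).getReaderTime();
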
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index ce1cf55..bb3115a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -126,7 +126,7 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
   }
 
   @Override
-  public Option<String> getSyncedInstantTime() {
+  public Option<String> getUpdateTime() {
     throw new UnsupportedOperationException();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 5234c4d..554a165 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -115,6 +116,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         this.metaClient = null;
         this.tableConfig = null;
       }
+
+      if (enabled) {
+        openTimelineScanner(metaClient.getActiveTimeline());
+      }
     }
   }
 
@@ -273,6 +278,20 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   }
 
   /**
+   * Return the timestamp of the latest synced instant.
+   */
+  @Override
+  public Option<String> getUpdateTime() {
+    if (!enabled) {
+      return Option.empty();
+    }
+
+    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
+    return timeline.getDeltaCommitTimeline().filterCompletedInstants()
+        .lastInstant().map(HoodieInstant::getTimestamp);
+  }
+
+  /**
    * Return an ordered list of instants which have not been synced to the Metadata Table.
    */
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index a3d0e2d..999ea8e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -31,6 +26,11 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
 /**
  * {@code HoodieTableFileSystemView} implementation that retrieved partition listings from the Metadata Table.
  */
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 923a5a5..3964cd1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -68,6 +68,9 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
    * @param basePath The base path to check
    */
   static boolean isMetadataTable(String basePath) {
+    if (basePath.endsWith(Path.SEPARATOR)) {
+      basePath = basePath.substring(0, basePath.length() - 1);
+    }
     return basePath.endsWith(METADATA_TABLE_REL_PATH);
   }
 
@@ -102,9 +105,11 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
   Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths) throws IOException;
 
   /**
-   * Get the instant time to which the metadata is synced w.r.t data timeline.
+   * Get the instant time at which Metadata Table was last updated.
+   *
+   * This is the timestamp of the Instant on the dataset which was last synced to the Metadata Table.
    */
-  Option<String> getSyncedInstantTime();
+  Option<String> getUpdateTime();
 
   boolean isInSync();
 }
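
The isMetadataTable() change above strips one trailing path separator before the suffix check, so base paths with and without a trailing slash behave the same. Illustrative calls (paths made up; assumes METADATA_TABLE_REL_PATH resolves to ".hoodie/metadata"):

    // Both return true after this change; the second previously failed the suffix check.
    HoodieTableMetadata.isMetadataTable("/tmp/trips/.hoodie/metadata");
    HoodieTableMetadata.isMetadataTable("/tmp/trips/.hoodie/metadata/");
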
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 5942312..14fe07b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -33,7 +34,7 @@ import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
-
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -61,11 +62,13 @@ public class HoodieTableMetadataUtil {
    * Converts a timeline instant to metadata table records.
    *
    * @param datasetMetaClient The meta client associated with the timeline instant
+   * @param metadataTableTimeline Current timeline of the Metadata Table
    * @param instant to fetch and convert to metadata table records
    * @return a list of metadata table records
    * @throws IOException
    */
-  public static Option<List<HoodieRecord>> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option<String> lastSyncTs) throws IOException {
+  public static Option<List<HoodieRecord>> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient,
+      HoodieActiveTimeline metadataTableTimeline, HoodieInstant instant, Option<String> lastSyncTs) throws IOException {
     HoodieTimeline timeline = datasetMetaClient.getActiveTimeline();
     Option<List<HoodieRecord>> records = Option.empty();
     ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
@@ -85,12 +88,12 @@ public class HoodieTableMetadataUtil {
       case HoodieTimeline.ROLLBACK_ACTION:
         HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
             timeline.getInstantDetails(instant).get());
-        records = Option.of(convertMetadataToRecords(rollbackMetadata, instant.getTimestamp(), lastSyncTs));
+        records = Option.of(convertMetadataToRecords(metadataTableTimeline, rollbackMetadata, instant.getTimestamp(), lastSyncTs));
         break;
       case HoodieTimeline.RESTORE_ACTION:
         HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
             timeline.getInstantDetails(instant).get());
-        records = Option.of(convertMetadataToRecords(restoreMetadata, instant.getTimestamp(), lastSyncTs));
+        records = Option.of(convertMetadataToRecords(metadataTableTimeline, restoreMetadata, instant.getTimestamp(), lastSyncTs));
         break;
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
@@ -213,21 +216,22 @@ public class HoodieTableMetadataUtil {
    * @param instantTime
    * @return a list of metadata table records
    */
-  public static List<HoodieRecord> convertMetadataToRecords(HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline,
+      HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
     Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
     Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
     restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
-      rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs));
+      rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs));
     });
 
     return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore");
   }
 
-  public static List<HoodieRecord> convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String instantTime, Option<String> lastSyncTs) {
-
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline,
+      HoodieRollbackMetadata rollbackMetadata, String instantTime, Option<String> lastSyncTs) {
     Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
     Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-    processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs);
+    processRollbackMetadata(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs);
     return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
   }
 
@@ -236,27 +240,47 @@ public class HoodieTableMetadataUtil {
    *
    * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
    * function will extract this change file for each partition.
-   *
+   * @param metadataTableTimeline Current timeline of the Metadata Table
    * @param rollbackMetadata {@code HoodieRollbackMetadata}
    * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
    * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
    */
-  private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
+  private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata,
                                               Map<String, List<String>> partitionToDeletedFiles,
                                               Map<String, Map<String, Long>> partitionToAppendedFiles,
                                               Option<String> lastSyncTs) {
 
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
+      final String instantToRollback = rollbackMetadata.getCommitsRollback().get(0);
       // Has this rollback produced new files?
       boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty();
       boolean hasNonZeroRollbackLogFiles = hasRollbackLogFiles && pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
-      // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata
+
+      // If instant-to-rollback has not been synced to metadata table yet then there is no need to update metadata
+      // This can happen in two cases:
+      //  Case 1: Metadata Table timeline is behind the instant-to-rollback.
       boolean shouldSkip = lastSyncTs.isPresent()
-          && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get());
+          && HoodieTimeline.compareTimestamps(instantToRollback, HoodieTimeline.GREATER_THAN, lastSyncTs.get());
 
       if (!hasNonZeroRollbackLogFiles && shouldSkip) {
         LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced upto to %s",
-            rollbackMetadata.getCommitsRollback().get(0), lastSyncTs.get()));
+            instantToRollback, lastSyncTs.get()));
+        return;
+      }
+
+      // Case 2: The instant-to-rollback was never committed to Metadata Table. This can happen if the instant-to-rollback
+      // was a failed commit (never completed) as only completed instants are synced to Metadata Table.
+      // But the required Metadata Table instants must not have been archived for this check to work.
+      HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback);
+      if (metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
+        throw new HoodieMetadataException(String.format("The instant %s required to sync rollback of %s has been archived",
+            syncedInstant, instantToRollback));
+      }
+
+      shouldSkip = !metadataTableTimeline.containsInstant(syncedInstant);
+      if (!hasNonZeroRollbackLogFiles && shouldSkip) {
+        LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, since this instant was never committed to Metadata Table",
+            instantToRollback));
         return;
       }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java
index 9ba3f26..b2aca1f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
@@ -49,16 +50,18 @@ public class TimelineMergedTableMetadata implements Serializable {
   private List<HoodieInstant> instants;
   private Option<String> lastSyncTs;
   private Set<String> mergeKeyFilter;
+  private HoodieActiveTimeline metadataTableTimeline;
 
   // keep it a simple hash map, so it can be easily passed onto the executors, once merged.
   protected final Map<String, HoodieRecord<? extends HoodieRecordPayload>> timelineMergedRecords;
 
-  public TimelineMergedTableMetadata(HoodieTableMetaClient metaClient, List<HoodieInstant> instants,
-                                     Option<String> lastSyncTs, Set<String> mergeKeyFilter) {
+  public TimelineMergedTableMetadata(HoodieTableMetaClient metaClient, HoodieActiveTimeline metadataTableTimeline,
+                                     List<HoodieInstant> instants, Option<String> lastSyncTs, Set<String> mergeKeyFilter) {
     this.metaClient = metaClient;
     this.instants = instants;
     this.lastSyncTs = lastSyncTs;
     this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : Collections.emptySet();
+    this.metadataTableTimeline = metadataTableTimeline;
     this.timelineMergedRecords = new HashMap<>();
 
     scan();
@@ -73,7 +76,8 @@ public class TimelineMergedTableMetadata implements Serializable {
   private void scan() {
     for (HoodieInstant instant : instants) {
       try {
-        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, instant, lastSyncTs);
+        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient,
+            metadataTableTimeline, instant, lastSyncTs);
         if (records.isPresent()) {
           records.get().forEach(record -> processNextRecord(record));
         }
@@ -112,4 +116,15 @@ public class TimelineMergedTableMetadata implements Serializable {
   public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key) {
     return Option.ofNullable((HoodieRecord) timelineMergedRecords.get(key));
   }
+
+  /**
+   * Returns the timestamp of the latest synced instant.
+   */
+  public Option<String> getSyncedInstantTime() {
+    if (instants.isEmpty()) {
+      return Option.empty();
+    }
+
+    return Option.of(instants.get(instants.size() - 1).getTimestamp());
+  }
 }