You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/11 15:00:48 UTC

[hudi] branch master updated: [HUDI-5349] Clean up partially failed restore (#7605)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a0e1ce4887 [HUDI-5349] Clean up partially failed restore (#7605)
9a0e1ce4887 is described below

commit 9a0e1ce4887891a4281503fb0049ac32dd785850
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Wed Jan 11 10:00:35 2023 -0500

    [HUDI-5349] Clean up partially failed restore (#7605)
    
    If a "restore" operation on a table failed midway, its pending restore instant could still be lying around. When the restore is re-attempted, a new instant time is allotted and the restore starts from scratch, which may thwart compaction progression in the metadata table (MDT). So we need to ensure that, for a given savepoint, we always reuse the pending restore instant if one exists.
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 34 +++++---
 .../java/org/apache/hudi/table/HoodieTable.java    |  8 +-
 .../action/restore/BaseRestoreActionExecutor.java  | 10 +--
 .../restore/CopyOnWriteRestoreActionExecutor.java  |  4 +-
 .../restore/MergeOnReadRestoreActionExecutor.java  |  4 +-
 .../hudi/table/action/restore/RestoreUtils.java    | 27 +++++++
 .../action/rollback/RestorePlanActionExecutor.java | 15 ++--
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  4 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     | 10 +--
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  8 +-
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |  4 +-
 .../org/apache/hudi/client/TestClientRollback.java | 92 +++++++++++++++++++++-
 hudi-common/src/main/avro/HoodieRestorePlan.avsc   | 10 ++-
 13 files changed, 184 insertions(+), 46 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 5d23918e1a3..04b70d5545c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -89,6 +89,7 @@ import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.restore.RestoreUtils;
 import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.action.savepoint.SavepointHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -836,17 +837,19 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    * NOTE : This action requires all writers (ingest and compact) to a table to be stopped before proceeding. Revert
    * the (inflight/committed) record changes for all commits after the provided instant time.
    *
-   * @param instantTime Instant time to which restoration is requested
+   * @param savepointToRestoreTimestamp savepoint instant time to which restoration is requested
    */
-  public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
-    LOG.info("Begin restore to instant " + instantTime);
-    final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
+  public HoodieRestoreMetadata restoreToInstant(final String savepointToRestoreTimestamp, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
+    LOG.info("Begin restore to instant " + savepointToRestoreTimestamp);
     Timer.Context timerContext = metrics.getRollbackCtx();
     try {
-      HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary);
-      Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
+      HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
+      Pair<String, Option<HoodieRestorePlan>> timestampAndRestorePlan = scheduleAndGetRestorePlan(savepointToRestoreTimestamp, table);
+      final String restoreInstantTimestamp = timestampAndRestorePlan.getLeft();
+      Option<HoodieRestorePlan> restorePlanOption = timestampAndRestorePlan.getRight();
+
       if (restorePlanOption.isPresent()) {
-        HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
+        HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTimestamp, savepointToRestoreTimestamp);
         if (timerContext != null) {
           final long durationInMs = metrics.getDurationInMs(timerContext.stop());
           final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
@@ -857,11 +860,24 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
         }
         return restoreMetadata;
       } else {
-        throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + instantTime);
+        throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + savepointToRestoreTimestamp);
       }
     } catch (Exception e) {
-      throw new HoodieRestoreException("Failed to restore to " + instantTime, e);
+      throw new HoodieRestoreException("Failed to restore to " + savepointToRestoreTimestamp, e);
+    }
+  }
+
+  /**
+   * Check whether the latest pending (failed) restore targets the same savepointToRestoreTimestamp; if so, reuse its
+   * instant and plan, otherwise schedule a new restore. Reusing the instant avoids stalling metadata table compaction.
+   * */
+  private Pair<String, Option<HoodieRestorePlan>> scheduleAndGetRestorePlan(final String savepointToRestoreTimestamp, HoodieTable<T, I, K, O> table) throws IOException {
+    Option<HoodieInstant> failedRestore = table.getRestoreTimeline().filterInflightsAndRequested().lastInstant();
+    if (failedRestore.isPresent() && savepointToRestoreTimestamp.equals(RestoreUtils.getSavepointToRestoreTimestamp(table, failedRestore.get()))) {
+      return Pair.of(failedRestore.get().getTimestamp(), Option.of(RestoreUtils.getRestorePlan(table.getMetaClient(), failedRestore.get())));
     }
+    final String restoreInstantTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context, restoreInstantTimestamp, savepointToRestoreTimestamp));
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 3a31c80d428..ff9177ab7d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -560,15 +560,15 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    * that would cause any running queries that are accessing file slices written after the instant to fail.
    */
   public abstract HoodieRestoreMetadata restore(HoodieEngineContext context,
-                                                String restoreInstantTime,
-                                                String instantToRestore);
+                                                String restoreInstantTimestamp,
+                                                String savepointToRestoreTimestamp);
 
   /**
    * Schedules Restore for the table to the given instant.
    */
   public abstract Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context,
-                                                    String restoreInstantTime,
-                                                    String instantToRestore);
+                                                    String restoreInstantTimestamp,
+                                                    String savepointToRestoreTimestamp);
 
   public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
     rollbackInflightCompaction(inflightInstant, s -> Option.empty());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index dfcec915b00..d74432e8aa7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -52,16 +52,16 @@ public abstract class BaseRestoreActionExecutor<T, I, K, O> extends BaseActionEx
 
   private static final Logger LOG = LogManager.getLogger(BaseRestoreActionExecutor.class);
 
-  private final String restoreInstantTime;
+  private final String savepointToRestoreTimestamp;
   private final TransactionManager txnManager;
 
   public BaseRestoreActionExecutor(HoodieEngineContext context,
                                    HoodieWriteConfig config,
                                    HoodieTable<T, I, K, O> table,
                                    String instantTime,
-                                   String restoreInstantTime) {
+                                   String savepointToRestoreTimestamp) {
     super(context, config, table, instantTime);
-    this.restoreInstantTime = restoreInstantTime;
+    this.savepointToRestoreTimestamp = savepointToRestoreTimestamp;
     this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
   }
 
@@ -131,7 +131,7 @@ public abstract class BaseRestoreActionExecutor<T, I, K, O> extends BaseActionEx
     // if not, rollbacks will be considered not completed and might hinder metadata table compaction.
     List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getRollbackTimeline()
         .getReverseOrderedInstants()
-        .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
+        .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), savepointToRestoreTimestamp))
         .collect(Collectors.toList());
     instantsToRollback.forEach(entry -> {
       if (entry.isCompleted()) {
@@ -140,7 +140,7 @@ public abstract class BaseRestoreActionExecutor<T, I, K, O> extends BaseActionEx
       table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
       table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
     });
-    LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + restoreInstantTime);
+    LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + savepointToRestoreTimestamp);
     return restoreMetadata;
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index 9dcb3409135..eea97e1e5ca 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -35,8 +35,8 @@ public class CopyOnWriteRestoreActionExecutor<T, I, K, O>
                                           HoodieWriteConfig config,
                                           HoodieTable table,
                                           String instantTime,
-                                          String restoreInstantTime) {
-    super(context, config, table, instantTime, restoreInstantTime);
+                                          String savepointToRestoreTimestamp) {
+    super(context, config, table, instantTime, savepointToRestoreTimestamp);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index a7e5774515d..f449a120d57 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -31,8 +31,8 @@ import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 public class MergeOnReadRestoreActionExecutor<T, I, K, O>
     extends BaseRestoreActionExecutor<T, I, K, O> {
   public MergeOnReadRestoreActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table,
-                                          String instantTime, String restoreInstantTime) {
-    super(context, config, table, instantTime, restoreInstantTime);
+                                          String instantTime, String savepointToRestoreTimestamp) {
+    super(context, config, table, instantTime, savepointToRestoreTimestamp);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
index 24c57a0709b..e71a8566f95 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
@@ -24,6 +24,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 
 import java.io.IOException;
 
@@ -43,4 +46,28 @@ public class RestoreUtils {
     return TimelineMetadataUtils.deserializeAvroMetadata(
         metaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
   }
+
+  public static String getSavepointToRestoreTimestampV1Schema(HoodieTable table, HoodieRestorePlan plan) {
+    if (plan.getInstantsToRollback() == null || plan.getInstantsToRollback().isEmpty()) {
+      return null; // nothing was rolled back, so there is no earliest rollback to anchor the lookup on
+    }
+    //get earliest rollback (last entry of the plan)
+    String firstRollback = plan.getInstantsToRollback().get(plan.getInstantsToRollback().size() - 1).getCommitTime();
+    //find last savepoint before first rollback
+    Option<HoodieInstant> savepointInstance = table.getActiveTimeline().getSavePointTimeline().findInstantsBefore(firstRollback).lastInstant();
+    return savepointInstance.isPresent() ? savepointInstance.get().getTimestamp() : null;
+  }
+
+  /**
+   * Get the savepoint timestamp that {@code restoreInstant} is restoring, or null if it cannot be determined.
+   * Plans older than V2 (or with no version set) do not record it, so it is derived from the savepoint timeline.
+   * @throws IOException propagated from reading the restore plan
+   */
+  public static String getSavepointToRestoreTimestamp(HoodieTable table, HoodieInstant restoreInstant) throws IOException {
+    HoodieRestorePlan plan = getRestorePlan(table.getMetaClient(), restoreInstant);
+    if (plan.getVersion() != null && plan.getVersion() > RestorePlanActionExecutor.RESTORE_PLAN_VERSION_1) {
+      return plan.getSavepointToRestoreTimestamp();
+    }
+    return getSavepointToRestoreTimestampV1Schema(table, plan);
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
index 34617afc525..55f03aa4143 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -50,16 +50,17 @@ public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
   private static final Logger LOG = LogManager.getLogger(RestorePlanActionExecutor.class);
 
   public static final Integer RESTORE_PLAN_VERSION_1 = 1;
-  public static final Integer LATEST_RESTORE_PLAN_VERSION = RESTORE_PLAN_VERSION_1;
-  private final String restoreInstantTime;
+  public static final Integer RESTORE_PLAN_VERSION_2 = 2;
+  public static final Integer LATEST_RESTORE_PLAN_VERSION = RESTORE_PLAN_VERSION_2;
+  private final String savepointToRestoreTimestamp;
 
   public RestorePlanActionExecutor(HoodieEngineContext context,
                                    HoodieWriteConfig config,
                                    HoodieTable<T, I, K, O> table,
                                    String instantTime,
-                                   String restoreInstantTime) {
+                                   String savepointToRestoreTimestamp) {
     super(context, config, table, instantTime);
-    this.restoreInstantTime = restoreInstantTime;
+    this.savepointToRestoreTimestamp = savepointToRestoreTimestamp;
   }
 
   @Override
@@ -72,13 +73,13 @@ public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
               // filter only clustering related replacecommits (Not insert_overwrite related commits)
               .filter(instant -> ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant))
               .getReverseOrderedInstants()
-              .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
+              .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), savepointToRestoreTimestamp))
               .collect(Collectors.toList());
 
       // Get all the commits on the timeline after the provided commit time
       List<HoodieInstant> commitInstantsToRollback = table.getActiveTimeline().getWriteTimeline()
               .getReverseOrderedInstants()
-              .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
+              .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), savepointToRestoreTimestamp))
               .filter(instant -> !pendingClusteringInstantsToRollback.contains(instant))
               .collect(Collectors.toList());
 
@@ -87,7 +88,7 @@ public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
               .map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction()))
               .collect(Collectors.toList());
 
-      HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION);
+      HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION, savepointToRestoreTimestamp);
       table.getActiveTimeline().saveToRestoreRequested(restoreInstant, TimelineMetadataUtils.serializeRestorePlan(restorePlan));
       table.getMetaClient().reloadActiveTimeline();
       LOG.info("Requesting Restore with instant time " + restoreInstant);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 8e671b2ec5e..f09cf106db2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -344,12 +344,12 @@ public class HoodieFlinkCopyOnWriteTable<T>
   }
 
   @Override
-  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
     throw new HoodieNotSupportedException("Restore is not supported yet");
   }
 
   @Override
-  public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+  public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
     throw new HoodieNotSupportedException("Savepoint and restore is not supported yet");
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 2cf2895f3f4..f3899b38590 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -250,16 +250,16 @@ public class HoodieJavaCopyOnWriteTable<T>
   }
 
   @Override
-  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
+    return new RestorePlanActionExecutor(context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
   }
 
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context,
-                                       String restoreInstantTime,
-                                       String instantToRestore) {
+                                       String restoreInstantTimestamp,
+                                       String savepointToRestoreTimestamp) {
     return new CopyOnWriteRestoreActionExecutor(
-        context, config, this, restoreInstantTime, instantToRestore).execute();
+        context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 81b7cd00242..9691c8a9ab7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -286,12 +286,12 @@ public class HoodieSparkCopyOnWriteTable<T>
   }
 
   @Override
-  public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new CopyOnWriteRestoreActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
+  public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
+    return new CopyOnWriteRestoreActionExecutor<>(context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
   }
 
   @Override
-  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new RestorePlanActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
+  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
+    return new RestorePlanActionExecutor<>(context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 25511cd19ab..32ded270406 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -195,8 +195,8 @@ public class HoodieSparkMergeOnReadTable<T> extends HoodieSparkCopyOnWriteTable<
   }
 
   @Override
-  public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new MergeOnReadRestoreActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
+  public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
+    return new MergeOnReadRestoreActionExecutor<>(context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index c810ff4f968..d226b5b995a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -54,6 +55,8 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -63,6 +66,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+import static org.apache.hudi.table.action.restore.RestoreUtils.getRestorePlan;
+import static org.apache.hudi.table.action.restore.RestoreUtils.getSavepointToRestoreTimestampV1Schema;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -75,11 +80,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class TestClientRollback extends HoodieClientTestBase {
 
+  private static Stream<Arguments> testSavepointAndRollbackParams() {
+    return Arrays.stream(new Boolean[][] {
+        {false, false}, {true, true}, {true, false},
+    }).map(Arguments::of);
+  }
+
   /**
    * Test case for rollback-savepoint interaction.
    */
-  @Test
-  public void testSavepointAndRollback() throws Exception {
+  @ParameterizedTest
+  @MethodSource("testSavepointAndRollbackParams")
+  public void testSavepointAndRollback(Boolean testFailedRestore, Boolean failedRestoreInflight) throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
         .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
@@ -175,9 +187,84 @@ public class TestClientRollback extends HoodieClientTestBase {
 
       dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
       assertEquals(0, dataFiles.size(), "The data files for commit 004 should be rolled back");
+
+      if (testFailedRestore) {
+        //test to make sure that restore commit is reused when the restore fails and is re-ran
+        HoodieInstant inst =  table.getActiveTimeline().getRestoreTimeline().getInstants().get(0);
+        String restoreFileName = table.getMetaClient().getBasePathV2().toString() + "/.hoodie/" +  inst.getFileName();
+
+        //delete restore commit file
+        assertTrue((new File(restoreFileName)).delete());
+
+        if (!failedRestoreInflight) {
+          //delete restore inflight file
+          assertTrue((new File(restoreFileName + ".inflight")).delete());
+        }
+        try (SparkRDDWriteClient newClient = getHoodieWriteClient(cfg)) {
+          //restore again
+          newClient.restoreToSavepoint(savepoint.getTimestamp());
+
+          //verify that we reuse the existing restore commit
+          metaClient = HoodieTableMetaClient.reload(metaClient);
+          table = HoodieSparkTable.create(getConfig(), context, metaClient);
+          List<HoodieInstant> restoreInstants = table.getActiveTimeline().getRestoreTimeline().getInstants();
+          assertEquals(1, restoreInstants.size());
+          assertEquals(HoodieInstant.State.COMPLETED, restoreInstants.get(0).getState());
+          assertEquals(inst.getTimestamp(), restoreInstants.get(0).getTimestamp());
+        }
+      }
     }
   }
 
+  private List<HoodieRecord> updateRecords(SparkRDDWriteClient client, List<HoodieRecord> records, String newCommitTime) throws IOException {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> recs = dataGen.generateUpdates(newCommitTime, records);
+    List<WriteStatus> statuses = client.upsert(jsc.parallelize(recs, 1), newCommitTime).collect();
+    assertNoWriteErrors(statuses);
+    return recs;
+  }
+
+  @Test
+  public void testGetSavepointOldSchema() throws Exception {
+    HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
+        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+
+      /**
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      records = updateRecords(client, records, "002");
+
+      client.savepoint("hoodie-unit-test", "test");
+
+
+      records = updateRecords(client, records, "003");
+      updateRecords(client, records, "004");
+
+      // rollback to savepoint 002
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      HoodieInstant savepoint = table.getCompletedSavepointTimeline().lastInstant().get();
+      client.restoreToSavepoint(savepoint.getTimestamp());
+
+      //verify that getSavepointToRestoreTimestampV1Schema is correct
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieSparkTable.create(getConfig(), context, metaClient);
+      HoodieRestorePlan plan = getRestorePlan(metaClient, table.getActiveTimeline().getRestoreTimeline().lastInstant().get());
+      assertEquals("002", getSavepointToRestoreTimestampV1Schema(table, plan));
+    }
+  }
+  
   /**
    * Test case for rollback-savepoint with KEEP_LATEST_FILE_VERSIONS policy.
    */
@@ -740,4 +827,5 @@ public class TestClientRollback extends HoodieClientTestBase {
       assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
     }
   }
+
 }
diff --git a/hudi-common/src/main/avro/HoodieRestorePlan.avsc b/hudi-common/src/main/avro/HoodieRestorePlan.avsc
index 1ad9e6a4b9c..9f93986eb8f 100644
--- a/hudi-common/src/main/avro/HoodieRestorePlan.avsc
+++ b/hudi-common/src/main/avro/HoodieRestorePlan.avsc
@@ -32,6 +32,12 @@
     {
            "name":"version",
            "type":["int", "null"],
-           "default": 1
-    }]
+           "default": 2
+    },
+    {
+           "name": "savepointToRestoreTimestamp",
+           "type": ["null", "string"],
+           "default": null
+    }
+    ]
 }
\ No newline at end of file