You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/02/10 13:06:42 UTC

[hudi] branch master updated: [HUDI-2432] Adding restore.requested instant and restore plan for restore action (#4605)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e7ec3a8  [HUDI-2432] Adding restore.requested instant and restore plan for restore action (#4605)
e7ec3a8 is described below

commit e7ec3a82dc274b8d683d74e59ee7bf35d7827ce0
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Thu Feb 10 08:06:23 2022 -0500

    [HUDI-2432] Adding restore.requested instant and restore plan for restore action (#4605)
    
    - This adds a restore plan and serializes it to the restore.requested meta file in the timeline. This also means that we are introducing schedule and execution phases for restore, which were not present before.
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 24 ++++---
 .../hudi/exception/HoodieRestoreException.java     |  4 ++
 .../java/org/apache/hudi/table/HoodieTable.java    | 15 ++++
 .../action/restore/BaseRestoreActionExecutor.java  | 59 +++++++++++----
 .../hudi/table/action/restore/RestoreUtils.java    | 46 ++++++++++++
 .../action/rollback/RestorePlanActionExecutor.java | 84 ++++++++++++++++++++++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  6 ++
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  7 ++
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  7 ++
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |  2 +
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  1 +
 .../src/main/avro/HoodieRestorePlan.avsc           | 31 +++++---
 .../table/timeline/HoodieActiveTimeline.java       | 29 +++++++-
 .../hudi/common/table/timeline/HoodieInstant.java  |  1 +
 .../hudi/common/table/timeline/HoodieTimeline.java |  5 ++
 .../table/timeline/TimelineMetadataUtils.java      |  5 ++
 16 files changed, 292 insertions(+), 34 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 271f8a3..2414a9f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.callback.HoodieWriteCommitCallback;
@@ -690,16 +691,21 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     Timer.Context timerContext = metrics.getRollbackCtx();
     try {
       HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
-      HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
-      if (timerContext != null) {
-        final long durationInMs = metrics.getDurationInMs(timerContext.stop());
-        final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
-            .flatMap(Collection::stream)
-            .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
-            .sum();
-        metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
+      Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
+      if (restorePlanOption.isPresent()) {
+        HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
+        if (timerContext != null) {
+          final long durationInMs = metrics.getDurationInMs(timerContext.stop());
+          final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
+              .flatMap(Collection::stream)
+              .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
+              .sum();
+          metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
+        }
+        return restoreMetadata;
+      } else {
+        throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + instantTime);
       }
-      return restoreMetadata;
     } catch (Exception e) {
       throw new HoodieRestoreException("Failed to restore to " + instantTime, e);
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
index c6c9076..baad53a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
@@ -23,4 +23,8 @@ public class HoodieRestoreException extends HoodieException {
   public HoodieRestoreException(String msg, Throwable e) {
     super(msg, e);
   }
+
+  public HoodieRestoreException(String msg) {
+    super(msg);
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index f19b6aa..639467f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -347,6 +348,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   }
 
   /**
+   * Get restore timeline.
+   */
+  public HoodieTimeline getRestoreTimeline() {
+    return getActiveTimeline().getRestoreTimeline();
+  }
+
+  /**
    * Get only the completed (no-inflights) savepoint timeline.
    */
   public HoodieTimeline getCompletedSavepointTimeline() {
@@ -498,6 +506,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
                                                 String instantToRestore);
 
   /**
+   * Schedules Restore for the table to the given instant.
+   */
+  public abstract Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context,
+                                                    String restoreInstantTime,
+                                                    String instantToRestore);
+
+  /**
    * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file
    * to the .requested file.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 58247bb..9025623 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.table.action.restore;
 
+import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -29,14 +31,18 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -65,27 +71,51 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
     HoodieTimer restoreTimer = new HoodieTimer();
     restoreTimer.startTimer();
 
-    // Get all the commits on the timeline after the provided commit time
-    List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getWriteTimeline()
-        .getReverseOrderedInstants()
-        .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
-        .collect(Collectors.toList());
+    Option<HoodieInstant> restoreInstant = table.getRestoreTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .firstInstant();
+    if (!restoreInstant.isPresent()) {
+      throw new HoodieRollbackException("No pending restore instants found to execute restore");
+    }
+    try {
+      List<HoodieInstant> instantsToRollback = getInstantsToRollback(restoreInstant.get());
+      ValidationUtils.checkArgument(restoreInstant.get().getState().equals(HoodieInstant.State.REQUESTED)
+          || restoreInstant.get().getState().equals(HoodieInstant.State.INFLIGHT));
+      Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
+      if (restoreInstant.get().isRequested()) {
+        table.getActiveTimeline().transitionRestoreRequestedToInflight(restoreInstant.get());
+      }
 
-    Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
-    table.getActiveTimeline().createNewInstant(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime));
-    instantsToRollback.forEach(instant -> {
-      instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant)));
-      LOG.info("Deleted instant " + instant);
-    });
+      instantsToRollback.forEach(instant -> {
+        instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant)));
+        LOG.info("Deleted instant " + instant);
+      });
 
-    try {
       return finishRestore(instantToMetadata,
           instantsToRollback,
           restoreTimer.endTimer()
       );
     } catch (IOException io) {
-      throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
+      throw new HoodieRestoreException("unable to Restore instant " + restoreInstant.get(), io);
+    }
+  }
+
+  private List<HoodieInstant> getInstantsToRollback(HoodieInstant restoreInstant) throws IOException {
+    List<HoodieInstant> instantsToRollback = new ArrayList<>();
+    HoodieRestorePlan restorePlan = RestoreUtils.getRestorePlan(table.getMetaClient(), restoreInstant);
+    for (HoodieInstantInfo instantInfo : restorePlan.getInstantsToRollback()) {
+      // If a previous restore attempt crashed mid-way, some commits may already have been rolled back
+      // while others have not, so we can skip any commits that were fully rolled back in that attempt.
+      Option<HoodieInstant> rollbackInstantOpt = table.getActiveTimeline().getWriteTimeline()
+          .filter(instant -> instant.getTimestamp().equals(instantInfo.getCommitTime()) && instant.getAction().equals(instantInfo.getAction())).firstInstant();
+      if (rollbackInstantOpt.isPresent()) {
+        instantsToRollback.add(rollbackInstantOpt.get());
+      } else {
+        LOG.warn("Ignoring already rolledback instant " + instantInfo.toString());
+      }
     }
+    return instantsToRollback;
   }
 
   protected abstract HoodieRollbackMetadata rollbackInstant(HoodieInstant rollbackInstant);
@@ -99,7 +129,7 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
     writeToMetadata(restoreMetadata);
     table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
         TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
-    // get all rollbacks instants after restore instant time and delete them.
+    // get all pending rollbacks instants after restore instant time and delete them.
     // if not, rollbacks will be considered not completed and might hinder metadata table compaction.
     List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getRollbackTimeline()
         .getReverseOrderedInstants()
@@ -115,6 +145,7 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
 
   /**
    * Update metadata table if available. Any update to metadata table happens within data table lock.
+   *
    * @param restoreMetadata instance of {@link HoodieRestoreMetadata} to be applied to metadata.
    */
   private void writeToMetadata(HoodieRestoreMetadata restoreMetadata) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
new file mode 100644
index 0000000..24c57a0
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.restore;
+
+import org.apache.hudi.avro.model.HoodieRestorePlan;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+
+import java.io.IOException;
+
+public class RestoreUtils {
+
+  /**
+   * Get Latest version of Restore plan corresponding to a restore instant.
+   *
+   * @param metaClient      Hoodie Table Meta Client
+   * @param restoreInstant Instant referring to restore action
+   * @return Restore plan corresponding to the restore instant
+   * @throws IOException if the restore plan cannot be deserialized
+   */
+  public static HoodieRestorePlan getRestorePlan(HoodieTableMetaClient metaClient, HoodieInstant restoreInstant)
+      throws IOException {
+    final HoodieInstant requested = HoodieTimeline.getRollbackRequestedInstant(restoreInstant);
+    return TimelineMetadataUtils.deserializeAvroMetadata(
+        metaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
new file mode 100644
index 0000000..a25eaed
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Plans the restore action and adds a restore.requested meta file to the timeline.
+ */
+public class RestorePlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieRestorePlan>> {
+
+
+  private static final Logger LOG = LogManager.getLogger(RestorePlanActionExecutor.class);
+
+  public static final Integer RESTORE_PLAN_VERSION_1 = 1;
+  public static final Integer LATEST_RESTORE_PLAN_VERSION = RESTORE_PLAN_VERSION_1;
+  private final String restoreInstantTime;
+
+  public RestorePlanActionExecutor(HoodieEngineContext context,
+                                   HoodieWriteConfig config,
+                                   HoodieTable<T, I, K, O> table,
+                                   String instantTime,
+                                   String restoreInstantTime) {
+    super(context, config, table, instantTime);
+    this.restoreInstantTime = restoreInstantTime;
+  }
+
+  @Override
+  public Option<HoodieRestorePlan> execute() {
+    final HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
+    try {
+      // Get all the commits on the timeline after the provided commit time
+      List<HoodieInstantInfo> instantsToRollback = table.getActiveTimeline().getWriteTimeline()
+          .getReverseOrderedInstants()
+          .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)).map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction()))
+          .collect(Collectors.toList());
+
+      HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION);
+      table.getActiveTimeline().saveToRestoreRequested(restoreInstant, TimelineMetadataUtils.serializeRestorePlan(restorePlan));
+      table.getMetaClient().reloadActiveTimeline();
+      LOG.info("Requesting Restore with instant time " + restoreInstant);
+      return Option.of(restorePlan);
+    } catch (IOException e) {
+      LOG.error("Got exception when saving restore requested file", e);
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 9aceffe..7e41ab1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -343,6 +344,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
   }
 
   @Override
+  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+    throw new HoodieNotSupportedException("Restore is not supported yet");
+  }
+
+  @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
     throw new HoodieNotSupportedException("Savepoint and restore is not supported yet");
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 62a6980..f8590e9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -63,6 +64,7 @@ import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor
 import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
 import org.slf4j.Logger;
@@ -248,6 +250,11 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
   }
 
   @Override
+  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+    return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+  }
+
+  @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context,
                                        String restoreInstantTime,
                                        String instantToRestore) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 12c0483..6a33055 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -74,6 +75,7 @@ import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecuto
 import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
 import org.apache.avro.Schema;
@@ -258,6 +260,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
 
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
+    new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
     new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }
 
@@ -353,4 +356,8 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
     return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
   }
 
+  @Override
+  public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+    return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 75af5d0..334efa7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -52,6 +52,7 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitAct
 import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 
 import org.apache.spark.api.java.JavaRDD;
 
@@ -150,6 +151,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
 
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
+    new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
     new MergeOnReadRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 7334f4c..a1d7569 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -576,6 +576,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
         TimelineLayoutVersion.CURR_VERSION).build();
     client = getHoodieWriteClient(newConfig);
+
     client.restoreToInstant("004");
 
     assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java b/hudi-common/src/main/avro/HoodieRestorePlan.avsc
similarity index 61%
copy from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
copy to hudi-common/src/main/avro/HoodieRestorePlan.avsc
index c6c9076..1ad9e6a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
+++ b/hudi-common/src/main/avro/HoodieRestorePlan.avsc
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,12 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.hudi.exception;
-
-public class HoodieRestoreException extends HoodieException {
-
-  public HoodieRestoreException(String msg, Throwable e) {
-    super(msg, e);
-  }
-}
+{
+   "namespace":"org.apache.hudi.avro.model",
+   "type":"record",
+   "name":"HoodieRestorePlan",
+   "fields":[
+    {
+           "name": "instantsToRollback",
+           "default": [],
+           "type": {
+             "type": "array",
+             "default": null,
+             "items": "HoodieInstantInfo"
+           }
+    },
+    {
+           "name":"version",
+           "type":["int", "null"],
+           "default": 1
+    }]
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index c7473bd..1fa3845 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -70,7 +70,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
       SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
       CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
       INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
-      INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
+      REQUESTED_RESTORE_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
       ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
       REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
   private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
@@ -289,6 +289,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
   }
 
+  public Option<byte[]> readRestoreInfoAsBytes(HoodieInstant instant) {
+    // Restore metadata is always stored only in timeline .hoodie
+    return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
+  }
+
   //-----------------------------------------------------------------
   //      BEGIN - COMPACTION RELATED META-DATA MANAGEMENT.
   //-----------------------------------------------------------------
@@ -430,6 +435,21 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   /**
+   * Transition Restore State from requested to inflight.
+   *
+   * @param requestedInstant requested instant
+   * @return inflight restore instant
+   */
+  public HoodieInstant transitionRestoreRequestedToInflight(HoodieInstant requestedInstant) {
+    ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.RESTORE_ACTION), "Transition to inflight requested for a restore instant with diff action "
+        + requestedInstant.toString());
+    ValidationUtils.checkArgument(requestedInstant.isRequested(), "Transition to inflight requested for an instant not in requested state " + requestedInstant.toString());
+    HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, RESTORE_ACTION, requestedInstant.getTimestamp());
+    transitionState(requestedInstant, inflight, Option.empty());
+    return inflight;
+  }
+
+  /**
    * Transition replace requested file to replace inflight.
    *
    * @param requestedInstant Requested instant
@@ -599,6 +619,13 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     createFileInMetaPath(instant.getFileName(), content, false);
   }
 
+  public void saveToRestoreRequested(HoodieInstant instant, Option<byte[]> content) {
+    ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.RESTORE_ACTION));
+    ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));
+    // Plan is stored in meta path
+    createFileInMetaPath(instant.getFileName(), content, false);
+  }
+
   private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
     Path fullPath = new Path(metaClient.getMetaPath(), filename);
     if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index a8df62c..9cd0883 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -166,6 +166,7 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
       }
     } else if (HoodieTimeline.RESTORE_ACTION.equals(action)) {
       return isInflight() ? HoodieTimeline.makeInflightRestoreFileName(timestamp)
+          : isRequested() ? HoodieTimeline.makeRequestedRestoreFileName(timestamp)
           : HoodieTimeline.makeRestoreFileName(timestamp);
     } else if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)) {
       return isInflight() ? HoodieTimeline.makeInflightReplaceFileName(timestamp)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 6ea44a8..25b9c2e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -78,6 +78,7 @@ public interface HoodieTimeline extends Serializable {
   String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
   String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX);
   String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION);
+  String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION + REQUESTED_EXTENSION;
   String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION;
   String RESTORE_EXTENSION = "." + RESTORE_ACTION;
   String INFLIGHT_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + INFLIGHT_EXTENSION;
@@ -386,6 +387,10 @@ public interface HoodieTimeline extends Serializable {
     return StringUtils.join(instant, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION);
   }
 
+  static String makeRequestedRestoreFileName(String instant) {
+    return StringUtils.join(instant, HoodieTimeline.REQUESTED_RESTORE_EXTENSION);
+  }
+
   static String makeInflightRollbackFileName(String instant) {
     return StringUtils.join(instant, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 723c594..70a23f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
@@ -112,6 +113,10 @@ public class TimelineMetadataUtils {
     return serializeAvroMetadata(rollbackPlan, HoodieRollbackPlan.class);
   }
 
+  public static Option<byte[]> serializeRestorePlan(HoodieRestorePlan restorePlan) throws IOException {
+    return serializeAvroMetadata(restorePlan, HoodieRestorePlan.class);
+  }
+
   public static Option<byte[]> serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException {
     return serializeAvroMetadata(metadata, HoodieCleanMetadata.class);
   }