You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/11 15:00:48 UTC
[hudi] branch master updated: [HUDI-5349] Clean up partially failed restore (#7605)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9a0e1ce4887 [HUDI-5349] Clean up partially failed restore (#7605)
9a0e1ce4887 is described below
commit 9a0e1ce4887891a4281503fb0049ac32dd785850
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Wed Jan 11 10:00:35 2023 -0500
[HUDI-5349] Clean up partially failed restore (#7605)
If a "restore" operation was attempted on a table and failed midway, the pending restore instant can be left lying around on the timeline. When the restore is re-attempted, a new instant time is allotted and the restore starts again from scratch, but this may thwart compaction progress in the metadata table (MDT). So we need to ensure that, for a given savepoint, we always reuse the existing restore instant, if any.
Co-authored-by: Jonathan Vexler <=>
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 34 +++++---
.../java/org/apache/hudi/table/HoodieTable.java | 8 +-
.../action/restore/BaseRestoreActionExecutor.java | 10 +--
.../restore/CopyOnWriteRestoreActionExecutor.java | 4 +-
.../restore/MergeOnReadRestoreActionExecutor.java | 4 +-
.../hudi/table/action/restore/RestoreUtils.java | 27 +++++++
.../action/rollback/RestorePlanActionExecutor.java | 15 ++--
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 4 +-
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 10 +--
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 8 +-
.../hudi/table/HoodieSparkMergeOnReadTable.java | 4 +-
.../org/apache/hudi/client/TestClientRollback.java | 92 +++++++++++++++++++++-
hudi-common/src/main/avro/HoodieRestorePlan.avsc | 10 ++-
13 files changed, 184 insertions(+), 46 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 5d23918e1a3..04b70d5545c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -89,6 +89,7 @@ import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.restore.RestoreUtils;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -836,17 +837,19 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
* NOTE : This action requires all writers (ingest and compact) to a table to be stopped before proceeding. Revert
* the (inflight/committed) record changes for all commits after the provided instant time.
*
- * @param instantTime Instant time to which restoration is requested
+ * @param savepointToRestoreTimestamp savepoint instant time to which restoration is requested
*/
- public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
- LOG.info("Begin restore to instant " + instantTime);
- final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ public HoodieRestoreMetadata restoreToInstant(final String savepointToRestoreTimestamp, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
+ LOG.info("Begin restore to instant " + savepointToRestoreTimestamp);
Timer.Context timerContext = metrics.getRollbackCtx();
try {
- HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary);
- Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
+ HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
+ Pair<String, Option<HoodieRestorePlan>> timestampAndRestorePlan = scheduleAndGetRestorePlan(savepointToRestoreTimestamp, table);
+ final String restoreInstantTimestamp = timestampAndRestorePlan.getLeft();
+ Option<HoodieRestorePlan> restorePlanOption = timestampAndRestorePlan.getRight();
+
if (restorePlanOption.isPresent()) {
- HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
+ HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTimestamp, savepointToRestoreTimestamp);
if (timerContext != null) {
final long durationInMs = metrics.getDurationInMs(timerContext.stop());
final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
@@ -857,11 +860,24 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
}
return restoreMetadata;
} else {
- throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + instantTime);
+ throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + savepointToRestoreTimestamp);
}
} catch (Exception e) {
- throw new HoodieRestoreException("Failed to restore to " + instantTime, e);
+ throw new HoodieRestoreException("Failed to restore to " + savepointToRestoreTimestamp, e);
+ }
+ }
+
+ /**
+ * Checks whether a pending (requested or inflight) restore already targets the same savepointToRestoreTimestamp. If so, its instant
+ * and restore plan are reused instead of scheduling a new restore, which prevents issues with the metadata table.
+ */
+ private Pair<String, Option<HoodieRestorePlan>> scheduleAndGetRestorePlan(final String savepointToRestoreTimestamp, HoodieTable<T, I, K, O> table) throws IOException {
+ Option<HoodieInstant> failedRestore = table.getRestoreTimeline().filterInflightsAndRequested().lastInstant();
+ if (failedRestore.isPresent() && savepointToRestoreTimestamp.equals(RestoreUtils.getSavepointToRestoreTimestamp(table, failedRestore.get()))) {
+ return Pair.of(failedRestore.get().getTimestamp(), Option.of(RestoreUtils.getRestorePlan(table.getMetaClient(), failedRestore.get())));
}
+ final String restoreInstantTimestamp = HoodieActiveTimeline.createNewInstantTime();
+ return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context, restoreInstantTimestamp, savepointToRestoreTimestamp));
}
/**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 3a31c80d428..ff9177ab7d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -560,15 +560,15 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
* that would cause any running queries that are accessing file slices written after the instant to fail.
*/
public abstract HoodieRestoreMetadata restore(HoodieEngineContext context,
- String restoreInstantTime,
- String instantToRestore);
+ String restoreInstantTimestamp,
+ String savepointToRestoreTimestamp);
/**
* Schedules Restore for the table to the given instant.
*/
public abstract Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context,
- String restoreInstantTime,
- String instantToRestore);
+ String restoreInstantTimestamp,
+ String savepointToRestoreTimestamp);
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
rollbackInflightCompaction(inflightInstant, s -> Option.empty());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index dfcec915b00..d74432e8aa7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -52,16 +52,16 @@ public abstract class BaseRestoreActionExecutor<T, I, K, O> extends BaseActionEx
private static final Logger LOG = LogManager.getLogger(BaseRestoreActionExecutor.class);
- private final String restoreInstantTime;
+ private final String savepointToRestoreTimestamp;
private final TransactionManager txnManager;
public BaseRestoreActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
- String restoreInstantTime) {
+ String savepointToRestoreTimestamp) {
super(context, config, table, instantTime);
- this.restoreInstantTime = restoreInstantTime;
+ this.savepointToRestoreTimestamp = savepointToRestoreTimestamp;
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
}
@@ -131,7 +131,7 @@ public abstract class BaseRestoreActionExecutor<T, I, K, O> extends BaseActionEx
// if not, rollbacks will be considered not completed and might hinder metadata table compaction.
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getRollbackTimeline()
.getReverseOrderedInstants()
- .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
+ .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), savepointToRestoreTimestamp))
.collect(Collectors.toList());
instantsToRollback.forEach(entry -> {
if (entry.isCompleted()) {
@@ -140,7 +140,7 @@ public abstract class BaseRestoreActionExecutor<T, I, K, O> extends BaseActionEx
table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
});
- LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + restoreInstantTime);
+ LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + savepointToRestoreTimestamp);
return restoreMetadata;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index 9dcb3409135..eea97e1e5ca 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -35,8 +35,8 @@ public class CopyOnWriteRestoreActionExecutor<T, I, K, O>
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
- String restoreInstantTime) {
- super(context, config, table, instantTime, restoreInstantTime);
+ String savepointToRestoreTimestamp) {
+ super(context, config, table, instantTime, savepointToRestoreTimestamp);
}
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index a7e5774515d..f449a120d57 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -31,8 +31,8 @@ import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
public class MergeOnReadRestoreActionExecutor<T, I, K, O>
extends BaseRestoreActionExecutor<T, I, K, O> {
public MergeOnReadRestoreActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table,
- String instantTime, String restoreInstantTime) {
- super(context, config, table, instantTime, restoreInstantTime);
+ String instantTime, String savepointToRestoreTimestamp) {
+ super(context, config, table, instantTime, savepointToRestoreTimestamp);
}
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
index 24c57a0709b..e71a8566f95 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
@@ -24,6 +24,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import java.io.IOException;
@@ -43,4 +46,28 @@ public class RestoreUtils {
return TimelineMetadataUtils.deserializeAvroMetadata(
metaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
}
+
+ public static String getSavepointToRestoreTimestampV1Schema(HoodieTable table, HoodieRestorePlan plan) {
+ //get earliest rollback
+ String firstRollback = plan.getInstantsToRollback().get(plan.getInstantsToRollback().size() - 1).getCommitTime();
+ //find last instant before first rollback
+ Option<HoodieInstant> savepointInstance = table.getActiveTimeline().getSavePointTimeline().findInstantsBefore(firstRollback).lastInstant();
+ return savepointInstance.isPresent() ? savepointInstance.get().getTimestamp() : null;
+ }
+
+ /**
+ * Get the savepoint timestamp that this restore instant is restoring
+ * @param table the HoodieTable
+ * @param restoreInstant Instant referring to restore action
+ * @return timestamp of the savepoint we are restoring
+ * @throws IOException
+ *
+ * */
+ public static String getSavepointToRestoreTimestamp(HoodieTable table, HoodieInstant restoreInstant) throws IOException {
+ HoodieRestorePlan plan = getRestorePlan(table.getMetaClient(), restoreInstant);
+ if (plan.getVersion().compareTo(RestorePlanActionExecutor.RESTORE_PLAN_VERSION_1) > 0) {
+ return plan.getSavepointToRestoreTimestamp();
+ }
+ return getSavepointToRestoreTimestampV1Schema(table, plan);
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
index 34617afc525..55f03aa4143 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -50,16 +50,17 @@ public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
private static final Logger LOG = LogManager.getLogger(RestorePlanActionExecutor.class);
public static final Integer RESTORE_PLAN_VERSION_1 = 1;
- public static final Integer LATEST_RESTORE_PLAN_VERSION = RESTORE_PLAN_VERSION_1;
- private final String restoreInstantTime;
+ public static final Integer RESTORE_PLAN_VERSION_2 = 2;
+ public static final Integer LATEST_RESTORE_PLAN_VERSION = RESTORE_PLAN_VERSION_2;
+ private final String savepointToRestoreTimestamp;
public RestorePlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
- String restoreInstantTime) {
+ String savepointToRestoreTimestamp) {
super(context, config, table, instantTime);
- this.restoreInstantTime = restoreInstantTime;
+ this.savepointToRestoreTimestamp = savepointToRestoreTimestamp;
}
@Override
@@ -72,13 +73,13 @@ public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
// filter only clustering related replacecommits (Not insert_overwrite related commits)
.filter(instant -> ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant))
.getReverseOrderedInstants()
- .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
+ .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), savepointToRestoreTimestamp))
.collect(Collectors.toList());
// Get all the commits on the timeline after the provided commit time
List<HoodieInstant> commitInstantsToRollback = table.getActiveTimeline().getWriteTimeline()
.getReverseOrderedInstants()
- .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
+ .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), savepointToRestoreTimestamp))
.filter(instant -> !pendingClusteringInstantsToRollback.contains(instant))
.collect(Collectors.toList());
@@ -87,7 +88,7 @@ public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
.map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction()))
.collect(Collectors.toList());
- HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION);
+ HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION, savepointToRestoreTimestamp);
table.getActiveTimeline().saveToRestoreRequested(restoreInstant, TimelineMetadataUtils.serializeRestorePlan(restorePlan));
table.getMetaClient().reloadActiveTimeline();
LOG.info("Requesting Restore with instant time " + restoreInstant);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 8e671b2ec5e..f09cf106db2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -344,12 +344,12 @@ public class HoodieFlinkCopyOnWriteTable<T>
}
@Override
- public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+ public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
throw new HoodieNotSupportedException("Restore is not supported yet");
}
@Override
- public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+ public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
throw new HoodieNotSupportedException("Savepoint and restore is not supported yet");
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 2cf2895f3f4..f3899b38590 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -250,16 +250,16 @@ public class HoodieJavaCopyOnWriteTable<T>
}
@Override
- public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
- return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+ public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
+ return new RestorePlanActionExecutor(context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
}
@Override
public HoodieRestoreMetadata restore(HoodieEngineContext context,
- String restoreInstantTime,
- String instantToRestore) {
+ String restoreInstantTimestamp,
+ String savepointToRestoreTimestamp) {
return new CopyOnWriteRestoreActionExecutor(
- context, config, this, restoreInstantTime, instantToRestore).execute();
+ context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 81b7cd00242..9691c8a9ab7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -286,12 +286,12 @@ public class HoodieSparkCopyOnWriteTable<T>
}
@Override
- public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
- return new CopyOnWriteRestoreActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
+ public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
+ return new CopyOnWriteRestoreActionExecutor<>(context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
}
@Override
- public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
- return new RestorePlanActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
+ public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
+ return new RestorePlanActionExecutor<>(context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 25511cd19ab..32ded270406 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -195,8 +195,8 @@ public class HoodieSparkMergeOnReadTable<T> extends HoodieSparkCopyOnWriteTable<
}
@Override
- public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
- return new MergeOnReadRestoreActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
+ public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
+ return new MergeOnReadRestoreActionExecutor<>(context, config, this, restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index c810ff4f968..d226b5b995a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -54,6 +55,8 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -63,6 +66,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+import static org.apache.hudi.table.action.restore.RestoreUtils.getRestorePlan;
+import static org.apache.hudi.table.action.restore.RestoreUtils.getSavepointToRestoreTimestampV1Schema;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -75,11 +80,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class TestClientRollback extends HoodieClientTestBase {
+ private static Stream<Arguments> testSavepointAndRollbackParams() {
+ return Arrays.stream(new Boolean[][] {
+ {false, false}, {true, true}, {true, false},
+ }).map(Arguments::of);
+ }
+
/**
* Test case for rollback-savepoint interaction.
*/
- @Test
- public void testSavepointAndRollback() throws Exception {
+ @ParameterizedTest
+ @MethodSource("testSavepointAndRollbackParams")
+ public void testSavepointAndRollback(Boolean testFailedRestore, Boolean failedRestoreInflight) throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
@@ -175,9 +187,84 @@ public class TestClientRollback extends HoodieClientTestBase {
dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
assertEquals(0, dataFiles.size(), "The data files for commit 004 should be rolled back");
+
+ if (testFailedRestore) {
+ //test to make sure that the restore commit is reused when the restore fails and is re-run
+ HoodieInstant inst = table.getActiveTimeline().getRestoreTimeline().getInstants().get(0);
+ String restoreFileName = table.getMetaClient().getBasePathV2().toString() + "/.hoodie/" + inst.getFileName();
+
+ //delete restore commit file
+ assertTrue((new File(restoreFileName)).delete());
+
+ if (!failedRestoreInflight) {
+ //delete restore inflight file
+ assertTrue((new File(restoreFileName + ".inflight")).delete());
+ }
+ try (SparkRDDWriteClient newClient = getHoodieWriteClient(cfg)) {
+ //restore again
+ newClient.restoreToSavepoint(savepoint.getTimestamp());
+
+ //verify that we reuse the existing restore commit
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieSparkTable.create(getConfig(), context, metaClient);
+ List<HoodieInstant> restoreInstants = table.getActiveTimeline().getRestoreTimeline().getInstants();
+ assertEquals(1, restoreInstants.size());
+ assertEquals(HoodieInstant.State.COMPLETED, restoreInstants.get(0).getState());
+ assertEquals(inst.getTimestamp(), restoreInstants.get(0).getTimestamp());
+ }
+ }
}
}
+ private List<HoodieRecord> updateRecords(SparkRDDWriteClient client, List<HoodieRecord> records, String newCommitTime) throws IOException {
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> recs = dataGen.generateUpdates(newCommitTime, records);
+ List<WriteStatus> statuses = client.upsert(jsc.parallelize(recs, 1), newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+ return recs;
+ }
+
+ @Test
+ public void testGetSavepointOldSchema() throws Exception {
+ HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
+
+ /**
+ * Write 1 (only inserts)
+ */
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ records = updateRecords(client, records, "002");
+
+ client.savepoint("hoodie-unit-test", "test");
+
+
+ records = updateRecords(client, records, "003");
+ updateRecords(client, records, "004");
+
+ // rollback to savepoint 002
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
+ HoodieInstant savepoint = table.getCompletedSavepointTimeline().lastInstant().get();
+ client.restoreToSavepoint(savepoint.getTimestamp());
+
+ //verify that getSavepointToRestoreTimestampV1Schema is correct
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieSparkTable.create(getConfig(), context, metaClient);
+ HoodieRestorePlan plan = getRestorePlan(metaClient, table.getActiveTimeline().getRestoreTimeline().lastInstant().get());
+ assertEquals("002", getSavepointToRestoreTimestampV1Schema(table, plan));
+ }
+ }
+
/**
* Test case for rollback-savepoint with KEEP_LATEST_FILE_VERSIONS policy.
*/
@@ -740,4 +827,5 @@ public class TestClientRollback extends HoodieClientTestBase {
assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
}
}
+
}
diff --git a/hudi-common/src/main/avro/HoodieRestorePlan.avsc b/hudi-common/src/main/avro/HoodieRestorePlan.avsc
index 1ad9e6a4b9c..9f93986eb8f 100644
--- a/hudi-common/src/main/avro/HoodieRestorePlan.avsc
+++ b/hudi-common/src/main/avro/HoodieRestorePlan.avsc
@@ -32,6 +32,12 @@
{
"name":"version",
"type":["int", "null"],
- "default": 1
- }]
+ "default": 2
+ },
+ {
+ "name": "savepointToRestoreTimestamp",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
}
\ No newline at end of file