You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/02/10 13:06:42 UTC
[hudi] branch master updated: [HUDI-2432] Adding restore.requested instant and restore plan for restore action (#4605)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e7ec3a8 [HUDI-2432] Adding restore.requested instant and restore plan for restore action (#4605)
e7ec3a8 is described below
commit e7ec3a82dc274b8d683d74e59ee7bf35d7827ce0
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Thu Feb 10 08:06:23 2022 -0500
[HUDI-2432] Adding restore.requested instant and restore plan for restore action (#4605)
- This adds a restore plan and serializes it to the restore.requested meta file in the timeline. This also means that we are introducing schedule and execution phases for restore, which were not present before.
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 24 ++++---
.../hudi/exception/HoodieRestoreException.java | 4 ++
.../java/org/apache/hudi/table/HoodieTable.java | 15 ++++
.../action/restore/BaseRestoreActionExecutor.java | 59 +++++++++++----
.../hudi/table/action/restore/RestoreUtils.java | 46 ++++++++++++
.../action/rollback/RestorePlanActionExecutor.java | 84 ++++++++++++++++++++++
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 6 ++
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 7 ++
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 7 ++
.../hudi/table/HoodieSparkMergeOnReadTable.java | 2 +
.../TestHoodieClientOnCopyOnWriteStorage.java | 1 +
.../src/main/avro/HoodieRestorePlan.avsc | 31 +++++---
.../table/timeline/HoodieActiveTimeline.java | 29 +++++++-
.../hudi/common/table/timeline/HoodieInstant.java | 1 +
.../hudi/common/table/timeline/HoodieTimeline.java | 5 ++
.../table/timeline/TimelineMetadataUtils.java | 5 ++
16 files changed, 292 insertions(+), 34 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 271f8a3..2414a9f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
@@ -690,16 +691,21 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
Timer.Context timerContext = metrics.getRollbackCtx();
try {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
- HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
- if (timerContext != null) {
- final long durationInMs = metrics.getDurationInMs(timerContext.stop());
- final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
- .flatMap(Collection::stream)
- .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
- .sum();
- metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
+ Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
+ if (restorePlanOption.isPresent()) {
+ HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
+ if (timerContext != null) {
+ final long durationInMs = metrics.getDurationInMs(timerContext.stop());
+ final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
+ .flatMap(Collection::stream)
+ .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
+ .sum();
+ metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
+ }
+ return restoreMetadata;
+ } else {
+ throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + instantTime);
}
- return restoreMetadata;
} catch (Exception e) {
throw new HoodieRestoreException("Failed to restore to " + instantTime, e);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
index c6c9076..baad53a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
@@ -23,4 +23,8 @@ public class HoodieRestoreException extends HoodieException {
public HoodieRestoreException(String msg, Throwable e) {
super(msg, e);
}
+
+ public HoodieRestoreException(String msg) {
+ super(msg);
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index f19b6aa..639467f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -347,6 +348,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
}
/**
+ * Get restore timeline.
+ */
+ public HoodieTimeline getRestoreTimeline() {
+ return getActiveTimeline().getRestoreTimeline();
+ }
+
+ /**
* Get only the completed (no-inflights) savepoint timeline.
*/
public HoodieTimeline getCompletedSavepointTimeline() {
@@ -498,6 +506,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
String instantToRestore);
/**
+ * Schedules Restore for the table to the given instant.
+ */
+ public abstract Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context,
+ String restoreInstantTime,
+ String instantToRestore);
+
+ /**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file
* to the .requested file.
*
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 58247bb..9025623 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -18,7 +18,9 @@
package org.apache.hudi.table.action.restore;
+import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -29,14 +31,18 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -65,27 +71,51 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
HoodieTimer restoreTimer = new HoodieTimer();
restoreTimer.startTimer();
- // Get all the commits on the timeline after the provided commit time
- List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getWriteTimeline()
- .getReverseOrderedInstants()
- .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
- .collect(Collectors.toList());
+ Option<HoodieInstant> restoreInstant = table.getRestoreTimeline()
+ .filterInflightsAndRequested()
+ .filter(instant -> instant.getTimestamp().equals(instantTime))
+ .firstInstant();
+ if (!restoreInstant.isPresent()) {
+ throw new HoodieRollbackException("No pending restore instants found to execute restore");
+ }
+ try {
+ List<HoodieInstant> instantsToRollback = getInstantsToRollback(restoreInstant.get());
+ ValidationUtils.checkArgument(restoreInstant.get().getState().equals(HoodieInstant.State.REQUESTED)
+ || restoreInstant.get().getState().equals(HoodieInstant.State.INFLIGHT));
+ Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
+ if (restoreInstant.get().isRequested()) {
+ table.getActiveTimeline().transitionRestoreRequestedToInflight(restoreInstant.get());
+ }
- Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
- table.getActiveTimeline().createNewInstant(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime));
- instantsToRollback.forEach(instant -> {
- instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant)));
- LOG.info("Deleted instant " + instant);
- });
+ instantsToRollback.forEach(instant -> {
+ instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant)));
+ LOG.info("Deleted instant " + instant);
+ });
- try {
return finishRestore(instantToMetadata,
instantsToRollback,
restoreTimer.endTimer()
);
} catch (IOException io) {
- throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
+ throw new HoodieRestoreException("unable to Restore instant " + restoreInstant.get(), io);
+ }
+ }
+
+ private List<HoodieInstant> getInstantsToRollback(HoodieInstant restoreInstant) throws IOException {
+ List<HoodieInstant> instantsToRollback = new ArrayList<>();
+ HoodieRestorePlan restorePlan = RestoreUtils.getRestorePlan(table.getMetaClient(), restoreInstant);
+ for (HoodieInstantInfo instantInfo : restorePlan.getInstantsToRollback()) {
+ // If restore crashed mid-way, there are chances that some commits are already rolled back,
+ // but some are not. So we can ignore those commits which were fully rolled back in a previous attempt, if any.
+ Option<HoodieInstant> rollbackInstantOpt = table.getActiveTimeline().getWriteTimeline()
+ .filter(instant -> instant.getTimestamp().equals(instantInfo.getCommitTime()) && instant.getAction().equals(instantInfo.getAction())).firstInstant();
+ if (rollbackInstantOpt.isPresent()) {
+ instantsToRollback.add(rollbackInstantOpt.get());
+ } else {
+ LOG.warn("Ignoring already rolledback instant " + instantInfo.toString());
+ }
}
+ return instantsToRollback;
}
protected abstract HoodieRollbackMetadata rollbackInstant(HoodieInstant rollbackInstant);
@@ -99,7 +129,7 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
writeToMetadata(restoreMetadata);
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
- // get all rollbacks instants after restore instant time and delete them.
+ // get all pending rollbacks instants after restore instant time and delete them.
// if not, rollbacks will be considered not completed and might hinder metadata table compaction.
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getRollbackTimeline()
.getReverseOrderedInstants()
@@ -115,6 +145,7 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
/**
* Update metadata table if available. Any update to metadata table happens within data table lock.
+ *
* @param restoreMetadata instance of {@link HoodieRestoreMetadata} to be applied to metadata.
*/
private void writeToMetadata(HoodieRestoreMetadata restoreMetadata) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
new file mode 100644
index 0000000..24c57a0
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/RestoreUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.restore;
+
+import org.apache.hudi.avro.model.HoodieRestorePlan;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+
+import java.io.IOException;
+
+public class RestoreUtils {
+
+ /**
+ * Get Latest version of Restore plan corresponding to a restore instant.
+ *
+ * @param metaClient Hoodie Table Meta Client
+ * @param restoreInstant Instant referring to restore action
+ * @return Restore plan corresponding to the restore instant
+ * @throws IOException
+ */
+ public static HoodieRestorePlan getRestorePlan(HoodieTableMetaClient metaClient, HoodieInstant restoreInstant)
+ throws IOException {
+ final HoodieInstant requested = HoodieTimeline.getRollbackRequestedInstant(restoreInstant);
+ return TimelineMetadataUtils.deserializeAvroMetadata(
+ metaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
new file mode 100644
index 0000000..a25eaed
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Plans the restore action and adds a restore.requested meta file to the timeline.
+ */
+public class RestorePlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieRestorePlan>> {
+
+
+ private static final Logger LOG = LogManager.getLogger(RestorePlanActionExecutor.class);
+
+ public static final Integer RESTORE_PLAN_VERSION_1 = 1;
+ public static final Integer LATEST_RESTORE_PLAN_VERSION = RESTORE_PLAN_VERSION_1;
+ private final String restoreInstantTime;
+
+ public RestorePlanActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ String restoreInstantTime) {
+ super(context, config, table, instantTime);
+ this.restoreInstantTime = restoreInstantTime;
+ }
+
+ @Override
+ public Option<HoodieRestorePlan> execute() {
+ final HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
+ try {
+ // Get all the commits on the timeline after the provided commit time
+ List<HoodieInstantInfo> instantsToRollback = table.getActiveTimeline().getWriteTimeline()
+ .getReverseOrderedInstants()
+ .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)).map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction()))
+ .collect(Collectors.toList());
+
+ HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION);
+ table.getActiveTimeline().saveToRestoreRequested(restoreInstant, TimelineMetadataUtils.serializeRestorePlan(restorePlan));
+ table.getMetaClient().reloadActiveTimeline();
+ LOG.info("Requesting Restore with instant time " + restoreInstant);
+ return Option.of(restorePlan);
+ } catch (IOException e) {
+ LOG.error("Got exception when saving restore requested file", e);
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 9aceffe..7e41ab1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -343,6 +344,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
+ public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+ throw new HoodieNotSupportedException("Restore is not supported yet");
+ }
+
+ @Override
public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
throw new HoodieNotSupportedException("Savepoint and restore is not supported yet");
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 62a6980..f8590e9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -63,6 +64,7 @@ import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.slf4j.Logger;
@@ -248,6 +250,11 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
+ public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+ return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+ }
+
+ @Override
public HoodieRestoreMetadata restore(HoodieEngineContext context,
String restoreInstantTime,
String instantToRestore) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 12c0483..6a33055 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
@@ -74,6 +75,7 @@ import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecuto
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.avro.Schema;
@@ -258,6 +260,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
@Override
public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
+ new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
}
@@ -353,4 +356,8 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
}
+ @Override
+ public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
+ return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 75af5d0..334efa7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -52,6 +52,7 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitAct
import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import org.apache.spark.api.java.JavaRDD;
@@ -150,6 +151,7 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
@Override
public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
+ new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
new MergeOnReadRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 7334f4c..a1d7569 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -576,6 +576,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
TimelineLayoutVersion.CURR_VERSION).build();
client = getHoodieWriteClient(newConfig);
+
client.restoreToInstant("004");
assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java b/hudi-common/src/main/avro/HoodieRestorePlan.avsc
similarity index 61%
copy from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
copy to hudi-common/src/main/avro/HoodieRestorePlan.avsc
index c6c9076..1ad9e6a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java
+++ b/hudi-common/src/main/avro/HoodieRestorePlan.avsc
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,12 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.hudi.exception;
-
-public class HoodieRestoreException extends HoodieException {
-
- public HoodieRestoreException(String msg, Throwable e) {
- super(msg, e);
- }
-}
+{
+ "namespace":"org.apache.hudi.avro.model",
+ "type":"record",
+ "name":"HoodieRestorePlan",
+ "fields":[
+ {
+ "name": "instantsToRollback",
+ "default": [],
+ "type": {
+ "type": "array",
+ "default": null,
+ "items": "HoodieInstantInfo"
+ }
+ },
+ {
+ "name":"version",
+ "type":["int", "null"],
+ "default": 1
+ }]
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index c7473bd..1fa3845 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -70,7 +70,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
- INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
+ REQUESTED_RESTORE_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
@@ -289,6 +289,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
}
+ public Option<byte[]> readRestoreInfoAsBytes(HoodieInstant instant) {
+ // Restore metadata are always stored only in timeline .hoodie
+ return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
+ }
+
//-----------------------------------------------------------------
// BEGIN - COMPACTION RELATED META-DATA MANAGEMENT.
//-----------------------------------------------------------------
@@ -430,6 +435,21 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
/**
+ * Transition Restore State from requested to inflight.
+ *
+ * @param requestedInstant requested instant
+ * @return inflight restore instant
+ */
+ public HoodieInstant transitionRestoreRequestedToInflight(HoodieInstant requestedInstant) {
+ ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.RESTORE_ACTION), "Transition to inflight requested for a restore instant with diff action "
+ + requestedInstant.toString());
+ ValidationUtils.checkArgument(requestedInstant.isRequested(), "Transition to inflight requested for an instant not in requested state " + requestedInstant.toString());
+ HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, RESTORE_ACTION, requestedInstant.getTimestamp());
+ transitionState(requestedInstant, inflight, Option.empty());
+ return inflight;
+ }
+
+ /**
* Transition replace requested file to replace inflight.
*
* @param requestedInstant Requested instant
@@ -599,6 +619,13 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
createFileInMetaPath(instant.getFileName(), content, false);
}
+ public void saveToRestoreRequested(HoodieInstant instant, Option<byte[]> content) {
+ ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.RESTORE_ACTION));
+ ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));
+ // The serialized plan is written under the meta path using the instant's file name; allowOverwrite is passed as false
+ createFileInMetaPath(instant.getFileName(), content, false);
+ }
+
private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
Path fullPath = new Path(metaClient.getMetaPath(), filename);
if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index a8df62c..9cd0883 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -166,6 +166,7 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
}
} else if (HoodieTimeline.RESTORE_ACTION.equals(action)) {
return isInflight() ? HoodieTimeline.makeInflightRestoreFileName(timestamp)
+ : isRequested() ? HoodieTimeline.makeRequestedRestoreFileName(timestamp)
: HoodieTimeline.makeRestoreFileName(timestamp);
} else if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)) {
return isInflight() ? HoodieTimeline.makeInflightReplaceFileName(timestamp)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 6ea44a8..25b9c2e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -78,6 +78,7 @@ public interface HoodieTimeline extends Serializable {
String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX);
String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION);
+ String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION + REQUESTED_EXTENSION;
String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION;
String RESTORE_EXTENSION = "." + RESTORE_ACTION;
String INFLIGHT_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + INFLIGHT_EXTENSION;
@@ -386,6 +387,10 @@ public interface HoodieTimeline extends Serializable {
return StringUtils.join(instant, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION);
}
+ static String makeRequestedRestoreFileName(String instant) {
+ return StringUtils.join(instant, HoodieTimeline.REQUESTED_RESTORE_EXTENSION);
+ }
+
static String makeInflightRollbackFileName(String instant) {
return StringUtils.join(instant, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 723c594..70a23f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
@@ -112,6 +113,10 @@ public class TimelineMetadataUtils {
return serializeAvroMetadata(rollbackPlan, HoodieRollbackPlan.class);
}
+ public static Option<byte[]> serializeRestorePlan(HoodieRestorePlan restorePlan) throws IOException {
+ return serializeAvroMetadata(restorePlan, HoodieRestorePlan.class);
+ }
+
public static Option<byte[]> serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException {
return serializeAvroMetadata(metadata, HoodieCleanMetadata.class);
}