You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2023/03/06 22:41:26 UTC
[hudi] branch master updated: [HUDI-5881] Handle pending clean instants while running savepoint (#8105)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 46a2ccf56ee [HUDI-5881] Handle pending clean instants while running savepoint (#8105)
46a2ccf56ee is described below
commit 46a2ccf56eeefabf03bd339640607b996e307987
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Tue Mar 7 04:11:15 2023 +0530
[HUDI-5881] Handle pending clean instants while running savepoint (#8105)
---
.../action/savepoint/SavepointActionExecutor.java | 31 +++++++++++++++-------
1 file changed, 21 insertions(+), 10 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 4e0ae1da223..0c90311fe99 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table.action.savepoint;
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -44,6 +43,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCleanerPlan;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieCleanMetadata;
+
public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieSavepointMetadata> {
private static final Logger LOG = LogManager.getLogger(SavepointActionExecutor.class);
@@ -64,21 +67,29 @@ public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
@Override
public HoodieSavepointMetadata execute() {
- Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
if (!table.getCompletedCommitsTimeline().containsInstant(instantTime)) {
throw new HoodieSavepointException("Could not savepoint non-existing commit " + instantTime);
}
try {
// Check the last commit that was not cleaned and check if savepoint time is > that commit
- String lastCommitRetained;
- if (cleanInstant.isPresent()) {
- HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
- .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
- lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
- } else {
- lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
- }
+ Option<HoodieInstant> cleanInstant = table.getCleanTimeline().lastInstant();
+ String lastCommitRetained = cleanInstant.map(instant -> {
+ try {
+ if (instant.isCompleted()) {
+ return deserializeHoodieCleanMetadata(
+ table.getActiveTimeline().getInstantDetails(instant).get())
+ .getEarliestCommitToRetain();
+ } else {
+ // clean is pending or inflight
+ return deserializeCleanerPlan(
+ table.getActiveTimeline().getInstantDetails(new HoodieInstant(REQUESTED, instant.getAction(), instant.getTimestamp())).get())
+ .getEarliestInstantToRetain().getTimestamp();
+ }
+ } catch (IOException e) {
+ throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
+ }
+ }).orElse(table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp());
// Cannot allow savepoint time on a commit that could have been cleaned
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),