You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2023/03/06 22:41:26 UTC

[hudi] branch master updated: [HUDI-5881] Handle pending clean instants while running savepoint (#8105)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 46a2ccf56ee [HUDI-5881] Handle pending clean instants while running savepoint (#8105)
46a2ccf56ee is described below

commit 46a2ccf56eeefabf03bd339640607b996e307987
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Tue Mar 7 04:11:15 2023 +0530

    [HUDI-5881] Handle pending clean instants while running savepoint (#8105)
---
 .../action/savepoint/SavepointActionExecutor.java  | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 4e0ae1da223..0c90311fe99 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.action.savepoint;
 
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -44,6 +43,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCleanerPlan;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieCleanMetadata;
+
 public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieSavepointMetadata> {
 
   private static final Logger LOG = LogManager.getLogger(SavepointActionExecutor.class);
@@ -64,21 +67,29 @@ public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
 
   @Override
   public HoodieSavepointMetadata execute() {
-    Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
     if (!table.getCompletedCommitsTimeline().containsInstant(instantTime)) {
       throw new HoodieSavepointException("Could not savepoint non-existing commit " + instantTime);
     }
 
     try {
       // Check the last commit that was not cleaned and check if savepoint time is > that commit
-      String lastCommitRetained;
-      if (cleanInstant.isPresent()) {
-        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
-            .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
-        lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
-      } else {
-        lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
-      }
+      Option<HoodieInstant> cleanInstant = table.getCleanTimeline().lastInstant();
+      String lastCommitRetained = cleanInstant.map(instant -> {
+        try {
+          if (instant.isCompleted()) {
+            return deserializeHoodieCleanMetadata(
+                table.getActiveTimeline().getInstantDetails(instant).get())
+                .getEarliestCommitToRetain();
+          } else {
+            // clean is pending or inflight
+            return deserializeCleanerPlan(
+                table.getActiveTimeline().getInstantDetails(new HoodieInstant(REQUESTED, instant.getAction(), instant.getTimestamp())).get())
+                .getEarliestInstantToRetain().getTimestamp();
+          }
+        } catch (IOException e) {
+          throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
+        }
+      }).orElse(table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp());
 
       // Cannot allow savepoint time on a commit that could have been cleaned
       ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),