You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:41 UTC

[hudi] 14/20: [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ee779fe86fef3ecf2144e72a7e90c75d15ad1ded
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jan 6 20:29:29 2023 +0800

    [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)
    
    (cherry picked from commit f745e6457353804359b20575c597b38507237aba)
---
 .../apache/hudi/client/HoodieTimelineArchiver.java | 16 ++++-
 .../hudi/table/action/clean/CleanPlanner.java      | 19 +++++-
 .../apache/hudi/common/util/ClusteringUtils.java   | 37 ++++++++++++
 .../hudi/common/util/TestClusteringUtils.java      | 69 ++++++++++++++++++++++
 4 files changed, 137 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 2992f4abd4c..974440f165c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.FileIOUtils;
@@ -397,7 +398,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
         }).flatMap(Collection::stream);
   }
 
-  private Stream<HoodieInstant> getCommitInstantsToArchive() {
+  private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
     // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
     // with logic above to avoid Stream.concat
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
@@ -430,6 +431,11 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
               table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
               : Option.empty();
 
+      // The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned,
+      // without the replaced files metadata on the timeline, the fs view would expose duplicates for readers.
+      Option<HoodieInstant> oldestInstantToRetainForClustering =
+          ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+
       // Actually do the commits
       Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
           .filter(s -> {
@@ -442,7 +448,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
               return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
             }
           }).filter(s -> {
-            // Ensure commits >= oldest pending compaction commit is retained
+            // Ensure commits >= the oldest pending compaction commit is retained
             return oldestPendingCompactionAndReplaceInstant
                 .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                 .orElse(true);
@@ -459,6 +465,10 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
               oldestInstantToRetainForCompaction.map(instantToRetain ->
                       compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
+          ).filter(s ->
+              oldestInstantToRetainForClustering.map(instantToRetain ->
+                      HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+                  .orElse(true)
           );
       return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
     } else {
@@ -466,7 +476,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
     }
   }
 
-  private Stream<HoodieInstant> getInstantsToArchive() {
+  private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
     Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
     if (config.isMetastoreEnabled()) {
       return Stream.empty();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 64e69b1d2a9..737388645b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -465,7 +465,24 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
     int hoursRetained = config.getCleanerHoursRetained();
     if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
         && commitTimeline.countInstants() > commitsRetained) {
-      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+      Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
+          .getActiveTimeline()
+          .getCommitsTimeline()
+          .filter(s -> !s.isCompleted()).firstInstant();
+      if (earliestPendingCommits.isPresent()) {
+        // Earliest commit to retain must not be later than the earliest pending commit
+        earliestCommitToRetain =
+            commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
+              if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
+                return Option.of(nthInstant);
+              } else {
+                return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
+              }
+            }).orElse(Option.empty());
+      } else {
+        earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants()
+            - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+      }
     } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
       Instant instant = Instant.now();
       ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 9d741a03f82..17b8672094f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -223,4 +224,40 @@ public class ClusteringUtils {
   public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
     return getClusteringPlan(metaClient, instant).isPresent();
   }
+
+  /**
+   * Checks whether the latest clustering instant has a subsequent cleaning action. Returns
+   * the clustering instant if there is such cleaning action or empty.
+   *
+   * @param activeTimeline The active timeline
+   * @param metaClient     The meta client
+   * @return the oldest instant to retain for clustering
+   */
+  public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
+      HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) throws IOException {
+    HoodieTimeline replaceTimeline = activeTimeline.getCompletedReplaceTimeline();
+    if (!replaceTimeline.empty()) {
+      Option<HoodieInstant> cleanInstantOpt =
+          activeTimeline.getCleanerTimeline().filter(instant -> !instant.isCompleted()).firstInstant();
+      if (cleanInstantOpt.isPresent()) {
+        // The first clustering instant of which timestamp is greater than or equal to the earliest commit to retain of
+        // the clean metadata.
+        HoodieInstant cleanInstant = cleanInstantOpt.get();
+        String earliestCommitToRetain =
+            CleanerUtils.getCleanerPlan(metaClient,
+                    cleanInstant.isRequested()
+                        ? cleanInstant
+                        : HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
+                .getEarliestInstantToRetain().getTimestamp();
+        return StringUtils.isNullOrEmpty(earliestCommitToRetain)
+            ? Option.empty()
+            : replaceTimeline.filter(instant ->
+                HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+                    HoodieTimeline.GREATER_THAN_OR_EQUALS,
+                    earliestCommitToRetain))
+            .firstInstant();
+      }
+    }
+    return Option.empty();
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index a5d45d1184f..edd877a28fd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -18,17 +18,21 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,6 +47,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /**
  * Tests for {@link ClusteringUtils}.
@@ -114,6 +119,70 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
     assertEquals(requestedClusteringPlan, inflightClusteringPlan);
   }
 
+  @Test
+  public void testGetOldestInstantToRetainForClustering() throws IOException {
+    String partitionPath1 = "partition1";
+    List<String> fileIds1 = new ArrayList<>();
+    fileIds1.add(UUID.randomUUID().toString());
+    String clusterTime1 = "1";
+    HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
+    HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty());
+    HoodieInstant completedInstant1 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
+    List<String> fileIds2 = new ArrayList<>();
+    fileIds2.add(UUID.randomUUID().toString());
+    fileIds2.add(UUID.randomUUID().toString());
+    String clusterTime2 = "2";
+    HoodieInstant requestedInstant2 = createRequestedReplaceInstant(partitionPath1, clusterTime2, fileIds2);
+    HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant2, Option.empty());
+    metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant2, Option.empty());
+    List<String> fileIds3 = new ArrayList<>();
+    fileIds3.add(UUID.randomUUID().toString());
+    fileIds3.add(UUID.randomUUID().toString());
+    fileIds3.add(UUID.randomUUID().toString());
+    String clusterTime3 = "3";
+    HoodieInstant requestedInstant3 = createRequestedReplaceInstant(partitionPath1, clusterTime3, fileIds3);
+    HoodieInstant inflightInstant3 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3, Option.empty());
+    HoodieInstant completedInstant3 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
+    metaClient.reloadActiveTimeline();
+    Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    assertFalse(actual.isPresent());
+    // test first uncompleted clean instant is requested.
+    String cleanTime1 = "4";
+    HoodieInstant requestedInstant4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1);
+    HoodieCleanerPlan cleanerPlan1 = HoodieCleanerPlan.newBuilder()
+        .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
+            .setAction(completedInstant1.getAction())
+            .setTimestamp(completedInstant1.getTimestamp())
+            .setState(completedInstant1.getState().name()))
+        .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+        .setFilesToBeDeletedPerPartition(new HashMap<>())
+        .setVersion(CleanPlanV2MigrationHandler.VERSION)
+        .build();
+    metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
+    metaClient.reloadActiveTimeline();
+    actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    assertEquals(clusterTime1, actual.get().getTimestamp());
+    HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty());
+    metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4, Option.empty());
+    // test first uncompleted clean instant is inflight.
+    String cleanTime2 = "5";
+    HoodieInstant requestedInstant5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime2);
+    HoodieCleanerPlan cleanerPlan2 = HoodieCleanerPlan.newBuilder()
+        .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
+            .setAction(completedInstant3.getAction())
+            .setTimestamp(completedInstant3.getTimestamp())
+            .setState(completedInstant3.getState().name()))
+        .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+        .setFilesToBeDeletedPerPartition(new HashMap<>())
+        .setVersion(CleanPlanV2MigrationHandler.VERSION)
+        .build();
+    metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant5, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan2));
+    metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant5, Option.empty());
+    metaClient.reloadActiveTimeline();
+    actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    assertEquals(clusterTime3, actual.get().getTimestamp());
+  }
+
   private void validateClusteringInstant(List<String> fileIds, String partitionPath,
                                          String expectedInstantTime, Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
     for (String fileId : fileIds) {