You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:41 UTC
[hudi] 14/20: [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ee779fe86fef3ecf2144e72a7e90c75d15ad1ded
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jan 6 20:29:29 2023 +0800
[HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)
(cherry picked from commit f745e6457353804359b20575c597b38507237aba)
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 16 ++++-
.../hudi/table/action/clean/CleanPlanner.java | 19 +++++-
.../apache/hudi/common/util/ClusteringUtils.java | 37 ++++++++++++
.../hudi/common/util/TestClusteringUtils.java | 69 ++++++++++++++++++++++
4 files changed, 137 insertions(+), 4 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 2992f4abd4c..974440f165c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -397,7 +398,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
}).flatMap(Collection::stream);
}
- private Stream<HoodieInstant> getCommitInstantsToArchive() {
+ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
// TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
// with logic above to avoid Stream.concat
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
@@ -430,6 +431,11 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
: Option.empty();
+ // A clustering commit instant must not be archived until its replaced files have been cleaned: without the
+ // replaced files' metadata on the timeline, the fs view would expose duplicates to readers.
+ Option<HoodieInstant> oldestInstantToRetainForClustering =
+ ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+
// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
.filter(s -> {
@@ -442,7 +448,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
}
}).filter(s -> {
- // Ensure commits >= oldest pending compaction commit is retained
+ // Ensure commits >= the oldest pending compaction commit are retained
return oldestPendingCompactionAndReplaceInstant
.map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
@@ -459,6 +465,10 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
oldestInstantToRetainForCompaction.map(instantToRetain ->
compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
+ ).filter(s ->
+ oldestInstantToRetainForClustering.map(instantToRetain ->
+ HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+ .orElse(true)
);
return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
@@ -466,7 +476,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
}
}
- private Stream<HoodieInstant> getInstantsToArchive() {
+ private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
if (config.isMetastoreEnabled()) {
return Stream.empty();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 64e69b1d2a9..737388645b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -465,7 +465,24 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
int hoursRetained = config.getCleanerHoursRetained();
if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
&& commitTimeline.countInstants() > commitsRetained) {
- earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+ Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
+ .getActiveTimeline()
+ .getCommitsTimeline()
+ .filter(s -> !s.isCompleted()).firstInstant();
+ if (earliestPendingCommits.isPresent()) {
+ // Earliest commit to retain must not be later than the earliest pending commit
+ earliestCommitToRetain =
+ commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
+ if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
+ return Option.of(nthInstant);
+ } else {
+ return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
+ }
+ }).orElse(Option.empty());
+ } else {
+ earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants()
+ - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+ }
} else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
Instant instant = Instant.now();
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 9d741a03f82..17b8672094f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -223,4 +224,40 @@ public class ClusteringUtils {
public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
return getClusteringPlan(metaClient, instant).isPresent();
}
+
+ /**
+ * Finds the first completed replace instant whose timestamp is greater than or equal to the earliest instant to
+ * retain recorded in the cleaner plan of the first pending (not yet completed) clean instant.
+ * @param activeTimeline The active timeline, used for the completed replace timeline and the cleaner timeline
+ * @param metaClient The meta client, used to load the cleaner plan
+ * @return that replace instant; empty if there is no completed replace instant, no pending clean instant, or the plan's timestamp is null or empty
+ * @throws IOException declared by the signature; presumably raised when the cleaner plan cannot be read - TODO confirm
+ */
+ public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
+ HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) throws IOException {
+ HoodieTimeline replaceTimeline = activeTimeline.getCompletedReplaceTimeline();
+ if (!replaceTimeline.empty()) {
+ Option<HoodieInstant> cleanInstantOpt =
+ activeTimeline.getCleanerTimeline().filter(instant -> !instant.isCompleted()).firstInstant();
+ if (cleanInstantOpt.isPresent()) {
+ // Load the cleaner plan via the requested instant of this pending clean, then keep the first completed replace instant at or
+ // after its earliest instant to retain. NOTE(review): getEarliestInstantToRetain() is dereferenced unchecked - confirm it is never null.
+ HoodieInstant cleanInstant = cleanInstantOpt.get();
+ String earliestCommitToRetain =
+ CleanerUtils.getCleanerPlan(metaClient,
+ cleanInstant.isRequested()
+ ? cleanInstant
+ : HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
+ .getEarliestInstantToRetain().getTimestamp();
+ return StringUtils.isNullOrEmpty(earliestCommitToRetain)
+ ? Option.empty()
+ : replaceTimeline.filter(instant ->
+ HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+ HoodieTimeline.GREATER_THAN_OR_EQUALS,
+ earliestCommitToRetain))
+ .firstInstant();
+ }
+ }
+ return Option.empty();
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index a5d45d1184f..edd877a28fd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -18,17 +18,21 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -43,6 +47,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
/**
* Tests for {@link ClusteringUtils}.
@@ -114,6 +119,70 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
assertEquals(requestedClusteringPlan, inflightClusteringPlan);
}
+ @Test
+ public void testGetOldestInstantToRetainForClustering() throws IOException {
+ String partitionPath1 = "partition1";
+ List<String> fileIds1 = new ArrayList<>();
+ fileIds1.add(UUID.randomUUID().toString());
+ String clusterTime1 = "1";
+ HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
+ HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty());
+ HoodieInstant completedInstant1 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
+ List<String> fileIds2 = new ArrayList<>();
+ fileIds2.add(UUID.randomUUID().toString());
+ fileIds2.add(UUID.randomUUID().toString());
+ String clusterTime2 = "2";
+ HoodieInstant requestedInstant2 = createRequestedReplaceInstant(partitionPath1, clusterTime2, fileIds2);
+ HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant2, Option.empty());
+ metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant2, Option.empty());
+ List<String> fileIds3 = new ArrayList<>();
+ fileIds3.add(UUID.randomUUID().toString());
+ fileIds3.add(UUID.randomUUID().toString());
+ fileIds3.add(UUID.randomUUID().toString());
+ String clusterTime3 = "3";
+ HoodieInstant requestedInstant3 = createRequestedReplaceInstant(partitionPath1, clusterTime3, fileIds3);
+ HoodieInstant inflightInstant3 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3, Option.empty());
+ HoodieInstant completedInstant3 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
+ metaClient.reloadActiveTimeline();
+ Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+ assertFalse(actual.isPresent());
+ // Case 1: the first uncompleted clean instant ("4") is REQUESTED and its plan's earliest instant to retain is clusterTime1.
+ String cleanTime1 = "4";
+ HoodieInstant requestedInstant4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1);
+ HoodieCleanerPlan cleanerPlan1 = HoodieCleanerPlan.newBuilder()
+ .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
+ .setAction(completedInstant1.getAction())
+ .setTimestamp(completedInstant1.getTimestamp())
+ .setState(completedInstant1.getState().name()))
+ .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+ .setFilesToBeDeletedPerPartition(new HashMap<>())
+ .setVersion(CleanPlanV2MigrationHandler.VERSION)
+ .build();
+ metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
+ metaClient.reloadActiveTimeline();
+ actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+ assertEquals(clusterTime1, actual.get().getTimestamp());
+ HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty());
+ metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4, Option.empty());
+ // Case 2: clean "4" was completed above, so the first uncompleted clean instant is the INFLIGHT clean "5", whose plan's earliest instant to retain is clusterTime3.
+ String cleanTime2 = "5";
+ HoodieInstant requestedInstant5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime2);
+ HoodieCleanerPlan cleanerPlan2 = HoodieCleanerPlan.newBuilder()
+ .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
+ .setAction(completedInstant3.getAction())
+ .setTimestamp(completedInstant3.getTimestamp())
+ .setState(completedInstant3.getState().name()))
+ .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+ .setFilesToBeDeletedPerPartition(new HashMap<>())
+ .setVersion(CleanPlanV2MigrationHandler.VERSION)
+ .build();
+ metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant5, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan2));
+ metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant5, Option.empty());
+ metaClient.reloadActiveTimeline();
+ actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+ assertEquals(clusterTime3, actual.get().getTimestamp());
+ }
+
private void validateClusteringInstant(List<String> fileIds, String partitionPath,
String expectedInstantTime, Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
for (String fileId : fileIds) {