You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/04/19 08:10:05 UTC
[hudi] branch master updated: [HUDI-6056] Validate archival configs alignment with cleaner configs with policy based on hours (#8422)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f9f110695fc [HUDI-6056] Validate archival configs alignment with cleaner configs with policy based on hours (#8422)
f9f110695fc is described below
commit f9f110695fc69f2d7085648a6610888bb10ad8e4
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Wed Apr 19 01:09:48 2023 -0700
[HUDI-6056] Validate archival configs alignment with cleaner configs with policy based on hours (#8422)
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 33 ++++++++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 38 +++++-----
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 84 ++++++++++++++++++++++
3 files changed, 134 insertions(+), 21 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index f9f734a9437..74e0a2565f8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -74,10 +75,15 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,6 +92,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
@@ -203,6 +210,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
* 2. Do merge.
* 3. Delete all the candidates.
* 4. Delete the merge plan.
+ *
* @param context HoodieEngineContext
* @throws IOException
*/
@@ -241,8 +249,9 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
* Find the latest 'huge archive file' index as a break point and only check/merge newer archive files.
* Because we need to keep the original order of archive files which is important when loading archived instants with time filter.
* {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
+ *
* @param smallFileLimitBytes small File Limit Bytes
- * @param fsStatuses Sort by version suffix in reverse
+ * @param fsStatuses Sort by version suffix in reverse
* @return merge candidates
*/
private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
@@ -266,6 +275,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
/**
* Check/Solve if there is any failed and unfinished merge small archive files operation
+ *
* @param context HoodieEngineContext used for parallelize to delete small archive files if necessary.
* @throws IOException
*/
@@ -475,7 +485,24 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
);
- return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
+ List<HoodieInstant> instantsToArchive = instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep).collect(Collectors.toList());
+ // If the cleaner is based on hours, let's ensure Hudi does not archive commits that are yet to be cleaned by the cleaner.
+ if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS && !instantsToArchive.isEmpty()) {
+ String latestCommitToArchive = instantsToArchive.get(instantsToArchive.size() - 1).getTimestamp();
+ try {
+ Instant latestCommitInstant = HoodieActiveTimeline.parseDateFromInstantTime(commitTimeline.lastInstant().get().getTimestamp()).toInstant();
+ ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(latestCommitInstant, ZoneId.systemDefault());
+ String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(config.getCleanerHoursRetained()).toInstant()));
+ if (HoodieTimeline.compareTimestamps(latestCommitToArchive, GREATER_THAN_OR_EQUALS, earliestTimeToRetain)) {
+ throw new HoodieIOException("Please align your archival configs based on cleaner configs. 'hoodie.keep.min.commits' : "
+ + config.getMinCommitsToKeep() + " + should be greater than "
+ + " 'hoodie.cleaner.hours.retained' : " + config.getCleanerHoursRetained());
+ }
+ } catch (ParseException e) {
+ throw new HoodieIOException("Failed to parse latest commit instant time " + commitTimeline.lastInstant().get().getTimestamp() + e.getMessage());
+ }
+ }
+ return instantsToArchive.stream();
} else {
return Stream.empty();
}
@@ -543,7 +570,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
return instants.flatMap(hoodieInstant -> {
List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
- HoodieInstant.getComparableAction(hoodieInstant.getAction())));
+ HoodieInstant.getComparableAction(hoodieInstant.getAction())));
if (instantsToStream != null) {
return instantsToStream.stream();
} else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 415e208268e..3009316bb91 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -3099,24 +3099,26 @@ public class HoodieWriteConfig extends HoodieConfig {
.equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
}
- HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
- // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
- // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
- int minInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP));
- int maxInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP));
- int cleanerCommitsRetained =
- Integer.parseInt(writeConfig.getStringOrDefault(HoodieCleanConfig.CLEANER_COMMITS_RETAINED));
- checkArgument(maxInstantsToKeep > minInstantsToKeep,
- String.format(
- "Increase %s=%d to be greater than %s=%d.",
- HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
- HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
- checkArgument(minInstantsToKeep > cleanerCommitsRetained,
- String.format(
- "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
- + "missing data from few instants.",
- HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
- HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
+ HoodieCleaningPolicy cleaningPolicy = HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
+ if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+ // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
+ // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
+ int minInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP));
+ int maxInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP));
+ int cleanerCommitsRetained =
+ Integer.parseInt(writeConfig.getStringOrDefault(HoodieCleanConfig.CLEANER_COMMITS_RETAINED));
+ checkArgument(maxInstantsToKeep > minInstantsToKeep,
+ String.format(
+ "Increase %s=%d to be greater than %s=%d.",
+ HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
+ checkArgument(minInstantsToKeep > cleanerCommitsRetained,
+ String.format(
+ "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
+ + "missing data from few instants.",
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
+ }
boolean inlineCompact = writeConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
boolean inlineCompactSchedule = writeConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index aa86b126ebd..6e254e3146d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -25,9 +25,11 @@ import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -42,6 +44,7 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -51,6 +54,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -62,6 +66,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -72,14 +77,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -90,6 +101,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -304,6 +316,78 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
}
}
+ @Test
+ public void testArchivalWithCleanBasedOnHours() throws Exception {
+ init();
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(1)
+ .retainCommits(2)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3,4).build())
+ .build();
+
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String p0 = "2020/01/01";
+ String p1 = "2020/01/02";
+ Instant instant = Instant.now();
+ ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+
+ triggerCommit(p0, p1, commitDateTime, 5, testTable);
+ triggerCommit(p0, p1, commitDateTime, 10, testTable);
+ triggerCommit(p0, p1, commitDateTime, 20, testTable);
+ triggerCommit(p0, p1, commitDateTime, 30, testTable);
+ triggerCommit(p0, p1, commitDateTime, 40, testTable);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ // let's trigger archival. It should fail since the archival configs do not align w/ the cleaner configs.
+ try {
+ archiveAndGetCommitsList(config);
+ Assertions.fail("Should have failed archival since archival configs are not aligned with cleaner configs. ");
+ } catch (HoodieIOException e) {
+ assertTrue(e.getMessage().contains("Please align your archival configs based on cleaner configs"));
+ }
+ }
+
+ private void triggerCommit(String p0, String p1, ZonedDateTime curDateTime, int minutesForCommit, HoodieTestTable testTable) throws Exception {
+
+ String file1P0C0 = UUID.randomUUID().toString();
+ String file1P1C0 = UUID.randomUUID().toString();
+ String commitTs = HoodieActiveTimeline.formatDate(Date.from(curDateTime.minusMinutes(minutesForCommit).toInstant()));
+ testTable.addInflightCommit(commitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+ HoodieCommitMetadata commitMetadata = generateCommitMetadata(commitTs,
+ Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0));
+ put(p1, CollectionUtils.createImmutableList(file1P1C0));
+ }
+ })
+ );
+ metaClient.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTs),
+ Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+ }
+
+ protected static HoodieCommitMetadata generateCommitMetadata(
+ String instantTime, Map<String, List<String>> partitionToFilePaths) {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
+ partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath(partitionPath);
+ writeStat.setPath(partitionPath + "/" + getBaseFilename(instantTime, f));
+ writeStat.setFileId(f);
+ writeStat.setTotalWriteBytes(1);
+ writeStat.setFileSizeInBytes(1);
+ metadata.addWriteStat(partitionPath, writeStat);
+ }));
+ return metadata;
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {