You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2020/05/28 00:18:52 UTC
[hudi] 09/40: [HUDI-850] Avoid unnecessary listings in incremental
cleaning mode (#1576)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ea48ca9b62a2f216460a0795ef9d519481510cd7
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Fri May 1 21:37:21 2020 -0700
[HUDI-850] Avoid unnecessary listings in incremental cleaning mode (#1576)
---
.../java/org/apache/hudi/table/CleanHelper.java | 85 ++++++++++++++++------
.../java/org/apache/hudi/table/TestCleaner.java | 42 +++++------
2 files changed, 85 insertions(+), 42 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
index 3c73c7e..38eddd7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
@@ -38,13 +38,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
-
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -90,34 +90,77 @@ public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializab
* @throws IOException when underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
- if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
- && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
- Option<HoodieInstant> lastClean =
- hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+ switch (config.getCleanerPolicy()) {
+ case KEEP_LATEST_COMMITS:
+ return getPartitionPathsForCleanByCommits(newInstantToRetain);
+ case KEEP_LATEST_FILE_VERSIONS:
+ return getPartitionPathsForFullCleaning();
+ default:
+ throw new IllegalStateException("Unknown Cleaner Policy");
+ }
+ }
+
+ /**
+ * Return partition paths for cleaning by commits mode.
+ * @param instantToRetain Earliest Instant to retain
+ * @return list of partitions
+ * @throws IOException
+ */
+ private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
+ if (!instantToRetain.isPresent()) {
+ LOG.info("No earliest commit to retain. No need to scan partitions !!");
+ return Collections.emptyList();
+ }
+
+ if (config.incrementalCleanerModeEnabled()) {
+ Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
HoodieCleanMetadata cleanMetadata = AvroUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
- LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
- + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
- + ". New Instant to retain : " + newInstantToRetain);
- return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
- HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
- newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> {
- try {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
- return commitMetadata.getPartitionToWriteStats().keySet().stream();
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- }).distinct().collect(Collectors.toList());
+ return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
}
}
}
- // Otherwise go to brute force mode of scanning all partitions
- return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
- hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
+ return getPartitionPathsForFullCleaning();
+ }
+
+ /**
+ * Use Incremental Mode for finding partition paths.
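+ * @param cleanMetadata metadata of the last completed clean, supplying its earliest commit to retain
+ * @param newInstantToRetain earliest instant to retain for the upcoming clean
+ * @return distinct partition paths found in the write stats of completed commits between those two instants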
+ */
+ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
+ Option<HoodieInstant> newInstantToRetain) {
+ LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ + ". New Instant to retain : " + newInstantToRetain);
+ return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
+ instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
+ HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+ newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> {
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+ HoodieCommitMetadata.class);
+ return commitMetadata.getPartitionToWriteStats().keySet().stream();
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }).distinct().collect(Collectors.toList());
+ }
+
+ /**
+ * Scan and list all partitions for cleaning.
+ * @return all partitions paths for the dataset.
+ * @throws IOException
+ */
+ private List<String> getPartitionPathsForFullCleaning() throws IOException {
+ // Go to brute force mode of scanning all partitions
+ return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), hoodieTable.getMetaClient().getBasePath(),
+ config.shouldAssumeDatePartitioning());
}
/**
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index f6ad230..dee545a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -108,7 +108,8 @@ public class TestCleaner extends TestHoodieClientBase {
*/
private String insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client,
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
- Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
+ HoodieCleaningPolicy cleaningPolicy) throws Exception {
/*
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
@@ -131,10 +132,15 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieTable table = HoodieTable.getHoodieTable(metaClient, client.getConfig(), jsc);
assertFalse(table.getCompletedCommitsTimeline().empty());
- String commitTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
- assertFalse(table.getCompletedCleanTimeline().empty());
- assertEquals("The clean instant should be the same as the commit instant", commitTime,
- table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
+ if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {
+ // We no longer write empty cleaner plans when there are not enough commits present
+ assertTrue(table.getCompletedCleanTimeline().empty());
+ } else {
+ String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
+ assertFalse(table.getCompletedCleanTimeline().empty());
+ assertEquals("The clean instant should be the same as the commit instant", instantTime,
+ table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
+ }
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
@@ -205,7 +211,8 @@ public class TestCleaner extends TestHoodieClientBase {
final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
- insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
+ insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
+ HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -353,7 +360,7 @@ public class TestCleaner extends TestHoodieClientBase {
int maxCommits = 3; // keep upto 3 commits from the past
HoodieWriteConfig cfg = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
@@ -365,7 +372,8 @@ public class TestCleaner extends TestHoodieClientBase {
final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
- insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
+ insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
+ HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
// Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).forEach(newCommitTime -> {
@@ -380,7 +388,9 @@ public class TestCleaner extends TestHoodieClientBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
- Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
+ // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
+ // commit
+ Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
if (earliestRetainedCommit.isPresent()) {
acceptableCommits
@@ -753,12 +763,7 @@ public class TestCleaner extends TestHoodieClientBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
- assertEquals("Must not clean any files", 0,
- getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
- .size());
- assertEquals("Must not clean any files", 0,
- getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
- .size());
+ assertEquals("Must not scan any partitions and clean any files", 0, hoodieCleanStatsOne.size());
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
@@ -788,12 +793,7 @@ public class TestCleaner extends TestHoodieClientBase {
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
- assertEquals("Must not clean any files", 0,
- getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
- .size());
- assertEquals("Must not clean any files", 0,
- getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
- .size());
+ assertEquals("Must not scan any partitions and clean any files", 0, hoodieCleanStatsTwo.size());
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",