Posted to commits@hudi.apache.org by si...@apache.org on 2020/05/28 00:18:52 UTC

[hudi] 09/40: [HUDI-850] Avoid unnecessary listings in incremental cleaning mode (#1576)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ea48ca9b62a2f216460a0795ef9d519481510cd7
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Fri May 1 21:37:21 2020 -0700

    [HUDI-850] Avoid unnecessary listings in incremental cleaning mode (#1576)
---
 .../java/org/apache/hudi/table/CleanHelper.java    | 85 ++++++++++++++++------
 .../java/org/apache/hudi/table/TestCleaner.java    | 42 +++++------
 2 files changed, 85 insertions(+), 42 deletions(-)
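
The patch routes partition discovery through the configured cleaning policy: under KEEP_LATEST_COMMITS the cleaner can run incrementally and scan only the partitions written since the last clean, while KEEP_LATEST_FILE_VERSIONS always falls back to a full listing. As a minimal sketch of how a writer would opt into the incremental path (withIncrementalCleaningMode is an assumed builder method, inferred from the incrementalCleanerModeEnabled() check in this diff; withCleanerPolicy and retainCommits appear in the test changes below):

    // Sketch only: withIncrementalCleaningMode is assumed, not introduced by this commit.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) // incremental lookup applies only here
            .withIncrementalCleaningMode(true) // assumed setter for hoodie.cleaner.incremental.mode
            .retainCommits(3)
            .build())
        .build();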

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
index 3c73c7e..38eddd7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
@@ -38,13 +38,13 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -90,34 +90,77 @@ public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializab
    * @throws IOException when underlying file-system throws this exception
    */
   public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
-        && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
-      Option<HoodieInstant> lastClean =
-          hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+    switch (config.getCleanerPolicy()) {
+      case KEEP_LATEST_COMMITS:
+        return getPartitionPathsForCleanByCommits(newInstantToRetain);
+      case KEEP_LATEST_FILE_VERSIONS:
+        return getPartitionPathsForFullCleaning();
+      default:
+        throw new IllegalStateException("Unknown cleaner policy: " + config.getCleanerPolicy());
+    }
+  }
+
+  /**
+   * Returns the partition paths to clean under the KEEP_LATEST_COMMITS policy.
+   * @param instantToRetain earliest instant to retain
+   * @return list of partition paths to scan for cleaning
+   * @throws IOException when underlying file-system throws this exception
+   */
+  private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
+    if (!instantToRetain.isPresent()) {
+      LOG.info("No earliest commit to retain. No need to scan partitions!");
+      return Collections.emptyList();
+    }
+
+    if (config.incrementalCleanerModeEnabled()) {
+      Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
       if (lastClean.isPresent()) {
         HoodieCleanMetadata cleanMetadata = AvroUtils
             .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
         if ((cleanMetadata.getEarliestCommitToRetain() != null)
             && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
-          LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
-              + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
-              + ". New Instant to retain : " + newInstantToRetain);
-          return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
-              HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
-              newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> {
-                try {
-                  HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-                  return commitMetadata.getPartitionToWriteStats().keySet().stream();
-                } catch (IOException e) {
-                  throw new HoodieIOException(e.getMessage(), e);
-                }
-              }).distinct().collect(Collectors.toList());
+          return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
         }
       }
     }
-    // Otherwise go to brute force mode of scanning all partitions
-    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
-        hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
+    return getPartitionPathsForFullCleaning();
+  }
+
+  /**
+   * Uses incremental mode to find the partition paths touched since the last clean.
+   * @param cleanMetadata metadata of the last completed clean action
+   * @param newInstantToRetain earliest instant to retain for the upcoming clean
+   * @return list of partition paths changed since the last clean
+   */
+  private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
+      Option<HoodieInstant> newInstantToRetain) {
+    LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have changed "
+        + "since the last clean at " + cleanMetadata.getEarliestCommitToRetain()
+        + ". New Instant to retain : " + newInstantToRetain);
+    return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
+        instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
+                HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+            newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> {
+              try {
+                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                    .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+                        HoodieCommitMetadata.class);
+                return commitMetadata.getPartitionToWriteStats().keySet().stream();
+              } catch (IOException e) {
+                throw new HoodieIOException(e.getMessage(), e);
+              }
+            }).distinct().collect(Collectors.toList());
+  }
+
+  /**
+   * Scan and list all partitions for cleaning.
+   * @return all partition paths for the dataset.
+   * @throws IOException when underlying file-system throws this exception
+   */
+  private List<String> getPartitionPathsForFullCleaning() throws IOException {
+    // Go to brute force mode of scanning all partitions
+    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), hoodieTable.getMetaClient().getBasePath(),
+        config.shouldAssumeDatePartitioning());
   }
 
   /**
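
For intuition, the incremental lookup above replays commit metadata inside a bounded timeline window instead of listing the file system. A hypothetical timeline makes the selection concrete:

    // Illustrative only: the instant timestamps below are hypothetical.
    // Completed commits on the timeline:    0003 0004 0005 0006 0007 0008 0009
    // Last clean's earliestCommitToRetain:  0005
    // New instant to retain:                0008
    // The half-open window [0005, 0008) selects commits 0005, 0006 and 0007;
    // only the partitions named in their commit metadata become cleaning
    // candidates, so no dataset-wide partition listing is needed.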
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index f6ad230..dee545a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -108,7 +108,8 @@ public class TestCleaner extends TestHoodieClientBase {
    */
   private String insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client,
       Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
+      HoodieCleaningPolicy cleaningPolicy) throws Exception {
 
     /*
      * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
@@ -131,10 +132,15 @@ public class TestCleaner extends TestHoodieClientBase {
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, client.getConfig(), jsc);
 
     assertFalse(table.getCompletedCommitsTimeline().empty());
-    String commitTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
-    assertFalse(table.getCompletedCleanTimeline().empty());
-    assertEquals("The clean instant should be the same as the commit instant", commitTime,
-        table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
+    if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {
+      // We no longer write empty cleaner plans when there are not enough commits present
+      assertTrue(table.getCompletedCleanTimeline().empty());
+    } else {
+      String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
+      assertFalse(table.getCompletedCleanTimeline().empty());
+      assertEquals("The clean instant should be the same as the commit instant", instantTime,
+              table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
+    }
 
     HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
     List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
@@ -205,7 +211,8 @@ public class TestCleaner extends TestHoodieClientBase {
       final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
           generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
 
-      insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
+      insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
+          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
 
       Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -353,7 +360,7 @@ public class TestCleaner extends TestHoodieClientBase {
     int maxCommits = 3; // keep upto 3 commits from the past
     HoodieWriteConfig cfg = getConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .build();
@@ -365,7 +372,8 @@ public class TestCleaner extends TestHoodieClientBase {
     final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
         generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
 
-    insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
+    insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
 
     // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
     HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).forEach(newCommitTime -> {
@@ -380,7 +388,9 @@ public class TestCleaner extends TestHoodieClientBase {
         metaClient = HoodieTableMetaClient.reload(metaClient);
         HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
         HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
-        Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
+        // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one extra commit before the
+        // earliest commit to retain; see the worked example after this hunk.
+        Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
         Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
         if (earliestRetainedCommit.isPresent()) {
           acceptableCommits
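
The switch from nthFromLastInstant(maxCommits - 1) to nthFromLastInstant(maxCommits) reflects the extra retained commit noted in the comment above. A worked example of the arithmetic, with hypothetical commit times:

    // Illustrative only: assumes maxCommits = 3 and nine completed commits
    // 001..009 on the timeline. nthFromLastInstant is zero-indexed from the
    // latest instant, so nthFromLastInstant(3) returns 006: one extra commit
    // kept before the earliest of the last three (007, 008, 009). The test
    // therefore accepts files belonging to commits 006 through 009.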
@@ -753,12 +763,7 @@ public class TestCleaner extends TestHoodieClientBase {
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
-    assertEquals("Must not clean any files", 0,
-        getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size());
-    assertEquals("Must not clean any files", 0,
-        getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
-            .size());
+    assertEquals("Must not scan any partitions or clean any files", 0, hoodieCleanStatsOne.size());
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
         file1P0C0));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
@@ -788,12 +793,7 @@ public class TestCleaner extends TestHoodieClientBase {
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
-    assertEquals("Must not clean any files", 0,
-        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size());
-    assertEquals("Must not clean any files", 0,
-        getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
-            .size());
+    assertEquals("Must not scan any partitions or clean any files", 0, hoodieCleanStatsTwo.size());
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
         file2P0C1));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",