Posted to commits@hudi.apache.org by si...@apache.org on 2023/04/19 08:10:05 UTC

[hudi] branch master updated: [HUDI-6056] Validate archival configs alignment with cleaner configs with policy based on hours (#8422)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f9f110695fc [HUDI-6056] Validate archival configs alignment with cleaner configs with policy based on hours (#8422)
f9f110695fc is described below

commit f9f110695fc69f2d7085648a6610888bb10ad8e4
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Wed Apr 19 01:09:48 2023 -0700

    [HUDI-6056] Validate archival configs alignment with cleaner configs with policy based on hours (#8422)
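
In plain terms: with hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS, archival now refuses
to archive commits that still fall inside the cleaner's retention window. A minimal
sketch of a write config that satisfies the new check, using the same builder APIs
exercised in the test below (the retention values are illustrative only and assume
roughly hourly commits):

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS)
            .cleanerNumHoursRetained(24)   // clean file versions older than 24 hours
            .build())
        // Keep enough commits on the active timeline to cover the 24-hour window;
        // with roughly hourly commits, min=30/max=40 leaves comfortable headroom.
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(30, 40)    // hoodie.keep.min.commits / hoodie.keep.max.commits
            .build())
        .build();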
---
 .../apache/hudi/client/HoodieTimelineArchiver.java | 33 ++++++++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 38 +++++-----
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 84 ++++++++++++++++++++++
 3 files changed, 134 insertions(+), 21 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index f9f734a9437..74e0a2565f8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -74,10 +75,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,6 +92,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
@@ -203,6 +210,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
    * 2. Do merge.
    * 3. Delete all the candidates.
    * 4. Delete the merge plan.
+   *
    * @param context HoodieEngineContext
    * @throws IOException
    */
@@ -241,8 +249,9 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
    * Find the latest 'huge archive file' index as a break point and only check/merge newer archive files.
    * Because we need to keep the original order of archive files which is important when loading archived instants with time filter.
    * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
+   *
    * @param smallFileLimitBytes small File Limit Bytes
-   * @param fsStatuses Sort by version suffix in reverse
+   * @param fsStatuses          Sort by version suffix in reverse
    * @return merge candidates
    */
   private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
@@ -266,6 +275,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
 
   /**
    * Check/Solve if there is any failed and unfinished merge small archive files operation
+   *
    * @param context HoodieEngineContext used for parallelize to delete small archive files if necessary.
    * @throws IOException
    */
@@ -475,7 +485,24 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
                       HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
           );
-      return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
+      List<HoodieInstant> instantsToArchive = instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep).collect(Collectors.toList());
+      // If the cleaner policy is hour-based, ensure Hudi does not archive commits that the cleaner has yet to clean.
+      if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS && !instantsToArchive.isEmpty()) {
+        String latestCommitToArchive = instantsToArchive.get(instantsToArchive.size() - 1).getTimestamp();
+        try {
+          Instant latestCommitInstant = HoodieActiveTimeline.parseDateFromInstantTime(commitTimeline.lastInstant().get().getTimestamp()).toInstant();
+          ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(latestCommitInstant, ZoneId.systemDefault());
+          String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(config.getCleanerHoursRetained()).toInstant()));
+          if (HoodieTimeline.compareTimestamps(latestCommitToArchive, GREATER_THAN_OR_EQUALS, earliestTimeToRetain)) {
+            throw new HoodieIOException("Please align your archival configs based on cleaner configs. 'hoodie.keep.min.commits' : "
+                + config.getMinCommitsToKeep() + " + should be greater than "
+                + " 'hoodie.cleaner.hours.retained' : " + config.getCleanerHoursRetained());
+          }
+        } catch (ParseException e) {
+          throw new HoodieIOException("Failed to parse latest commit instant time " + commitTimeline.lastInstant().get().getTimestamp() + e.getMessage());
+        }
+      }
+      return instantsToArchive.stream();
     } else {
       return Stream.empty();
     }
@@ -543,7 +570,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
 
     return instants.flatMap(hoodieInstant -> {
       List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
-                HoodieInstant.getComparableAction(hoodieInstant.getAction())));
+          HoodieInstant.getComparableAction(hoodieInstant.getAction())));
       if (instantsToStream != null) {
         return instantsToStream.stream();
       } else {
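
For readability, here is the new KEEP_LATEST_BY_HOURS guard from the hunk above,
restated as a self-contained sketch (same helpers as the patch; the surrounding
stream plumbing and ParseException handling are trimmed):

    // latestCommitToArchive: timestamp of the newest instant archival would remove.
    // The retention boundary is anchored on the latest commit's own time, not the
    // wall clock, so the check behaves deterministically when replaying old data.
    Instant latestCommitInstant = HoodieActiveTimeline
        .parseDateFromInstantTime(commitTimeline.lastInstant().get().getTimestamp())
        .toInstant();
    ZonedDateTime latestCommitTime = ZonedDateTime.ofInstant(latestCommitInstant, ZoneId.systemDefault());
    String earliestTimeToRetain = HoodieActiveTimeline.formatDate(
        Date.from(latestCommitTime.minusHours(config.getCleanerHoursRetained()).toInstant()));
    // Any candidate at or after this boundary is still inside the cleaner's retention
    // window; archiving it could strand uncleaned file versions and break incremental
    // pulls, so archival fails fast instead.
    if (HoodieTimeline.compareTimestamps(latestCommitToArchive, GREATER_THAN_OR_EQUALS, earliestTimeToRetain)) {
      throw new HoodieIOException("Please align your archival configs based on cleaner configs.");
    }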
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 415e208268e..3009316bb91 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -3099,24 +3099,26 @@ public class HoodieWriteConfig extends HoodieConfig {
             .equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
       }
 
-      HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
-      // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
-      // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
-      int minInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP));
-      int maxInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP));
-      int cleanerCommitsRetained =
-          Integer.parseInt(writeConfig.getStringOrDefault(HoodieCleanConfig.CLEANER_COMMITS_RETAINED));
-      checkArgument(maxInstantsToKeep > minInstantsToKeep,
-          String.format(
-              "Increase %s=%d to be greater than %s=%d.",
-              HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
-              HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
-      checkArgument(minInstantsToKeep > cleanerCommitsRetained,
-          String.format(
-              "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
-                  + "missing data from few instants.",
-              HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
-              HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
+      HoodieCleaningPolicy cleaningPolicy = HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
+      if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+        // Ensure minInstantsToKeep > cleanerCommitsRetained; otherwise we could archive commit
+        // instants on the timeline that have not yet been cleaned, and incremental pulls could miss data.
+        int minInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP));
+        int maxInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP));
+        int cleanerCommitsRetained =
+            Integer.parseInt(writeConfig.getStringOrDefault(HoodieCleanConfig.CLEANER_COMMITS_RETAINED));
+        checkArgument(maxInstantsToKeep > minInstantsToKeep,
+            String.format(
+                "Increase %s=%d to be greater than %s=%d.",
+                HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
+                HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
+        checkArgument(minInstantsToKeep > cleanerCommitsRetained,
+            String.format(
+                "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
+                    + "missing data from few instants.",
+                HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
+                HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
+      }
 
       boolean inlineCompact = writeConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
       boolean inlineCompactSchedule = writeConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
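
Net effect of the HoodieWriteConfig change: the commit-count inequality
(maxCommits > minCommits > cleanerCommitsRetained) is now enforced only when the
cleaning policy is KEEP_LATEST_COMMITS; hour-based tables rely on the runtime
guard in HoodieTimelineArchiver instead. A minimal sketch with illustrative
values showing the smallest settings that pass the eager validation:

    // KEEP_LATEST_COMMITS: validated eagerly when the write config is built.
    // With hoodie.cleaner.commits.retained = 10, the smallest valid pair is
    // min = 11 (must be strictly greater than 10) and max = 12 (strictly greater than 11).
    HoodieWriteConfig byCommits = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
            .retainCommits(10)
            .build())
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(11, 12)   // maxCommits > minCommits > cleanerCommitsRetained
            .build())
        .build();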
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index aa86b126ebd..6e254e3146d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -25,9 +25,11 @@ import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -42,6 +44,7 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -51,6 +54,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -62,6 +66,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -72,14 +77,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -90,6 +101,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
 import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -304,6 +316,78 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     }
   }
 
+  @Test
+  public void testArchivalWithCleanBasedOnHours() throws Exception {
+    init();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(1)
+            .retainCommits(2)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3, 4).build())
+        .build();
+
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+    Instant instant = Instant.now();
+    ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+
+    triggerCommit(p0, p1, commitDateTime, 5, testTable);
+    triggerCommit(p0, p1, commitDateTime, 10, testTable);
+    triggerCommit(p0, p1, commitDateTime, 20, testTable);
+    triggerCommit(p0, p1, commitDateTime, 30, testTable);
+    triggerCommit(p0, p1, commitDateTime, 40, testTable);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // Let's trigger archival; it should fail since the archival configs do not align with the cleaner configs.
+    try {
+      archiveAndGetCommitsList(config);
+      Assertions.fail("Should have failed archival since archival configs are not aligned with cleaner configs. ");
+    } catch (HoodieIOException e) {
+      assertTrue(e.getMessage().contains("Please align your archival configs based on cleaner configs"));
+    }
+  }
+
+  private void triggerCommit(String p0, String p1, ZonedDateTime curDateTime, int minutesForCommit, HoodieTestTable testTable) throws Exception {
+
+    String file1P0C0 = UUID.randomUUID().toString();
+    String file1P1C0 = UUID.randomUUID().toString();
+    String commitTs = HoodieActiveTimeline.formatDate(Date.from(curDateTime.minusMinutes(minutesForCommit).toInstant()));
+    testTable.addInflightCommit(commitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+    HoodieCommitMetadata commitMetadata = generateCommitMetadata(commitTs,
+        Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+          {
+            put(p0, CollectionUtils.createImmutableList(file1P0C0));
+            put(p1, CollectionUtils.createImmutableList(file1P1C0));
+          }
+        })
+    );
+    metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTs),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+  }
+
+  protected static HoodieCommitMetadata generateCommitMetadata(
+      String instantTime, Map<String, List<String>> partitionToFilePaths) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
+    partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> {
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setPartitionPath(partitionPath);
+      writeStat.setPath(partitionPath + "/" + getBaseFilename(instantTime, f));
+      writeStat.setFileId(f);
+      writeStat.setTotalWriteBytes(1);
+      writeStat.setFileSizeInBytes(1);
+      metadata.addWriteStat(partitionPath, writeStat);
+    }));
+    return metadata;
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {