You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/08 23:47:57 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4974: [HUDI-3494] Consider triggering condition of MOR compaction during archival

nsivabalan commented on a change in pull request #4974:
URL: https://github.com/apache/hudi/pull/4974#discussion_r822159191



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
##########
@@ -395,6 +399,18 @@ public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOExcept
     // made after the first savepoint present.
     Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
+      // For Merge-On-Read table, inline or async compaction is enabled
+      // We need to make sure that there are enough delta commits in the active timeline
+      // to trigger compaction scheduling, when the trigger strategy of compaction is
+      // NUM_COMMITS or NUM_AND_TIME.
+      Option<HoodieInstant> oldestInstantToKeepForCompaction =

Review comment:
       minor: consider renaming this to oldestInstantToRetain

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
##########
@@ -946,6 +961,152 @@ public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(
+        enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ);
+
+    // When max archival commits is set to 4, even after 8 delta commits, since the number of delta
+    // commits is still smaller than 8, the archival should not kick in.
+    // The archival should only kick in after the 9th delta commit
+    // instant "00000001" to "00000009"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i <= 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+        IntStream.range(2, 10).forEach(j ->
+            assertTrue(commitsAfterArchival.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      }
+    }
+
+    testTable.doCompaction("00000010", Arrays.asList("p1", "p2"));
+
+    // instant "00000011" to "00000019"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      // first 9 delta commits before the completed compaction should be archived
+      IntStream.range(1, 10).forEach(j ->

Review comment:
       I guess we are doing this assertion in every iteration, from 11 to 19. We can probably do it just once, at 11.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
##########
@@ -128,27 +129,25 @@ private HoodieCompactionPlan scheduleCompaction() {
     return new HoodieCompactionPlan();
   }
 
-  private Pair<Integer, String> getLatestDeltaCommitInfo() {
-    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
-    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
-
-    String latestInstantTs;
-    final int deltaCommitsSinceLastCompaction;
-    if (lastCompaction.isPresent()) {
-      latestInstantTs = lastCompaction.get().getTimestamp();
-      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
-    } else {
-      latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
-      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants();
+  private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().getTimestamp()));
     }
-    return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
+    return Option.empty();
   }
 
   private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo();
+    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = getLatestDeltaCommitInfo();
+    if (!latestDeltaCommitInfoOption.isPresent()) {
+      return false;

Review comment:
       Does this refer only to an empty table, where there are no delta commits at all?
   
   Does this also cover scenarios where there are no delta commits after the last compaction?
   

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
##########
@@ -195,10 +196,76 @@ public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaC
 
   /**
    * Return all pending compaction instant times.
-   * 
+   *
    * @return
    */
   public static List<HoodieInstant> getPendingCompactionInstantTimes(HoodieTableMetaClient metaClient) {
     return metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
   }
+
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest completed
+   * compaction commit, the completed compaction commit instant), if the latest completed
+   * compaction commit is present; a pair of (timeline containing all the delta commits,
+   * the first delta commit instant), if there is no completed compaction commit.
+   *
+   * @param activeTimeline Active timeline of a table.
+   * @return Pair of timeline containing delta commits and an instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompaction(
+      HoodieActiveTimeline activeTimeline) {
+    Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline()
+        .filterCompletedInstants().lastInstant();
+    HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline();
+
+    HoodieInstant latestInstant;
+    if (lastCompaction.isPresent()) {
+      latestInstant = lastCompaction.get();
+      // timeline containing the delta commits after the latest completed compaction commit,
+      // and the completed compaction commit instant
+      return Option.of(Pair.of(deltaCommits.findInstantsAfter(
+          latestInstant.getTimestamp(), Integer.MAX_VALUE), lastCompaction.get()));
+    } else {
+      if (deltaCommits.countInstants() > 0) {
+        latestInstant = deltaCommits.firstInstant().get();
+        // timeline containing all the delta commits, and the first delta commit instant
+        return Option.of(Pair.of(deltaCommits.findInstantsAfterOrEquals(
+            latestInstant.getTimestamp(), Integer.MAX_VALUE), latestInstant));
+      } else {
+        return Option.empty();
+      }
+    }
+  }
+
+  /**
+   * Gets the oldest instant to keep for MOR compaction.
+   * If there is no completed compaction,
+   * num delta commits >= "hoodie.compact.inline.max.delta.commits"
+   * If there is a completed compaction,
+   * num delta commits after latest completed compaction >= "hoodie.compact.inline.max.delta.commits"
+   *
+   * @param activeTimeline  Active timeline of a table.
+   * @param maxDeltaCommits Maximum number of delta commits that trigger the compaction plan,
+   *                        i.e., "hoodie.compact.inline.max.delta.commits".
+   * @return the oldest instant to keep for MOR compaction.
+   */
+  public static Option<HoodieInstant> getOldestInstantToKeepForCompaction(
+      HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {

Review comment:
       Shouldn't we name this getOldestInstantToKeepFor**Archival** instead?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
##########
@@ -946,6 +961,152 @@ public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(
+        enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ);
+
+    // When max archival commits is set to 4, even after 8 delta commits, since the number of delta
+    // commits is still smaller than 8, the archival should not kick in.
+    // The archival should only kick in after the 9th delta commit
+    // instant "00000001" to "00000009"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i <= 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+        IntStream.range(2, 10).forEach(j ->
+            assertTrue(commitsAfterArchival.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      }
+    }
+
+    testTable.doCompaction("00000010", Arrays.asList("p1", "p2"));
+
+    // instant "00000011" to "00000019"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      // first 9 delta commits before the completed compaction should be archived
+      IntStream.range(1, 10).forEach(j ->
+          assertFalse(commitsAfterArchival.contains(
+              new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+
+      if (i == 1) {
+        assertEquals(8, originalCommits.size() - commitsAfterArchival.size());
+        // instant from "00000011" should be in the active timeline
+        assertTrue(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010")));
+        assertTrue(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000011")));
+      } else if (i < 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010")));
+        // i == 8 -> ["00000011", "00000018"] should be in the active timeline

Review comment:
       Is this comment valid?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org