You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/06 12:25:56 UTC

[GitHub] [hudi] yanghua commented on a change in pull request #2260: [HUDI-1381] Schedule compaction based on time elapsed

yanghua commented on a change in pull request #2260:
URL: https://github.com/apache/hudi/pull/2260#discussion_r552547254



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -60,17 +63,23 @@ protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
     Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
+      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, Integer.MAX_VALUE).countInstants();
     }
-
-    int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
+    // judge if we need to compact according to num delta commits and time elapsed
+    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction
+                    && parseTs(lastCompactionTs) + config.getInlineCompactDeltaElapsedTimeMax() > parseTs(instantTime)) {

Review comment:
       Can we define a variable to make the code more readable?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -60,17 +63,23 @@ protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
     Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
+      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, Integer.MAX_VALUE).countInstants();
     }
-
-    int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
+    // judge if we need to compact according to num delta commits and time elapsed
+    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction
+                    && parseTs(lastCompactionTs) + config.getInlineCompactDeltaElapsedTimeMax() > parseTs(instantTime)) {
       LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
           + " delta commits was found since last compaction " + lastCompactionTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
+          + config.getInlineCompactDeltaCommitMax() + ". Or " + config.getInlineCompactDeltaElapsedTimeMax()
+              + "ms elapsed time need since last compaction " + lastCompactionTs);

Review comment:
       IMO, we can use `String.format(...)` to make the log message more readable, right?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -85,32 +86,94 @@ public void testSuccessfulCompaction() throws Exception {
   }
 
   @Test
-  public void testCompactionRetryOnFailure() throws Exception {
+  public void testSuccessfulCompactionForTime() throws Exception {

Review comment:
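       Ditto: the same naming suggestion as for `testSuccessfulCompactionForNumCommits` applies here (replace `For` with `BasedOn` or `Via`).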

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -39,21 +38,23 @@
 
 public class TestInlineCompaction extends CompactionTestBase {
 
-  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
+  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, int maxDeltaTime) {
     return getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
+            .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)

Review comment:
       Can we break the second `withXXX` call onto a new line?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -62,9 +63,9 @@ public void testCompactionIsNotScheduledEarly() throws Exception {
   }
 
   @Test
-  public void testSuccessfulCompaction() throws Exception {
+  public void testSuccessfulCompactionForNumCommits() throws Exception {

Review comment:
       Would replacing `For` with `BasedOn` or `Via` sound better?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -90,4 +99,13 @@ protected HoodieCompactionPlan scheduleCompaction() {
     }
   }
 
+  public Long parseTs(String time) {

Review comment:
       Could you please give it a better name?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -85,32 +86,94 @@ public void testSuccessfulCompaction() throws Exception {
   }
 
   @Test
-  public void testCompactionRetryOnFailure() throws Exception {
+  public void testSuccessfulCompactionForTime() throws Exception {
+    // Given: make one commit
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(5,10);

Review comment:
       Please add a space after the comma in `5,10` (i.e. `5, 10`).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org