You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/04/13 12:05:57 UTC

[hudi] branch master updated: [HUDI-3870] Add timeout rollback for flink online compaction (#5314)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f9b02decb [HUDI-3870] Add timeout rollback for flink online compaction (#5314)
6f9b02decb is described below

commit 6f9b02decb5bb2b83709b1b6ec04a97e4d102c11
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Apr 13 20:05:48 2022 +0800

    [HUDI-3870] Add timeout rollback for flink online compaction (#5314)
---
 .../apache/hudi/sink/compact/CompactionPlanOperator.java    | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 6df11fe224..48d4f48989 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -88,8 +88,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
       // when the earliest inflight instant has timed out, assumes it has failed
       // already and just rolls it back.
 
-      // comment out: do we really need the timeout rollback ?
-      // CompactionUtil.rollbackEarliestCompaction(table, conf);
+      CompactionUtil.rollbackEarliestCompaction(table, conf);
       scheduleCompaction(table, checkpointId);
     } catch (Throwable throwable) {
       // make it fail-safe
@@ -99,7 +98,8 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
 
   private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
     // the first instant takes the highest priority.
-    Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+    Option<HoodieInstant> firstRequested = pendingCompactionTimeline
         .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
     if (!firstRequested.isPresent()) {
       // do nothing.
@@ -107,6 +107,13 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
       return;
     }
 
+    Option<HoodieInstant> firstInflight = pendingCompactionTimeline
+        .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant();
+    if (firstInflight.isPresent()) {
+      LOG.warn("Waiting for pending compaction instant : " + firstInflight + " to complete, skip scheduling new compaction plans");
+      return;
+    }
+
     String compactionInstantTime = firstRequested.get().getTimestamp();
 
     // generate compaction plan