You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/04 06:21:44 UTC

[GitHub] [hudi] TJX2014 commented on a diff in pull request #6025: [HUDI-4351] Improve HoodieFlinkCompactor

TJX2014 commented on code in PR #6025:
URL: https://github.com/apache/hudi/pull/6025#discussion_r912660211


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java:
##########
@@ -48,47 +60,154 @@
  *   as the instant time.</li>
  * </ul>
  */
-public class CompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
+public class CompactionPlanSourceFunction
+    extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class);
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The interval between consecutive path scans.
+   */
+  private final long interval;
 
-  protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class);
+  private volatile boolean isRunning = true;
+
+  private final Configuration conf;
+
+  protected HoodieFlinkWriteClient writeClient;
 
   /**
-   * Compaction instant time.
+   * The hoodie table.
    */
-  private final String compactionInstantTime;
+  private transient HoodieFlinkTable<?> table;
 
   /**
-   * The compaction plan.
+   * The path to monitor.
    */
-  private final HoodieCompactionPlan compactionPlan;
+  private final transient Path path;
+
+  private final Boolean isStreamingMode;
 
-  public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
-    this.compactionPlan = compactionPlan;
-    this.compactionInstantTime = compactionInstantTime;
+  public CompactionPlanSourceFunction(
+      Configuration conf,
+      String path,
+      Boolean isStreamingMode) {
+    this.conf = conf;
+    this.path = new Path(path);
+    this.isStreamingMode = isStreamingMode;
+    this.interval = conf.getInteger(FlinkOptions.COMPACTION_STREAMING_CHECK_INTERVAL);
   }
 
   @Override
   public void open(Configuration parameters) throws Exception {
-    // no operation
+    super.open(parameters);
+    if (writeClient == null) {
+      this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+    }
+    this.table = this.writeClient.getHoodieTable();
   }
 
   @Override
-  public void run(SourceContext sourceContext) throws Exception {
-    List<CompactionOperation> operations = this.compactionPlan.getOperations().stream()
+  public void run(SourceContext<CompactionPlanEvent> context) throws Exception {
+    if (isStreamingMode) {
+      while (isRunning) {
+        monitorCompactionPlan(context);
+        TimeUnit.SECONDS.sleep(interval);
+      }
+    } else {
+      monitorCompactionPlan(context);
+    }
+  }
+
+  public void monitorCompactionPlan(SourceContext<CompactionPlanEvent> context) throws IOException {
+    table.getMetaClient().reloadActiveTimeline();
+
+    // checks the compaction plan and do compaction.
+    if (OptionsResolver.needsScheduleCompaction(conf)) {
+      Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(table.getMetaClient());
+      if (compactionInstantTimeOption.isPresent()) {
+        boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
+        if (!scheduled) {
+          // do nothing.
+          LOG.info("No compaction plan for this job ");
+          return;
+        }
+        table.getMetaClient().reloadActiveTimeline();
+      }
+    }
+
+    // fetch the instant based on the configured execution sequence
+    String compactionSeq = conf.getString(FlinkOptions.COMPACTION_SEQUENCE);
+    HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+    Option<HoodieInstant> requested = CompactionUtil.isLIFO(compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
+    if (!requested.isPresent()) {
+      // do nothing.
+      LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
+      return;
+    }
+
+    String compactionInstantTime = requested.get().getTimestamp();
+
+    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
+    if (timeline.containsInstant(inflightInstant)) {
+      LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]");
+      table.rollbackInflightCompaction(inflightInstant);
+      table.getMetaClient().reloadActiveTimeline();

Review Comment:
   It will roll back itself in the next round.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org