Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/21 11:36:27 UTC

[GitHub] [hudi] chenshzh commented on a diff in pull request #5991: [HUDI-4329] Add separate control for Flink compaction operation sync/async mode

chenshzh commented on code in PR #5991:
URL: https://github.com/apache/hudi/pull/5991#discussion_r1001686558


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java:
##########
@@ -74,14 +74,14 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
 
   public CompactFunction(Configuration conf) {
     this.conf = conf;
-    this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
+    this.asyncCompactionOperation = OptionsResolver.needsAsyncCompactionOperation(conf);
   }

Review Comment:
   Normally we use compaction.async.enabled to turn on compaction. But once it is true, we cannot make the compaction operation itself run synchronously, because the same flag also selects the execution mode in CompactFunction:
   ```java
       if (asyncCompaction) {
         // executes the compaction task asynchronously to not block the checkpoint barrier propagation.
         executor.execute(
             () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
             (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
             "Execute compaction for instant %s from task %d", instantTime, taskID);
       } else {
         // executes the compaction task synchronously for batch mode.
         LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
         doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
       }
   ```
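   A minimal, Flink-free sketch of the separation being asked for. The class name `CompactionRunner`, its `asyncCompactionOperation` flag and the placeholder `doCompaction` are hypothetical, and a plain `java.util.concurrent.ExecutorService` stands in for the Hudi executor. The only thing it illustrates is that the execution mode comes from its own flag, so turning compaction on no longer dictates how each operation runs.

   ```java
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   // Hypothetical, Flink-free stand-in for CompactFunction: the execution mode comes
   // from a dedicated flag rather than from compaction.async.enabled.
   public class CompactionRunner implements AutoCloseable {
     private final boolean asyncCompactionOperation;
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
     // Name of the thread that ran each compaction, so the chosen mode is observable.
     final List<String> executedOn = new CopyOnWriteArrayList<>();

     public CompactionRunner(boolean asyncCompactionOperation) {
       this.asyncCompactionOperation = asyncCompactionOperation;
     }

     public void processElement(String instantTime, int taskId) {
       if (asyncCompactionOperation) {
         // Hand the work to a background thread so the calling thread is not blocked.
         executor.execute(() -> doCompaction(instantTime, taskId));
       } else {
         // Run the work inline on the calling thread.
         doCompaction(instantTime, taskId);
       }
     }

     // Placeholder for the real compaction; it only records where it ran.
     private void doCompaction(String instantTime, int taskId) {
       executedOn.add(Thread.currentThread().getName());
     }

     // Stops the executor and waits for already submitted compactions to finish.
     @Override
     public void close() {
       executor.shutdown();
       try {
         executor.awaitTermination(10, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }

     public static void main(String[] args) {
       String caller = Thread.currentThread().getName();
       CompactionRunner sync = new CompactionRunner(false);
       sync.processElement("001", 0);
       sync.close();
       CompactionRunner async = new CompactionRunner(true);
       async.processElement("001", 0);
       async.close(); // waits for the background task before we inspect it
       System.out.println("sync inline: " + sync.executedOn.get(0).equals(caller));   // sync inline: true
       System.out.println("async inline: " + async.executedOn.get(0).equals(caller)); // async inline: false
     }
   }
   ```

   With a dedicated flag, a streaming job could keep compaction enabled and still choose inline execution, which is the case the current single option cannot express.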
   
   
   > support sync compaction for bounded source
   
   In some scenarios we also use sync compaction mode for unbounded sources. And the current sync compaction for bounded sources is actually odd: it relies on compaction.async.enabled being true to turn compaction on, and then switches it to false to get sync mode.
   
   ```java
         // compaction
         if (OptionsResolver.needsAsyncCompaction(conf)) {   // FlinkOptions.COMPACTION_ASYNC_ENABLED decides whether we need compaction at all
           // use synchronous compaction for bounded source.
           if (context.isBounded()) {
             // we only get here because the option is true, so flipping it to false is odd:
             // all we actually want is to execute the operation synchronously.
             conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
           }
           return Pipelines.compact(conf, pipeline);
         } else {
           return Pipelines.clean(conf, pipeline);
         }
   ```
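   A second sketch, assuming a plain `Map<String, String>` in place of Flink's `Configuration`. The key `compaction.operation.async.enabled`, the defaulting rule for bounded sources and the class `CompactionOptionsSketch` are hypothetical illustrations, not the option or resolver the PR actually adds. It resolves both decisions by reading the config, so nothing has to set compaction.async.enabled back to false.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Sketch: decide "is compaction in the pipeline?" and "does each operation run
   // async?" from two separate options, without rewriting either one.
   public class CompactionOptionsSketch {
     // Existing option discussed in the review.
     static final String COMPACTION_ASYNC_ENABLED = "compaction.async.enabled";
     // HYPOTHETICAL key for illustration only; the PR's option may be named differently.
     static final String COMPACTION_OPERATION_ASYNC = "compaction.operation.async.enabled";

     // Whether to add the compaction operators (default assumed to be true here).
     static boolean needsCompaction(Map<String, String> conf) {
       return Boolean.parseBoolean(conf.getOrDefault(COMPACTION_ASYNC_ENABLED, "true"));
     }

     // An explicit setting wins; otherwise (assumption) bounded sources default to
     // sync execution and unbounded sources to async execution.
     static boolean needsAsyncCompactionOperation(Map<String, String> conf, boolean bounded) {
       String explicit = conf.get(COMPACTION_OPERATION_ASYNC);
       return explicit != null ? Boolean.parseBoolean(explicit) : !bounded;
     }

     // Mirrors the branch in the quoted code, but reads the config instead of mutating it.
     static String plan(Map<String, String> conf, boolean bounded) {
       if (!needsCompaction(conf)) {
         return "clean";
       }
       return needsAsyncCompactionOperation(conf, bounded) ? "compact/async" : "compact/sync";
     }

     public static void main(String[] args) {
       // Map.of is immutable, so any attempt to rewrite the config would throw.
       Map<String, String> conf = Map.of(COMPACTION_ASYNC_ENABLED, "true");
       System.out.println(plan(conf, true));  // compact/sync
       System.out.println(plan(conf, false)); // compact/async

       // Unbounded source that still wants synchronous compaction operations.
       Map<String, String> syncStreaming = new HashMap<>(conf);
       syncStreaming.put(COMPACTION_OPERATION_ASYNC, "false");
       System.out.println(plan(syncStreaming, false)); // compact/sync
     }
   }
   ```

   Because `main` builds the config with `Map.of`, the sketch would fail fast if it tried to flip an option the way the quoted `conf.setBoolean(...)` call does.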


