You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/18 03:21:18 UTC

[hudi] branch master updated: Clean the marker files for flink compaction (#5611)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1017c66aa Clean the marker files for flink compaction (#5611)
a1017c66aa is described below

commit a1017c66aaa377dad7e5e62f773bb714d53fc353
Author: luokey <85...@qq.com>
AuthorDate: Wed May 18 11:21:14 2022 +0800

    Clean the marker files for flink compaction (#5611)
    
    Co-authored-by: loukey_7821 <854194341@qq.com>
---
 .../java/org/apache/hudi/sink/compact/CompactionPlanOperator.java     | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 48d4f48989..338352d4b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkTables;
 
@@ -134,6 +135,9 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
       List<CompactionOperation> operations = compactionPlan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
       LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
+      WriteMarkersFactory
+          .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
+          .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
       for (CompactionOperation operation : operations) {
         output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
       }