You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/18 03:21:18 UTC
[hudi] branch master updated: Clean the marker files for flink compaction (#5611)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a1017c66aa Clean the marker files for flink compaction (#5611)
a1017c66aa is described below
commit a1017c66aaa377dad7e5e62f773bb714d53fc353
Author: luokey <85...@qq.com>
AuthorDate: Wed May 18 11:21:14 2022 +0800
Clean the marker files for flink compaction (#5611)
Co-authored-by: loukey_7821 <854194341@qq.com>
---
.../java/org/apache/hudi/sink/compact/CompactionPlanOperator.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 48d4f48989..338352d4b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkTables;
@@ -134,6 +135,9 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
+ WriteMarkersFactory
+ .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
+ .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
for (CompactionOperation operation : operations) {
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
}