You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/13 12:03:40 UTC
[hudi] 03/03: [HUDI-3120] Cache compactionPlan in buffer (#4463)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 91253ef05d2af82df2d530c847cd1440956b95e8
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Fri Dec 31 13:12:32 2021 +0800
[HUDI-3120] Cache compactionPlan in buffer (#4463)
Co-authored-by: yuzhaojing <yu...@bytedance.com>
---
.../hudi/sink/compact/CompactionCommitSink.java | 20 ++++++++++++++++++--
1 file changed, 18 insertions(+), 2 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 0309278..5312735 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
@@ -68,6 +69,12 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer;
/**
+ * Cache to store compaction plan for each instant.
+ * Stores the mapping of instant_time -> compactionPlan.
+ */
+ private transient Map<String, HoodieCompactionPlan> compactionPlanCache;
+
+ /**
* The hoodie table.
*/
private transient HoodieFlinkTable<?> table;
@@ -84,6 +91,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
}
this.commitBuffer = new HashMap<>();
+ this.compactionPlanCache = new HashMap<>();
this.table = this.writeClient.getHoodieTable();
}
@@ -108,8 +116,15 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
* @param events Commit events ever received for the instant
*/
private void commitIfNecessary(String instant, Collection<CompactionCommitEvent> events) throws IOException {
- HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
- this.writeClient.getHoodieTable().getMetaClient(), instant);
+ HoodieCompactionPlan compactionPlan = compactionPlanCache.computeIfAbsent(instant, k -> {
+ try {
+ return CompactionUtils.getCompactionPlan(
+ this.writeClient.getHoodieTable().getMetaClient(), instant);
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ }
+ });
+
boolean isReady = compactionPlan.getOperations().size() == events.size();
if (!isReady) {
return;
@@ -143,5 +158,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
private void reset(String instant) {
this.commitBuffer.remove(instant);
+ this.compactionPlanCache.remove(instant);
}
}