You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/13 12:03:40 UTC

[hudi] 03/03: [HUDI-3120] Cache compactionPlan in buffer (#4463)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 91253ef05d2af82df2d530c847cd1440956b95e8
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Fri Dec 31 13:12:32 2021 +0800

    [HUDI-3120] Cache compactionPlan in buffer (#4463)
    
    Co-authored-by: yuzhaojing <yu...@bytedance.com>
---
 .../hudi/sink/compact/CompactionCommitSink.java      | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 0309278..5312735 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
@@ -68,6 +69,12 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
   private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer;
 
   /**
+   * Cache to store compaction plan for each instant.
+   * Stores the mapping of instant_time -> compactionPlan.
+   */
+  private transient Map<String, HoodieCompactionPlan> compactionPlanCache;
+
+  /**
    * The hoodie table.
    */
   private transient HoodieFlinkTable<?> table;
@@ -84,6 +91,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
       this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
     }
     this.commitBuffer = new HashMap<>();
+    this.compactionPlanCache = new HashMap<>();
     this.table = this.writeClient.getHoodieTable();
   }
 
@@ -108,8 +116,15 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
    * @param events  Commit events ever received for the instant
    */
   private void commitIfNecessary(String instant, Collection<CompactionCommitEvent> events) throws IOException {
-    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
-        this.writeClient.getHoodieTable().getMetaClient(), instant);
+    HoodieCompactionPlan compactionPlan = compactionPlanCache.computeIfAbsent(instant, k -> {
+      try {
+        return CompactionUtils.getCompactionPlan(
+            this.writeClient.getHoodieTable().getMetaClient(), instant);
+      } catch (IOException e) {
+        throw new HoodieException(e);
+      }
+    });
+
     boolean isReady = compactionPlan.getOperations().size() == events.size();
     if (!isReady) {
       return;
@@ -143,5 +158,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
 
   private void reset(String instant) {
     this.commitBuffer.remove(instant);
+    this.compactionPlanCache.remove(instant);
   }
 }