Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/02 03:18:40 UTC

[hudi] branch master updated: [HUDI-2631] In CompactFunction, set up the write schema each time with the latest schema (#4000)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b2da9f  [HUDI-2631] In CompactFunction, set up the write schema each time with the latest schema (#4000)
3b2da9f is described below

commit 3b2da9f13847475be3dcef13b3d25df8818cecc7
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Wed Mar 2 11:18:17 2022 +0800

    [HUDI-2631] In CompactFunction, set up the write schema each time with the latest schema (#4000)
    
    Co-authored-by: yuzhaojing <yu...@bytedance.com>
---
 .../apache/hudi/sink/compact/CompactFunction.java  | 23 ++++++++++++++++------
 .../java/org/apache/hudi/util/CompactionUtil.java  | 13 ++++++++++++
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 560b5ff..a43fcd5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -21,9 +21,11 @@ package org.apache.hudi.sink.compact;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -51,7 +53,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
   /**
    * Write Client.
    */
-  private transient HoodieFlinkWriteClient writeClient;
+  private transient HoodieFlinkWriteClient<?> writeClient;
 
   /**
    * Whether to execute compaction asynchronously.
@@ -89,21 +91,24 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
     if (asyncCompaction) {
       // executes the compaction task asynchronously to not block the checkpoint barrier propagation.
       executor.execute(
-          () -> doCompaction(instantTime, compactionOperation, collector),
+          () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
           (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
           "Execute compaction for instant %s from task %d", instantTime, taskID);
     } else {
       // executes the compaction task synchronously for batch mode.
       LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
-      doCompaction(instantTime, compactionOperation, collector);
+      doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
     }
   }
 
-  private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
-    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
+  private void doCompaction(String instantTime,
+                            CompactionOperation compactionOperation,
+                            Collector<CompactionCommitEvent> collector,
+                            HoodieWriteConfig writeConfig) throws IOException {
+    HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
     List<WriteStatus> writeStatuses = compactor.compact(
         new HoodieFlinkCopyOnWriteTable<>(
-            writeClient.getConfig(),
+            writeConfig,
             writeClient.getEngineContext(),
             writeClient.getHoodieTable().getMetaClient()),
         writeClient.getHoodieTable().getMetaClient(),
@@ -114,6 +119,12 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
     collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
+  private HoodieWriteConfig reloadWriteConfig() throws Exception {
+    HoodieWriteConfig writeConfig = writeClient.getConfig();
+    CompactionUtil.setAvroSchema(writeConfig, writeClient.getHoodieTable().getMetaClient());
+    return writeConfig;
+  }
+
   @VisibleForTesting
   public void setExecutor(NonThrownExecutor executor) {
     this.executor = executor;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index d04937b..74629f9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
@@ -107,6 +108,18 @@ public class CompactionUtil {
   }
 
   /**
+   * Sets up the avro schema string into the given write config {@code writeConfig}
+   * by reading from the hoodie table metadata.
+   *
+   * @param writeConfig The HoodieWriteConfig
+   */
+  public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws Exception {
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+    writeConfig.setSchema(tableAvroSchema.toString());
+  }
+
+  /**
    * Infers the changelog mode based on the data file schema(including metadata fields).
    *
    * <p>We can improve the code if the changelog mode is set up as table config.
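
For context, here is a minimal standalone sketch of the schema refresh that the new CompactionUtil.setAvroSchema helper performs: resolve the latest table schema from the Hudi table metadata and push it back into the write config, which is what reloadWriteConfig() now does before each asynchronous compaction. This is an illustration only, not part of the commit; the base path, table name lookup, and class name below are placeholder assumptions.

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class CompactionSchemaRefreshSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder base path; point it at an existing Hudi table.
        String basePath = "file:///tmp/hudi_table";

        // Load the table meta client, the same object CompactFunction obtains via
        // writeClient.getHoodieTable().getMetaClient().
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(basePath)
            .build();

        // A write config as it might look when the Flink job started; its schema can be
        // stale if the table schema evolved after the job was launched.
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .forTable(metaClient.getTableConfig().getTableName())
            .build();

        // Same steps as CompactionUtil.setAvroSchema in the diff above: resolve the
        // latest table schema (without metadata fields) and write it back into the
        // config, so the compactor writes with the current schema.
        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
        Schema tableAvroSchema = schemaResolver.getTableAvroSchema(false);
        writeConfig.setSchema(tableAvroSchema.toString());

        System.out.println("Refreshed write schema: " + writeConfig.getSchema());
      }
    }

In CompactFunction the refresh runs only on the asynchronous path via reloadWriteConfig(); the synchronous batch path still passes writeClient.getConfig() straight through, as the hunk above shows.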