You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/02 03:18:40 UTC
[hudi] branch master updated: [HUDI-2631] In CompactFunction, set up the write schema each time with the latest schema (#4000)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3b2da9f [HUDI-2631] In CompactFunction, set up the write schema each time with the latest schema (#4000)
3b2da9f is described below
commit 3b2da9f13847475be3dcef13b3d25df8818cecc7
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Wed Mar 2 11:18:17 2022 +0800
[HUDI-2631] In CompactFunction, set up the write schema each time with the latest schema (#4000)
Co-authored-by: yuzhaojing <yu...@bytedance.com>
---
.../apache/hudi/sink/compact/CompactFunction.java | 23 ++++++++++++++++------
.../java/org/apache/hudi/util/CompactionUtil.java | 13 ++++++++++++
2 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 560b5ff..a43fcd5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -21,9 +21,11 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -51,7 +53,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
/**
* Write Client.
*/
- private transient HoodieFlinkWriteClient writeClient;
+ private transient HoodieFlinkWriteClient<?> writeClient;
/**
* Whether to execute compaction asynchronously.
@@ -89,21 +91,24 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
if (asyncCompaction) {
// executes the compaction task asynchronously to not block the checkpoint barrier propagate.
executor.execute(
- () -> doCompaction(instantTime, compactionOperation, collector),
+ () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
(errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
"Execute compaction for instant %s from task %d", instantTime, taskID);
} else {
// executes the compaction task synchronously for batch mode.
LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
- doCompaction(instantTime, compactionOperation, collector);
+ doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
}
}
- private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
- HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
+ private void doCompaction(String instantTime,
+ CompactionOperation compactionOperation,
+ Collector<CompactionCommitEvent> collector,
+ HoodieWriteConfig writeConfig) throws IOException {
+ HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
- writeClient.getConfig(),
+ writeConfig,
writeClient.getEngineContext(),
writeClient.getHoodieTable().getMetaClient()),
writeClient.getHoodieTable().getMetaClient(),
@@ -114,6 +119,12 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
}
+ private HoodieWriteConfig reloadWriteConfig() throws Exception {
+ HoodieWriteConfig writeConfig = writeClient.getConfig();
+ CompactionUtil.setAvroSchema(writeConfig, writeClient.getHoodieTable().getMetaClient());
+ return writeConfig;
+ }
+
@VisibleForTesting
public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index d04937b..74629f9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
@@ -107,6 +108,18 @@ public class CompactionUtil {
}
/**
+ * Sets up the avro schema string into the write config {@code HoodieWriteConfig}
+ * by reading from the hoodie table metadata.
+ *
+ * @param writeConfig The HoodieWriteConfig
+ * @param metaClient  The HoodieTableMetaClient used to resolve the latest table avro schema
+ */
+ public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws Exception {
+ TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+ Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+ writeConfig.setSchema(tableAvroSchema.toString());
+ }
+
+ /**
* Infers the changelog mode based on the data file schema(including metadata fields).
*
* <p>We can improve the code if the changelog mode is set up as table config.