You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/03/31 13:18:26 UTC
[hudi] branch master updated: [HUDI-6010] Always write parquets for insert overwrite operation (#8339)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7c664fd1f82 [HUDI-6010] Always write parquets for insert overwrite operation (#8339)
7c664fd1f82 is described below
commit 7c664fd1f8251ca616aa340d6d9ec605e302b955
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Mar 31 21:18:14 2023 +0800
[HUDI-6010] Always write parquets for insert overwrite operation (#8339)
---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 24 +++++++++++++++-------
.../apache/hudi/io/FlinkWriteHandleFactory.java | 6 +++++-
2 files changed, 22 insertions(+), 8 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 28327192a89..0498d190fc8 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -203,7 +203,7 @@ public class HoodieFlinkWriteClient<T> extends
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
// create the write handle if not exists
HoodieWriteMetadata<List<WriteStatus>> result;
- try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+ try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true)) {
result = ((HoodieFlinkTable<T>) table).insertOverwrite(context, closeableHandle.getWriteHandle(), instantTime, records);
}
return postWrite(result, instantTime, table);
@@ -223,7 +223,7 @@ public class HoodieFlinkWriteClient<T> extends
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
// create the write handle if not exists
HoodieWriteMetadata<List<WriteStatus>> result;
- try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+ try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true)) {
result = ((HoodieFlinkTable<T>) table).insertOverwriteTable(context, closeableHandle.getWriteHandle(), instantTime, records);
}
return postWrite(result, instantTime, table);
@@ -443,6 +443,8 @@ public class HoodieFlinkWriteClient<T> extends
* @param instantTime The instant time
* @param table The table
* @param recordItr Record iterator
+ * @param overwrite Whether this is an overwrite operation
+ *
* @return Existing write handle or create a new one
*/
private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
@@ -450,12 +452,13 @@ public class HoodieFlinkWriteClient<T> extends
HoodieWriteConfig config,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- Iterator<HoodieRecord<T>> recordItr) {
+ Iterator<HoodieRecord<T>> recordItr,
+ boolean overwrite) {
// caution: it's not a good practice to modify the handles internally.
FlinkWriteHandleFactory.Factory<T,
List<HoodieRecord<T>>,
List<HoodieKey>,
- List<WriteStatus>> writeHandleFactory = FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(), config);
+ List<WriteStatus>> writeHandleFactory = FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(), config, overwrite);
return writeHandleFactory.create(this.bucketToHandles, record, config, instantTime, table, recordItr);
}
@@ -503,9 +506,16 @@ public class HoodieFlinkWriteClient<T> extends
AutoCloseableWriteHandle(
List<HoodieRecord<T>> records,
String instantTime,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table
- ) {
- this.writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), instantTime, table, records.listIterator());
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
+ this (records, instantTime, table, false);
+ }
+
+ AutoCloseableWriteHandle(
+ List<HoodieRecord<T>> records,
+ String instantTime,
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ boolean overwrite) {
+ this.writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), instantTime, table, records.listIterator(), overwrite);
}
HoodieWriteHandle<?, ?, ?, ?> getWriteHandle() {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
index f5f5e935dec..1842e827fab 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
@@ -41,7 +41,11 @@ public class FlinkWriteHandleFactory {
*/
public static <T, I, K, O> Factory<T, I, K, O> getFactory(
HoodieTableConfig tableConfig,
- HoodieWriteConfig writeConfig) {
+ HoodieWriteConfig writeConfig,
+ boolean overwrite) {
+ if (overwrite) {
+ return CommitWriteHandleFactory.getInstance();
+ }
if (writeConfig.allowDuplicateInserts()) {
return ClusterWriteHandleFactory.getInstance();
}