You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/03/31 13:18:26 UTC

[hudi] branch master updated: [HUDI-6010] Always write parquets for insert overwrite operation (#8339)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c664fd1f82 [HUDI-6010] Always write parquets for insert overwrite operation (#8339)
7c664fd1f82 is described below

commit 7c664fd1f8251ca616aa340d6d9ec605e302b955
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Mar 31 21:18:14 2023 +0800

    [HUDI-6010] Always write parquets for insert overwrite operation (#8339)
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 24 +++++++++++++++-------
 .../apache/hudi/io/FlinkWriteHandleFactory.java    |  6 +++++-
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 28327192a89..0498d190fc8 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -203,7 +203,7 @@ public class HoodieFlinkWriteClient<T> extends
     preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
     // create the write handle if not exists
     HoodieWriteMetadata<List<WriteStatus>> result;
-    try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+    try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true)) {
       result = ((HoodieFlinkTable<T>) table).insertOverwrite(context, closeableHandle.getWriteHandle(), instantTime, records);
     }
     return postWrite(result, instantTime, table);
@@ -223,7 +223,7 @@ public class HoodieFlinkWriteClient<T> extends
     preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
     // create the write handle if not exists
     HoodieWriteMetadata<List<WriteStatus>> result;
-    try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+    try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true)) {
       result = ((HoodieFlinkTable<T>) table).insertOverwriteTable(context, closeableHandle.getWriteHandle(), instantTime, records);
     }
     return postWrite(result, instantTime, table);
@@ -443,6 +443,8 @@ public class HoodieFlinkWriteClient<T> extends
    * @param instantTime The instant time
    * @param table       The table
    * @param recordItr   Record iterator
+   * @param overwrite   Whether this is an overwrite operation
+   *
    * @return Existing write handle or create a new one
    */
   private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
@@ -450,12 +452,13 @@ public class HoodieFlinkWriteClient<T> extends
       HoodieWriteConfig config,
       String instantTime,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-      Iterator<HoodieRecord<T>> recordItr) {
+      Iterator<HoodieRecord<T>> recordItr,
+      boolean overwrite) {
     // caution: it's not a good practice to modify the handles internal.
     FlinkWriteHandleFactory.Factory<T,
         List<HoodieRecord<T>>,
         List<HoodieKey>,
-        List<WriteStatus>> writeHandleFactory = FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(), config);
+        List<WriteStatus>> writeHandleFactory = FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(), config, overwrite);
     return writeHandleFactory.create(this.bucketToHandles, record, config, instantTime, table, recordItr);
   }
 
@@ -503,9 +506,16 @@ public class HoodieFlinkWriteClient<T> extends
     AutoCloseableWriteHandle(
         List<HoodieRecord<T>> records,
         String instantTime,
-        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table
-    ) {
-      this.writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), instantTime, table, records.listIterator());
+        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
+      this (records, instantTime, table, false);
+    }
+
+    AutoCloseableWriteHandle(
+        List<HoodieRecord<T>> records,
+        String instantTime,
+        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+        boolean overwrite) {
+      this.writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), instantTime, table, records.listIterator(), overwrite);
     }
 
     HoodieWriteHandle<?, ?, ?, ?> getWriteHandle() {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
index f5f5e935dec..1842e827fab 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
@@ -41,7 +41,11 @@ public class FlinkWriteHandleFactory {
    */
   public static <T, I, K, O> Factory<T, I, K, O> getFactory(
       HoodieTableConfig tableConfig,
-      HoodieWriteConfig writeConfig) {
+      HoodieWriteConfig writeConfig,
+      boolean overwrite) {
+    if (overwrite) {
+      return CommitWriteHandleFactory.getInstance();
+    }
     if (writeConfig.allowDuplicateInserts()) {
       return ClusterWriteHandleFactory.getInstance();
     }