You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/07/08 01:55:13 UTC

[hudi] branch master updated: [HUDI-4366] Synchronous cleaning for flink bounded source (#6051)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c744848c59 [HUDI-4366] Synchronous cleaning for flink bounded source (#6051)
c744848c59 is described below

commit c744848c5915c6e8b50d77c2502e772e76107ca7
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Jul 8 09:55:07 2022 +0800

    [HUDI-4366] Synchronous cleaning for flink bounded source (#6051)
---
 .../main/java/org/apache/hudi/configuration/OptionsResolver.java  | 8 ++++++++
 .../src/main/java/org/apache/hudi/sink/CleanFunction.java         | 4 ++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 4cfa0bc92a..64bd91f480 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -164,4 +164,12 @@ public class OptionsResolver {
   public static boolean sortClusteringEnabled(Configuration conf) {
     return !StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS));
   }
+
+  /**
+   * Checks whether the configured write operation is one of the INSERT OVERWRITE variants (whole table or partition).
+   */
+  public static boolean isInsertOverwrite(Configuration conf) {
+    final String operation = conf.getString(FlinkOptions.OPERATION);
+    return operation.equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value()) || operation.equals(WriteOperationType.INSERT_OVERWRITE.value());
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 65f07d7c7a..1c827517ff 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -19,9 +19,9 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -65,7 +65,7 @@ public class CleanFunction<T> extends AbstractRichFunction
       this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
       this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
 
-      if (conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())) {
+      if (OptionsResolver.isInsertOverwrite(conf)) {
         String instantTime = HoodieActiveTimeline.createNewInstantTime();
         LOG.info(String.format("exec sync clean with instant time %s...", instantTime));
         executor.execute(() -> writeClient.clean(instantTime), "wait for sync cleaning finish");