You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/07/08 01:55:13 UTC
[hudi] branch master updated: [HUDI-4366] Synchronous cleaning for flink bounded source (#6051)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c744848c59 [HUDI-4366] Synchronous cleaning for flink bounded source (#6051)
c744848c59 is described below
commit c744848c5915c6e8b50d77c2502e772e76107ca7
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Jul 8 09:55:07 2022 +0800
[HUDI-4366] Synchronous cleaning for flink bounded source (#6051)
---
.../main/java/org/apache/hudi/configuration/OptionsResolver.java | 8 ++++++++
.../src/main/java/org/apache/hudi/sink/CleanFunction.java | 4 ++--
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 4cfa0bc92a..64bd91f480 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -164,4 +164,12 @@ public class OptionsResolver {
public static boolean sortClusteringEnabled(Configuration conf) {
return !StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS));
}
+
+ /**
+ * Returns whether the operation is INSERT OVERWRITE (table or partition).
+ */
+ public static boolean isInsertOverwrite(Configuration conf) {
+ return conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())
+ || conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE.value());
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 65f07d7c7a..1c827517ff 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -19,9 +19,9 @@
package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
@@ -65,7 +65,7 @@ public class CleanFunction<T> extends AbstractRichFunction
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
- if (conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())) {
+ if (OptionsResolver.isInsertOverwrite(conf)) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info(String.format("exec sync clean with instant time %s...", instantTime));
executor.execute(() -> writeClient.clean(instantTime), "wait for sync cleaning finish");