You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by ud...@apache.org on 2021/08/19 20:25:17 UTC
[hudi] branch release-0.9.0 updated: [HUDI-2167]
HoodieCompactionConfig get HoodieCleaningPolicy NullPointerException
This is an automated email from the ASF dual-hosted git repository.
uditme pushed a commit to branch release-0.9.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.9.0 by this push:
new 85a933c [HUDI-2167] HoodieCompactionConfig get HoodieCleaningPolicy NullPointerException
85a933c is described below
commit 85a933c53dac93a711886af079d4fe4341603f29
Author: leiqiang <ts...@gmail.com>
AuthorDate: Tue Jul 13 11:26:12 2021 +0800
[HUDI-2167] HoodieCompactionConfig get HoodieCleaningPolicy NullPointerException
close apache/hudi#3402
---
hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 06305e2..0fac2cc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -42,8 +43,8 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.schema.FilebasedSchemaProvider;
-import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.transform.ChainedTransformer;
+import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
@@ -168,6 +169,7 @@ public class StreamerUtil {
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.build())
.withMemoryConfig(
HoodieMemoryConfig.newBuilder()
@@ -372,7 +374,7 @@ public class StreamerUtil {
return fileStatus.getLen() > 0;
}
-
+
public static boolean allowDuplicateInserts(Configuration conf) {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);