You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by ud...@apache.org on 2021/08/19 20:25:17 UTC

[hudi] branch release-0.9.0 updated: [HUDI-2167] HoodieCompactionConfig get HoodieCleaningPolicy NullPointerException

This is an automated email from the ASF dual-hosted git repository.

uditme pushed a commit to branch release-0.9.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.9.0 by this push:
     new 85a933c  [HUDI-2167] HoodieCompactionConfig get HoodieCleaningPolicy NullPointerException
85a933c is described below

commit 85a933c53dac93a711886af079d4fe4341603f29
Author: leiqiang <ts...@gmail.com>
AuthorDate: Tue Jul 13 11:26:12 2021 +0800

    [HUDI-2167] HoodieCompactionConfig get HoodieCleaningPolicy NullPointerException
    
    close apache/hudi#3402
---
 hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 06305e2..0fac2cc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -42,8 +43,8 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
-import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.transform.ChainedTransformer;
+import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 
@@ -168,6 +169,7 @@ public class StreamerUtil {
                     // actually Flink cleaning is always with parallelism 1 now
                     .withCleanerParallelism(20)
                     .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
+                    .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
                     .build())
             .withMemoryConfig(
                 HoodieMemoryConfig.newBuilder()
@@ -372,7 +374,7 @@ public class StreamerUtil {
 
     return fileStatus.getLen() > 0;
   }
-  
+
   public static boolean allowDuplicateInserts(Configuration conf) {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
     return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);