You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:37 UTC

[hudi] 10/20: [HUDI-5343] HoodieFlinkStreamer supports async clustering for append mode (#7403)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 59094436c9d6fae5b729a38e1d192993446ed504
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Wed Dec 28 20:14:51 2022 +0800

    [HUDI-5343] HoodieFlinkStreamer supports async clustering for append mode (#7403)
    
    (cherry picked from commit f2b2ec9539d97bae3952c00085cdfc6c786e9239)
---
 .../sink/clustering/FlinkClusteringConfig.java     | 37 +++++++--------
 .../hudi/sink/compact/FlinkCompactionConfig.java   | 30 ++++++------
 .../apache/hudi/streamer/FlinkStreamerConfig.java  | 53 ++++++++++++++++++++--
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  | 21 +++++++--
 4 files changed, 95 insertions(+), 46 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
index 3bbae38e00e..739cb29052a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
@@ -54,70 +54,67 @@ public class FlinkClusteringConfig extends Configuration {
   // ------------------------------------------------------------------------
   //  Clustering Options
   // ------------------------------------------------------------------------
-  @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 4 commits", required = false)
+  @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 1 commit")
   public Integer clusteringDeltaCommits = 1;
 
-  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1", required = false)
+  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1")
   public Integer clusteringTasks = -1;
 
-  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
+  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.")
   public Integer compactionMaxMemory = 100;
 
   @Parameter(names = {"--clean-retain-commits"},
       description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
-          + "This also directly translates into how much you can incrementally pull on this table, default 10",
-      required = false)
+          + "This also directly translates into how much you can incrementally pull on this table, default 10")
   public Integer cleanRetainCommits = 10;
 
   @Parameter(names = {"--archive-min-commits"},
-      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.",
-      required = false)
+      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.")
   public Integer archiveMinCommits = 20;
 
   @Parameter(names = {"--archive-max-commits"},
-      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.",
-      required = false)
+      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.")
   public Integer archiveMaxCommits = 30;
 
   @Parameter(names = {"--schedule", "-sc"}, description = "Schedule the clustering plan in this job.\n"
-      + "Default is false", required = false)
+      + "Default is false")
   public Boolean schedule = false;
 
   @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time")
   public String clusteringInstantTime = null;
 
-  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, disabled by default", required = false)
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, disabled by default")
   public Boolean cleanAsyncEnable = false;
 
-  @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
+  @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generate clustering plan")
   public String planStrategyClass = FlinkSizeBasedClusteringPlanStrategy.class.getName();
 
-  @Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan", required = false)
+  @Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan")
   public String planPartitionFilterMode = "NONE";
 
-  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
+  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB")
   public Long targetFileMaxBytes = 1024 * 1024 * 1024L;
 
-  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB", required = false)
+  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB")
   public Long smallFileLimit = 600L;
 
-  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0", required = false)
+  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0")
   public Integer skipFromLatestPartitions = 0;
 
-  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.", required = false)
+  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.")
   public String sortColumns = "";
 
-  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30", required = false)
+  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30")
   public Integer maxNumGroups = 30;
 
-  @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2", required = false)
+  @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2")
   public Integer targetPartitions = 2;
 
   public static final String SEQ_FIFO = "FIFO";
   public static final String SEQ_LIFO = "LIFO";
   @Parameter(names = {"--seq"}, description = "Clustering plan execution sequence, two options are supported:\n"
       + "1). FIFO: execute the oldest plan first;\n"
-      + "2). LIFO: execute the latest plan first, by default FIFO", required = false)
+      + "2). LIFO: execute the latest plan first, by default FIFO")
   public String clusteringSeq = SEQ_FIFO;
 
   @Parameter(names = {"--service"}, description = "Flink Clustering runs in service mode, disable by default")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 449b0684615..0308d246333 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -53,55 +53,51 @@ public class FlinkCompactionConfig extends Configuration {
           + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
           + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
           + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
-          + "Default is 'num_commits'",
-      required = false)
+          + "Default is 'num_commits'")
   public String compactionTriggerStrategy = NUM_COMMITS;
 
-  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits", required = false)
+  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 1 commit")
   public Integer compactionDeltaCommits = 1;
 
-  @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour", required = false)
+  @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour")
   public Integer compactionDeltaSeconds = 3600;
 
-  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false)
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, disabled by default")
   public Boolean cleanAsyncEnable = false;
 
   @Parameter(names = {"--clean-retain-commits"},
       description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
-          + "This also directly translates into how much you can incrementally pull on this table, default 10",
-      required = false)
+          + "This also directly translates into how much you can incrementally pull on this table, default 10")
   public Integer cleanRetainCommits = 10;
 
   @Parameter(names = {"--archive-min-commits"},
-      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.",
-      required = false)
+      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.")
   public Integer archiveMinCommits = 20;
 
   @Parameter(names = {"--archive-max-commits"},
-      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.",
-      required = false)
+      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.")
   public Integer archiveMaxCommits = 30;
 
-  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
+  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.")
   public Integer compactionMaxMemory = 100;
 
-  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.", required = false)
+  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.")
   public Long compactionTargetIo = 512000L;
 
-  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1", required = false)
+  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1")
   public Integer compactionTasks = -1;
 
   @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the compaction plan in this job.\n"
       + "There is a risk of losing data when scheduling compaction outside the writer job.\n"
       + "Scheduling compaction in the writer job and only let this job do the compaction execution is recommended.\n"
-      + "Default is false", required = false)
+      + "Default is false")
   public Boolean schedule = false;
 
   public static final String SEQ_FIFO = "FIFO";
   public static final String SEQ_LIFO = "LIFO";
   @Parameter(names = {"--seq"}, description = "Compaction plan execution sequence, two options are supported:\n"
       + "1). FIFO: execute the oldest plan first;\n"
-      + "2). LIFO: execute the latest plan first, by default LIFO", required = false)
+      + "2). LIFO: execute the latest plan first, by default FIFO")
   public String compactionSeq = SEQ_FIFO;
 
   @Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default")
@@ -126,7 +122,7 @@ public class FlinkCompactionConfig extends Configuration {
       + "It's only effective for 'instants' plan selection strategy.")
   public String compactionPlanInstant;
 
-  @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
+  @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.")
   public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index b2f72aed7da..5df71c64221 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.streamer;
 
+import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -239,8 +240,8 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--compaction-async-enabled"}, description = "Async Compaction, enabled by default for MOR")
   public Boolean compactionAsyncEnabled = true;
 
-  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10")
-  public Integer compactionTasks = 10;
+  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1")
+  public Integer compactionTasks = -1;
 
   @Parameter(names = {"--compaction-trigger-strategy"},
       description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
@@ -250,8 +251,8 @@ public class FlinkStreamerConfig extends Configuration {
           + "Default is 'num_commits'")
   public String compactionTriggerStrategy = FlinkOptions.NUM_COMMITS;
 
-  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits")
-  public Integer compactionDeltaCommits = 5;
+  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 1 commit")
+  public Integer compactionDeltaCommits = 1;
 
   @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour")
   public Integer compactionDeltaSeconds = 3600;
@@ -262,6 +263,39 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
   public Long compactionTargetIo = 512000L;
 
+  @Parameter(names = {"--clustering-async-enabled"}, description = "Async Clustering, disabled by default")
+  public Boolean clusteringAsyncEnabled = false;
+
+  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1")
+  public Integer clusteringTasks = -1;
+
+  @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 1 commit")
+  public Integer clusteringDeltaCommits = 1;
+
+  @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generate clustering plan")
+  public String planStrategyClass = FlinkSizeBasedClusteringPlanStrategy.class.getName();
+
+  @Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan")
+  public String planPartitionFilterMode = "NONE";
+
+  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB")
+  public Long targetFileMaxBytes = 1024 * 1024 * 1024L;
+
+  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB")
+  public Long smallFileLimit = 600L;
+
+  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0")
+  public Integer skipFromLatestPartitions = 0;
+
+  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.")
+  public String sortColumns = "";
+
+  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30")
+  public Integer maxNumGroups = 30;
+
+  @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2")
+  public Integer targetPartitions = 2;
+
   @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
   public Boolean cleanAsyncEnabled = true;
 
@@ -406,6 +440,17 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
     conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
     conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
+    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, config.clusteringAsyncEnabled);
+    conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
+    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, config.planPartitionFilterMode);
+    conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
+    conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
+    conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
+    conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, config.maxNumGroups);
+    conf.setInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS, config.targetPartitions);
     conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
     conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy);
     conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b08eb570ce0..b1249334510 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -99,12 +99,23 @@ public class HoodieFlinkStreamer {
     }
 
     OptionsInference.setupSinkTasks(conf, env.getParallelism());
-    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
-    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
-    if (OptionsResolver.needsAsyncCompaction(conf)) {
-      Pipelines.compact(conf, pipeline);
+    DataStream<Object> pipeline;
+    // Append mode
+    if (OptionsResolver.isAppendMode(conf)) {
+      pipeline = Pipelines.append(conf, rowType, dataStream, false);
+      if (OptionsResolver.needsAsyncClustering(conf)) {
+        Pipelines.cluster(conf, rowType, pipeline);
+      } else {
+        Pipelines.dummySink(pipeline);
+      }
     } else {
-      Pipelines.clean(conf, pipeline);
+      DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
+      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
+      if (OptionsResolver.needsAsyncCompaction(conf)) {
+        Pipelines.compact(conf, pipeline);
+      } else {
+        Pipelines.clean(conf, pipeline);
+      }
     }
 
     String jobName = cfg.targetDatabaseName.isEmpty() ? cfg.targetTableName :