You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/06/27 01:56:33 UTC

[hudi] branch master updated: [HUDI-5303] Allow users to control the concurrency to submit jobs in clustering (#7343)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eafe17a6a2 [HUDI-5303] Allow users to control the concurrency to submit jobs in clustering (#7343)
8eafe17a6a2 is described below

commit 8eafe17a6a276b1384d2e4b528fd0abdf190bd84
Author: Rex(Hui) An <bo...@gmail.com>
AuthorDate: Tue Jun 27 09:56:25 2023 +0800

    [HUDI-5303] Allow users to control the concurrency to submit jobs in clustering (#7343)
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  9 +++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../MultipleSparkJobExecutionStrategy.java         | 66 +++++++++++++---------
 3 files changed, 53 insertions(+), 26 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index cafed2febc6..e9ff847a6f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -156,6 +156,15 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation("Config to control frequency of async clustering");
 
+  public static final ConfigProperty<Integer> CLUSTERING_MAX_PARALLELISM = ConfigProperty
+      .key("hoodie.clustering.max.parallelism")
+      .defaultValue(15)
+      .sinceVersion("0.14.0")
+      .withDocumentation("Maximum number of parallelism jobs submitted in clustering operation. "
+          + "If the resource is sufficient(Like Spark engine has enough idle executors), increasing this "
+          + "value will let the clustering job run faster, while it will give additional pressure to the "
+          + "execution engines to manage more concurrent running jobs.");
+
   public static final ConfigProperty<String> PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.skipfromlatest.partitions")
       .defaultValue("0")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index eba9728777f..7b672abf241 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1634,6 +1634,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME);
   }
 
+  public int getClusteringMaxParallelism() {
+    return getInt(HoodieClusteringConfig.CLUSTERING_MAX_PARALLELISM);
+  }
+
   public ClusteringPlanPartitionFilterMode getClusteringPlanPartitionFilterMode() {
     String mode = getString(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME);
     return ClusteringPlanPartitionFilterMode.valueOf(mode);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 540da42fd78..c6a1df9105e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.FutureUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -82,6 +83,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -105,30 +108,39 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
   public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
-    // execute clustering for each group async and collect WriteStatus
-    Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
-            clusteringPlan.getInputGroups().stream()
-                .map(inputGroup -> {
-                  if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
-                    return runClusteringForGroupAsyncAsRow(inputGroup,
+    ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
+        Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
+        new CustomizedThreadFactory("clustering-job-group", true));
+    try {
+      // execute clustering for each group async and collect WriteStatus
+      Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
+              clusteringPlan.getInputGroups().stream()
+                  .map(inputGroup -> {
+                    if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+                      return runClusteringForGroupAsyncAsRow(inputGroup,
+                          clusteringPlan.getStrategy().getStrategyParams(),
+                          shouldPreserveMetadata,
+                          instantTime,
+                          clusteringExecutorService);
+                    }
+                    return runClusteringForGroupAsync(inputGroup,
                         clusteringPlan.getStrategy().getStrategyParams(),
                         shouldPreserveMetadata,
-                        instantTime);
-                  }
-                  return runClusteringForGroupAsync(inputGroup,
-                      clusteringPlan.getStrategy().getStrategyParams(),
-                      shouldPreserveMetadata,
-                      instantTime);
-                })
-                .collect(Collectors.toList()))
-        .join()
-        .stream();
-    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
-    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
-
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
-    writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
-    return writeMetadata;
+                        instantTime,
+                        clusteringExecutorService);
+                  })
+                  .collect(Collectors.toList()))
+          .join()
+          .stream();
+      JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
+      JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
+
+      HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
+      writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
+      return writeMetadata;
+    } finally {
+      clusteringExecutorService.shutdown();
+    }
   }
 
   /**
@@ -216,7 +228,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
    * Submit job to execute clustering for the group using Avro/HoodieRecord representation.
    */
   private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
-                                                                                boolean preserveHoodieMetadata, String instantTime) {
+                                                                                boolean preserveHoodieMetadata, String instantTime,
+                                                                                ExecutorService clusteringExecutorService) {
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
       HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
@@ -229,7 +242,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
           .collect(Collectors.toList());
       return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata,
           clusteringGroup.getExtraMetadata());
-    });
+    }, clusteringExecutorService);
   }
 
   /**
@@ -238,7 +251,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
   private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,
                                                                                      Map<String, String> strategyParams,
                                                                                      boolean shouldPreserveHoodieMetadata,
-                                                                                     String instantTime) {
+                                                                                     String instantTime,
+                                                                                     ExecutorService clusteringExecutorService) {
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
       Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
@@ -248,7 +262,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
           .collect(Collectors.toList());
       return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata,
           clusteringGroup.getExtraMetadata());
-    });
+    }, clusteringExecutorService);
   }
 
   /**