You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/06/27 01:56:33 UTC
[hudi] branch master updated: [HUDI-5303] Allow users to control the concurrency to submit jobs in clustering (#7343)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8eafe17a6a2 [HUDI-5303] Allow users to control the concurrency to submit jobs in clustering (#7343)
8eafe17a6a2 is described below
commit 8eafe17a6a276b1384d2e4b528fd0abdf190bd84
Author: Rex(Hui) An <bo...@gmail.com>
AuthorDate: Tue Jun 27 09:56:25 2023 +0800
[HUDI-5303] Allow users to control the concurrency to submit jobs in clustering (#7343)
---
.../apache/hudi/config/HoodieClusteringConfig.java | 9 +++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++
.../MultipleSparkJobExecutionStrategy.java | 66 +++++++++++++---------
3 files changed, 53 insertions(+), 26 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index cafed2febc6..e9ff847a6f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -156,6 +156,15 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation("Config to control frequency of async clustering");
+ public static final ConfigProperty<Integer> CLUSTERING_MAX_PARALLELISM = ConfigProperty
+ .key("hoodie.clustering.max.parallelism")
+ .defaultValue(15)
+ .sinceVersion("0.14.0")
+ .withDocumentation("Maximum number of parallel jobs submitted in a clustering operation. "
+ + "If resources are sufficient (e.g., the Spark engine has enough idle executors), increasing this "
+ + "value will let the clustering job run faster, while putting additional pressure on the "
+ + "execution engines to manage more concurrently running jobs.");
+
public static final ConfigProperty<String> PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.skipfromlatest.partitions")
.defaultValue("0")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index eba9728777f..7b672abf241 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1634,6 +1634,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME);
}
+ public int getClusteringMaxParallelism() {
+ return getInt(HoodieClusteringConfig.CLUSTERING_MAX_PARALLELISM);
+ }
+
public ClusteringPlanPartitionFilterMode getClusteringPlanPartitionFilterMode() {
String mode = getString(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME);
return ClusteringPlanPartitionFilterMode.valueOf(mode);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 540da42fd78..c6a1df9105e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -82,6 +83,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -105,30 +108,39 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
- // execute clustering for each group async and collect WriteStatus
- Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
- clusteringPlan.getInputGroups().stream()
- .map(inputGroup -> {
- if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
- return runClusteringForGroupAsyncAsRow(inputGroup,
+ ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
+ Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
+ new CustomizedThreadFactory("clustering-job-group", true));
+ try {
+ // execute clustering for each group async and collect WriteStatus
+ Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
+ clusteringPlan.getInputGroups().stream()
+ .map(inputGroup -> {
+ if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+ return runClusteringForGroupAsyncAsRow(inputGroup,
+ clusteringPlan.getStrategy().getStrategyParams(),
+ shouldPreserveMetadata,
+ instantTime,
+ clusteringExecutorService);
+ }
+ return runClusteringForGroupAsync(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
- instantTime);
- }
- return runClusteringForGroupAsync(inputGroup,
- clusteringPlan.getStrategy().getStrategyParams(),
- shouldPreserveMetadata,
- instantTime);
- })
- .collect(Collectors.toList()))
- .join()
- .stream();
- JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
- JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
-
- HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
- writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
- return writeMetadata;
+ instantTime,
+ clusteringExecutorService);
+ })
+ .collect(Collectors.toList()))
+ .join()
+ .stream();
+ JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
+ JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
+
+ HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
+ writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
+ return writeMetadata;
+ } finally {
+ clusteringExecutorService.shutdown();
+ }
}
/**
@@ -216,7 +228,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
* Submit job to execute clustering for the group using Avro/HoodieRecord representation.
*/
private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
- boolean preserveHoodieMetadata, String instantTime) {
+ boolean preserveHoodieMetadata, String instantTime,
+ ExecutorService clusteringExecutorService) {
return CompletableFuture.supplyAsync(() -> {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
@@ -229,7 +242,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
.collect(Collectors.toList());
return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata,
clusteringGroup.getExtraMetadata());
- });
+ }, clusteringExecutorService);
}
/**
@@ -238,7 +251,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,
Map<String, String> strategyParams,
boolean shouldPreserveHoodieMetadata,
- String instantTime) {
+ String instantTime,
+ ExecutorService clusteringExecutorService) {
return CompletableFuture.supplyAsync(() -> {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
@@ -248,7 +262,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
.collect(Collectors.toList());
return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata,
clusteringGroup.getExtraMetadata());
- });
+ }, clusteringExecutorService);
}
/**