You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text version.
Posted to commits@hudi.apache.org by xu...@apache.org on 2023/04/29 14:25:17 UTC
[hudi] branch master updated: [HUDI-6035] Make simple index parallelism auto inferred (#8468)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 78ad883a067 [HUDI-6035] Make simple index parallelism auto inferred (#8468)
78ad883a067 is described below
commit 78ad883a067537bfef866dd5388faa4922efbd58
Author: clownxc <59...@qq.com>
AuthorDate: Sat Apr 29 22:25:07 2023 +0800
[HUDI-6035] Make simple index parallelism auto inferred (#8468)
---------
Co-authored-by: ClownXC <ch...@163.com>
Co-authored-by: Raymond Xu <xu...@gmail.com>
---
.../main/java/org/apache/hudi/config/HoodieIndexConfig.java | 10 +++++-----
.../java/org/apache/hudi/index/simple/HoodieSimpleIndex.java | 7 ++++++-
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index fd50fdb0f6d..dc0b1cd5f4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -189,14 +189,14 @@ public class HoodieIndexConfig extends HoodieConfig {
public static final ConfigProperty<String> SIMPLE_INDEX_PARALLELISM = ConfigProperty
.key("hoodie.simple.index.parallelism")
- .defaultValue("100")
+ .defaultValue("0")
.markAdvanced()
.withDocumentation("Only applies if index type is SIMPLE. "
+ "This limits the parallelism of fetching records from the base files of affected "
- + "partitions. The index picks the configured parallelism if the number of base "
- + "files is larger than this configured value; otherwise, the number of base files "
- + "is used as the parallelism. If the indexing stage is slow due to the limited "
- + "parallelism, you can increase this to tune the performance.");
+ + "partitions. By default, this is auto computed based on input workload characteristics. "
+ + "If the parallelism is explicitly configured by the user, the user-configured "
+ + "value is used in defining the actual parallelism. If the indexing stage is slow "
+ + "due to the limited parallelism, you can increase this to tune the performance.");
public static final ConfigProperty<String> GLOBAL_SIMPLE_INDEX_PARALLELISM = ConfigProperty
.key("hoodie.global.simple.index.parallelism")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
index 95823ff51e3..dbc49d0655f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -107,11 +107,16 @@ public class HoodieSimpleIndex
.getString(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL_VALUE));
}
+ int inputParallelism = inputRecords.getNumPartitions();
+ int configuredSimpleIndexParallelism = config.getSimpleIndexParallelism();
+ // NOTE: Target parallelism could be overridden by the config
+ int targetParallelism =
+ configuredSimpleIndexParallelism > 0 ? configuredSimpleIndexParallelism : inputParallelism;
HoodiePairData<HoodieKey, HoodieRecord<R>> keyedInputRecords =
inputRecords.mapToPair(record -> new ImmutablePair<>(record.getKey(), record));
HoodiePairData<HoodieKey, HoodieRecordLocation> existingLocationsOnTable =
fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(), context, hoodieTable,
- config.getSimpleIndexParallelism());
+ targetParallelism);
HoodieData<HoodieRecord<R>> taggedRecords =
keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry -> {