You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2023/04/29 14:25:17 UTC

[hudi] branch master updated: [HUDI-6035] Make simple index parallelism auto inferred (#8468)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 78ad883a067 [HUDI-6035] Make simple index parallelism auto inferred (#8468)
78ad883a067 is described below

commit 78ad883a067537bfef866dd5388faa4922efbd58
Author: clownxc <59...@qq.com>
AuthorDate: Sat Apr 29 22:25:07 2023 +0800

    [HUDI-6035] Make simple index parallelism auto inferred (#8468)
    
    
    ---------
    
    Co-authored-by: ClownXC <ch...@163.com>
    Co-authored-by: Raymond Xu <xu...@gmail.com>
---
 .../main/java/org/apache/hudi/config/HoodieIndexConfig.java    | 10 +++++-----
 .../java/org/apache/hudi/index/simple/HoodieSimpleIndex.java   |  7 ++++++-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index fd50fdb0f6d..dc0b1cd5f4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -189,14 +189,14 @@ public class HoodieIndexConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> SIMPLE_INDEX_PARALLELISM = ConfigProperty
       .key("hoodie.simple.index.parallelism")
-      .defaultValue("100")
+      .defaultValue("0")
       .markAdvanced()
       .withDocumentation("Only applies if index type is SIMPLE. "
           + "This limits the parallelism of fetching records from the base files of affected "
-          + "partitions. The index picks the configured parallelism if the number of base "
-          + "files is larger than this configured value; otherwise, the number of base files "
-          + "is used as the parallelism. If the indexing stage is slow due to the limited "
-          + "parallelism, you can increase this to tune the performance.");
+          + "partitions. By default, this is auto computed based on input workload characteristics. "
+          + "If the parallelism is explicitly configured by the user, the user-configured "
+          + "value is used in defining the actual parallelism. If the indexing stage is slow "
+          + "due to the limited parallelism, you can increase this to tune the performance.");
 
   public static final ConfigProperty<String> GLOBAL_SIMPLE_INDEX_PARALLELISM = ConfigProperty
       .key("hoodie.global.simple.index.parallelism")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
index 95823ff51e3..dbc49d0655f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -107,11 +107,16 @@ public class HoodieSimpleIndex
           .getString(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL_VALUE));
     }
 
+    int inputParallelism = inputRecords.getNumPartitions();
+    int configuredSimpleIndexParallelism = config.getSimpleIndexParallelism();
+    // NOTE: A configured parallelism > 0 overrides the default, which is the input's partition count
+    int targetParallelism =
+        configuredSimpleIndexParallelism > 0 ? configuredSimpleIndexParallelism : inputParallelism;
     HoodiePairData<HoodieKey, HoodieRecord<R>> keyedInputRecords =
         inputRecords.mapToPair(record -> new ImmutablePair<>(record.getKey(), record));
     HoodiePairData<HoodieKey, HoodieRecordLocation> existingLocationsOnTable =
         fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(), context, hoodieTable,
-            config.getSimpleIndexParallelism());
+            targetParallelism);
 
     HoodieData<HoodieRecord<R>> taggedRecords =
         keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry -> {