Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:54 UTC

[hudi] 13/17: [HUDI-4924] Auto-tune dedup parallelism (#6802)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b8b0256095a649d4cdfe5592295df153f6746ea4
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Sep 28 08:03:41 2022 -0700

    [HUDI-4924] Auto-tune dedup parallelism (#6802)
---
 .../table/action/commit/HoodieWriteHelper.java     |  5 ++-
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |  5 +++
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 14 ++++++--
 .../org/apache/hudi/data/TestHoodieJavaRDD.java    | 40 ++++++++++++++++++++++
 .../org/apache/hudi/common/data/HoodieData.java    |  9 +++--
 .../apache/hudi/common/data/HoodieListData.java    |  5 +++
 .../hudi/common/data/TestHoodieListData.java       |  8 +++++
 7 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 80762b1de8..b359550e8a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -54,6 +54,9 @@ public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWri
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
       HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
     boolean isIndexingGlobal = index.isGlobal();
+    // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
+    // in engine-specific representation
+    int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
@@ -65,7 +68,7 @@ public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWri
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
 
       return new HoodieAvroRecord<>(reducedKey, reducedData);
-    }, parallelism).map(Pair::getRight);
+    }, reduceParallelism).map(Pair::getRight);
   }
 
 }
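
For reference, a minimal standalone sketch of the clamping rule added above; the class name, method name, and sample values are hypothetical and only illustrate how the reduce parallelism is derived from the configured value and the number of data partitions:

    // Hypothetical illustration of the auto-tuning rule used in deduplicateRecords:
    // reduceParallelism = max(1, min(numDataPartitions, configuredParallelism))
    public class DedupParallelismSketch {
      static int autoTuneReduceParallelism(int numDataPartitions, int configuredParallelism) {
        // Never exceed the number of data partitions, and never go below 1.
        return Math.max(1, Math.min(numDataPartitions, configuredParallelism));
      }

      public static void main(String[] args) {
        // Input has 3 data partitions, configured dedup parallelism is 100:
        // shuffling into 100 reduce partitions would mostly create empty tasks,
        // so the effective parallelism is capped at 3.
        System.out.println(autoTuneReduceParallelism(3, 100));   // 3
        // Input has 500 data partitions, configured parallelism is 200: keep 200.
        System.out.println(autoTuneReduceParallelism(500, 200)); // 200
        // Degenerate inputs still yield a valid parallelism of at least 1.
        System.out.println(autoTuneReduceParallelism(0, 0));     // 1
      }
    }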
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 3964fa2d6b..ed9613bc15 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -102,6 +102,11 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
     return rddData.count();
   }
 
+  @Override
+  public int getNumPartitions() {
+    return rddData.getNumPartitions();
+  }
+
   @Override
   public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
     return HoodieJavaRDD.of(rddData.map(func::apply));
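
A hedged sketch of the Spark-side value that the new override delegates to; the class name is hypothetical and a local SparkContext is spun up purely for illustration:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class RddPartitionsSketch {
      public static void main(String[] args) {
        // Local Spark context only for this illustration.
        try (JavaSparkContext jsc = new JavaSparkContext("local[2]", "rdd-partitions-sketch")) {
          JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);
          // HoodieJavaRDD#getNumPartitions returns exactly this value.
          System.out.println(rdd.getNumPartitions()); // 3
        }
      }
    }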
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 4b7a0139dd..8aafbcd9f6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -460,11 +460,17 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
         jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
+    HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .combineInput(true, true);
+    addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
 
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
+    int dedupParallelism = records.getNumPartitions() + 100;
+    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism);
+    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
+    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);
@@ -472,13 +478,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // non-Global dedup should be done based on both recordKey and partitionPath
     index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(false);
-    dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
+    dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism);
+    dedupedRecs = dedupedRecsRdd.collectAsList();
+    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
     // Perform write-action and check
     JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
-    HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+    configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .combineInput(true, true);
     addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
new file mode 100644
index 0000000000..7595888304
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieJavaRDD extends HoodieClientTestBase {
+  @Test
+  public void testGetNumPartitions() {
+    int numPartitions = 6;
+    HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize(
+        IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions));
+    assertEquals(numPartitions, rddData.getNumPartitions());
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 2d24e7dd12..1d56e63fad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -67,14 +67,19 @@ public interface HoodieData<T> extends Serializable {
 
   /**
    * Returns number of objects held in the collection
-   *
+   * <p>
    * NOTE: This is a terminal operation
    */
   long count();
 
+  /**
+   * @return the number of data partitions in the engine-specific representation.
+   */
+  int getNumPartitions();
+
   /**
    * Maps every element in the collection using provided mapping {@code func}.
-   *
+   * <p>
    * This is an intermediate operation
    *
    * @param func serializable map function
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index 0be9ec9fa7..b2a503a85b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -175,6 +175,11 @@ public class HoodieListData<T> extends HoodieBaseListData<T> implements HoodieDa
     return super.count();
   }
 
+  @Override
+  public int getNumPartitions() {
+    return 1;
+  }
+
   @Override
   public List<T> collectAsList() {
     return super.collectAsList();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
index 8da8be1338..ea19f128d1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -64,4 +65,11 @@ class TestHoodieListData {
     assertEquals(3, originalListData.count());
     assertEquals(sourceList, originalListData.collectAsList());
   }
+
+  @Test
+  public void testGetNumPartitions() {
+    HoodieData<Integer> listData = HoodieListData.eager(
+        IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()));
+    assertEquals(1, listData.getNumPartitions());
+  }
 }