You are viewing a plain-text version of this content; the canonical link was not preserved in this version.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:54 UTC
[hudi] 13/17: [HUDI-4924] Auto-tune dedup parallelism (#6802)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b8b0256095a649d4cdfe5592295df153f6746ea4
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Sep 28 08:03:41 2022 -0700
[HUDI-4924] Auto-tune dedup parallelism (#6802)
---
.../table/action/commit/HoodieWriteHelper.java | 5 ++-
.../java/org/apache/hudi/data/HoodieJavaRDD.java | 5 +++
.../TestHoodieClientOnCopyOnWriteStorage.java | 14 ++++++--
.../org/apache/hudi/data/TestHoodieJavaRDD.java | 40 ++++++++++++++++++++++
.../org/apache/hudi/common/data/HoodieData.java | 9 +++--
.../apache/hudi/common/data/HoodieListData.java | 5 +++
.../hudi/common/data/TestHoodieListData.java | 8 +++++
7 files changed, 80 insertions(+), 6 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 80762b1de8..b359550e8a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -54,6 +54,9 @@ public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWri
public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
+ // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
+ // in engine-specific representation
+ int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
@@ -65,7 +68,7 @@ public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWri
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
return new HoodieAvroRecord<>(reducedKey, reducedData);
- }, parallelism).map(Pair::getRight);
+ }, reduceParallelism).map(Pair::getRight);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 3964fa2d6b..ed9613bc15 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -102,6 +102,11 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
return rddData.count();
}
+ @Override
+ public int getNumPartitions() {
+ return rddData.getNumPartitions();
+ }
+
@Override
public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
return HoodieJavaRDD.of(rddData.map(func::apply));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 4b7a0139dd..8aafbcd9f6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -460,11 +460,17 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
+ HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+ .combineInput(true, true);
+ addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
- List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
+ int dedupParallelism = records.getNumPartitions() + 100;
+ HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism);
+ List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
+ assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
assertEquals(1, dedupedRecs.size());
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
assertNodupesWithinPartition(dedupedRecs);
@@ -472,13 +478,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
- dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
+ dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism);
+ dedupedRecs = dedupedRecsRdd.collectAsList();
+ assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
// Perform write-action and check
JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
- HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+ configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.combineInput(true, true);
addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
new file mode 100644
index 0000000000..7595888304
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieJavaRDD extends HoodieClientTestBase {
+ @Test
+ public void testGetNumPartitions() {
+ int numPartitions = 6;
+ HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize(
+ IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions));
+ assertEquals(numPartitions, rddData.getNumPartitions());
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 2d24e7dd12..1d56e63fad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -67,14 +67,19 @@ public interface HoodieData<T> extends Serializable {
/**
* Returns number of objects held in the collection
- *
+ * <p>
* NOTE: This is a terminal operation
*/
long count();
+ /**
+ * @return the number of data partitions in the engine-specific representation.
+ */
+ int getNumPartitions();
+
/**
* Maps every element in the collection using provided mapping {@code func}.
- *
+ * <p>
* This is an intermediate operation
*
* @param func serializable map function
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index 0be9ec9fa7..b2a503a85b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -175,6 +175,11 @@ public class HoodieListData<T> extends HoodieBaseListData<T> implements HoodieDa
return super.count();
}
+ @Override
+ public int getNumPartitions() {
+ return 1;
+ }
+
@Override
public List<T> collectAsList() {
return super.collectAsList();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
index 8da8be1338..ea19f128d1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -64,4 +65,11 @@ class TestHoodieListData {
assertEquals(3, originalListData.count());
assertEquals(sourceList, originalListData.collectAsList());
}
+
+ @Test
+ public void testGetNumPartitions() {
+ HoodieData<Integer> listData = HoodieListData.eager(
+ IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()));
+ assertEquals(1, listData.getNumPartitions());
+ }
}