You are viewing a plain text version of this content. The canonical link to the original (hyperlinked) version was lost in this plain-text rendering; see the mailing-list archive for commits@hudi.apache.org.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:33 UTC
[hudi] 06/20: [HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy (#7372)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b0a0912cfaf869234453e89d0d0ccf60c8dde98b
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Wed Jan 11 09:52:42 2023 +0800
[HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy (#7372)
[HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy
(cherry picked from commit b9c32fb943595605a1026d568f0c1d084a3751c3)
---
.../SparkSizeBasedClusteringPlanStrategy.java | 14 +++-
.../SparkSortAndSizeExecutionStrategy.java | 6 +-
.../TestSparkSizeBasedClusteringPlanStrategy.java | 94 ++++++++++++++++++++++
.../hudi/functional/TestLayoutOptimization.scala | 3 +-
.../hudi/procedure/TestClusteringProcedure.scala | 2 +
5 files changed, 112 insertions(+), 7 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index 6629569d096..07e58c05fa4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -72,11 +72,19 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
+
+ // Sort fileSlices before dividing, which makes dividing more compact
+ List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+ sortedFileSlices.sort((o1, o2) -> (int)
+ ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
+ - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
long totalSizeSoFar = 0;
- for (FileSlice currentSlice : fileSlices) {
+ for (FileSlice currentSlice : sortedFileSlices) {
+ long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
- if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+ if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
@@ -88,7 +96,7 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
// Add to the current file-group
currentGroup.add(currentSlice);
// assume each file group size is ~= parquet.max.file.size
- totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+ totalSizeSoFar += currentSize;
}
if (!currentGroup.isEmpty()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 35c8f288bc8..e33b2ef7960 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -68,7 +68,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();
- newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+ newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));
return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata);
@@ -88,7 +88,9 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();
- newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+
+ newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));
+
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 00000000000..99cafdceea4
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class TestSparkSizeBasedClusteringPlanStrategy {
+
+ @Mock
+ HoodieSparkCopyOnWriteTable table;
+ @Mock
+ HoodieSparkEngineContext context;
+
+ @Test
+ public void testBuildClusteringGroup() {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath("")
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
+ .withClusteringMaxBytesInGroup(2000)
+ .withClusteringTargetFileMaxBytes(1000)
+ .withClusteringPlanSmallFileLimit(500)
+ .build())
+ .build();
+
+ SparkSizeBasedClusteringPlanStrategy planStrategy = new SparkSizeBasedClusteringPlanStrategy(table, context, config);
+
+ ArrayList<FileSlice> fileSlices = new ArrayList<>();
+ fileSlices.add(createFileSlice(200));
+ fileSlices.add(createFileSlice(200));
+ fileSlices.add(createFileSlice(300));
+ fileSlices.add(createFileSlice(300));
+ fileSlices.add(createFileSlice(400));
+ fileSlices.add(createFileSlice(400));
+ fileSlices.add(createFileSlice(400));
+ fileSlices.add(createFileSlice(400));
+
+ Stream<HoodieClusteringGroup> clusteringGroupStream = planStrategy.buildClusteringGroupsForPartition("p0", fileSlices);
+ List<HoodieClusteringGroup> clusteringGroups = clusteringGroupStream.collect(Collectors.toList());
+
+ // FileSlices will be divided into two clusteringGroups
+ Assertions.assertEquals(2, clusteringGroups.size());
+
+ // First group: 400, 400, 400, 400, 300, and they will be merged into 2 files
+ Assertions.assertEquals(5, clusteringGroups.get(0).getSlices().size());
+ Assertions.assertEquals(2, clusteringGroups.get(0).getNumOutputFileGroups());
+
+ // Second group: 300, 200, 200, and they will be merged into 1 file
+ Assertions.assertEquals(3, clusteringGroups.get(1).getSlices().size());
+ Assertions.assertEquals(1, clusteringGroups.get(1).getNumOutputFileGroups());
+ }
+
+ private FileSlice createFileSlice(long baseFileSize) {
+ String fileId = FSUtils.createNewFileId(FSUtils.createNewFileIdPfx(), 0);
+ FileSlice fs = new FileSlice("p0", "001", fileId);
+ HoodieBaseFile f = new HoodieBaseFile(fileId);
+ f.setFileLen(baseFileSize);
+ fs.setBaseFile(f);
+ return fs;
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
index 7d54959d2ea..b87c813a0fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
@@ -111,8 +111,7 @@ class TestLayoutOptimization extends HoodieClientTestBase {
.option("hoodie.clustering.inline.max.commits", "1")
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
- .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
- .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
+ .option("hoodie.clustering.plan.strategy.max.bytes.per.group", "2147483648")
.option(DataSourceWriteOptions.ENABLE_ROW_WRITER.key(), clusteringAsRow)
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), layoutOptimizationStrategy)
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD.key(), curveCompositionStrategy)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index c10c56f3350..c9fbb0f5d9f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -572,6 +572,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
| hoodie.parquet.max.file.size=${avgSize * avgCount},
| hoodie.parquet.small.file.limit=0,
| hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+ | hoodie.clustering.plan.strategy.max.bytes.per.group=${4 * avgSize * avgCount},
| hoodie.metadata.enable=true,
| hoodie.metadata.index.column.stats.enable=true
|")""".stripMargin)
@@ -588,6 +589,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
| hoodie.parquet.max.file.size=${avgSize * avgCount},
| hoodie.parquet.small.file.limit=0,
| hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+ | hoodie.clustering.plan.strategy.max.bytes.per.group=${4 * avgSize * avgCount},
| hoodie.metadata.enable=true,
| hoodie.metadata.index.column.stats.enable=true
|")""".stripMargin)