Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:33 UTC

[hudi] 06/20: [HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy (#7372)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b0a0912cfaf869234453e89d0d0ccf60c8dde98b
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Wed Jan 11 09:52:42 2023 +0800

    [HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy (#7372)
    
    [HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy
    
    (cherry picked from commit b9c32fb943595605a1026d568f0c1d084a3751c3)
---
 .../SparkSizeBasedClusteringPlanStrategy.java      | 14 +++-
 .../SparkSortAndSizeExecutionStrategy.java         |  6 +-
 .../TestSparkSizeBasedClusteringPlanStrategy.java  | 94 ++++++++++++++++++++++
 .../hudi/functional/TestLayoutOptimization.scala   |  3 +-
 .../hudi/procedure/TestClusteringProcedure.scala   |  2 +
 5 files changed, 112 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index 6629569d096..07e58c05fa4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -72,11 +72,19 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
 
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
+
+    // Sort fileSlices before dividing, which makes dividing more compact
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> (int)
+        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
+            - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
     long totalSizeSoFar = 0;
 
-    for (FileSlice currentSlice : fileSlices) {
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
       // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
         int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
         LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
             + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
@@ -88,7 +96,7 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
       // Add to the current file-group
       currentGroup.add(currentSlice);
       // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+      totalSizeSoFar += currentSize;
     }
 
     if (!currentGroup.isEmpty()) {
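
The grouping fix above has two parts. First, file slices are sorted by base-file size in descending order before packing, so large slices seed groups and smaller ones fill the remaining budget. Second, the overflow check moves from "close the group once the cap is already exceeded" (totalSizeSoFar >= max) to "close the group before adding a slice that would exceed it" (totalSizeSoFar + currentSize > max), so no group overshoots hoodie.clustering.plan.strategy.max.bytes.per.group. One caveat: the comparator in the patch casts a long difference to int, which can in principle misorder base files whose sizes differ by more than 2^31 bytes; the standalone sketch below uses the natural Long ordering instead. It mirrors the packing logic with plain long sizes so it runs without Hudi on the classpath:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Standalone sketch of the grouping logic above; plain long sizes stand in
// for FileSlice, and maxBytesPerGroup mirrors
// hoodie.clustering.plan.strategy.max.bytes.per.group.
public class GroupingSketch {
  static List<List<Long>> buildGroups(List<Long> sizes, long maxBytesPerGroup) {
    // Sort descending first, so large slices seed groups and small ones fill gaps.
    List<Long> sorted = new ArrayList<>(sizes);
    sorted.sort(Comparator.reverseOrder()); // natural Long ordering, no int-cast overflow

    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long totalSoFar = 0;
    for (long size : sorted) {
      // Close the current group *before* it would exceed the cap, mirroring
      // the patched check (totalSizeSoFar + currentSize > max).
      if (totalSoFar + size > maxBytesPerGroup && !current.isEmpty()) {
        groups.add(current);
        current = new ArrayList<>();
        totalSoFar = 0;
      }
      current.add(size);
      totalSoFar += size;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  public static void main(String[] args) {
    // Same sizes as the new test below; expect [400, 400, 400, 400, 300] and [300, 200, 200].
    List<Long> sizes = Arrays.asList(200L, 200L, 300L, 300L, 400L, 400L, 400L, 400L);
    System.out.println(buildGroups(sizes, 2000L));
  }
}

Running it prints [[400, 400, 400, 400, 300], [300, 200, 200]], the same split the new test asserts.
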
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 35c8f288bc8..e33b2ef7960 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -68,7 +68,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
         .withBulkInsertParallelism(numOutputGroups)
         .withProps(getWriteConfig().getProps()).build();
 
-    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));
 
     return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata);
@@ -88,7 +88,9 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
         .withBulkInsertParallelism(numOutputGroups)
         .withProps(getWriteConfig().getProps()).build();
-    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));
+
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
         newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
   }
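
Both execution paths, the row-writer path and the RDD path, now derive the parquet max file size override from the per-group byte budget (getClusteringMaxBytesInGroup) rather than the per-file target (getClusteringTargetFileMaxBytes). A minimal sketch of the shared override, using only the builder calls visible in the hunks above and assuming the 0.12.x package location for HoodieStorageConfig:

import org.apache.hudi.common.config.HoodieStorageConfig; // package location varies by Hudi version
import org.apache.hudi.config.HoodieWriteConfig;

public class ClusteringConfigSketch {
  // writeConfig stands in for getWriteConfig() inside the strategy.
  static HoodieWriteConfig overrideParquetMaxFileSize(HoodieWriteConfig writeConfig, int numOutputGroups) {
    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
        .withBulkInsertParallelism(numOutputGroups)
        .withProps(writeConfig.getProps())
        .build();
    // Cap parquet file size at the group byte budget, as both hunks above now do.
    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE,
        String.valueOf(writeConfig.getClusteringMaxBytesInGroup()));
    return newConfig;
  }
}
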
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 00000000000..99cafdceea4
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class TestSparkSizeBasedClusteringPlanStrategy {
+
+  @Mock
+  HoodieSparkCopyOnWriteTable table;
+  @Mock
+  HoodieSparkEngineContext context;
+
+  @Test
+  public void testBuildClusteringGroup() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
+            .withClusteringMaxBytesInGroup(2000)
+            .withClusteringTargetFileMaxBytes(1000)
+            .withClusteringPlanSmallFileLimit(500)
+            .build())
+        .build();
+
+    SparkSizeBasedClusteringPlanStrategy planStrategy = new SparkSizeBasedClusteringPlanStrategy(table, context, config);
+
+    ArrayList<FileSlice> fileSlices = new ArrayList<>();
+    fileSlices.add(createFileSlice(200));
+    fileSlices.add(createFileSlice(200));
+    fileSlices.add(createFileSlice(300));
+    fileSlices.add(createFileSlice(300));
+    fileSlices.add(createFileSlice(400));
+    fileSlices.add(createFileSlice(400));
+    fileSlices.add(createFileSlice(400));
+    fileSlices.add(createFileSlice(400));
+
+    Stream<HoodieClusteringGroup> clusteringGroupStream = planStrategy.buildClusteringGroupsForPartition("p0", fileSlices);
+    List<HoodieClusteringGroup> clusteringGroups = clusteringGroupStream.collect(Collectors.toList());
+
+    // FileSlices will be divided into two clusteringGroups
+    Assertions.assertEquals(2, clusteringGroups.size());
+
+    // First group: 400, 400, 400, 400, 300, and they will be merged into 2 files
+    Assertions.assertEquals(5, clusteringGroups.get(0).getSlices().size());
+    Assertions.assertEquals(2, clusteringGroups.get(0).getNumOutputFileGroups());
+
+    // Second group: 300, 200, 200, and they will be merged into 1 file
+    Assertions.assertEquals(3, clusteringGroups.get(1).getSlices().size());
+    Assertions.assertEquals(1, clusteringGroups.get(1).getNumOutputFileGroups());
+  }
+
+  private FileSlice createFileSlice(long baseFileSize) {
+    String fileId = FSUtils.createNewFileId(FSUtils.createNewFileIdPfx(), 0);
+    FileSlice fs = new FileSlice("p0", "001", fileId);
+    HoodieBaseFile f = new HoodieBaseFile(fileId);
+    f.setFileLen(baseFileSize);
+    fs.setBaseFile(f);
+    return fs;
+  }
+}
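
The assertions follow from the packing arithmetic. With max.bytes.per.group=2000 and target.file.max.bytes=1000, the descending sizes 400, 400, 400, 400, 300, 300, 200, 200 pack as:

    group 1: 400 + 400 + 400 + 400 + 300 = 1900  (the next 300 would push it past 2000)
             ceil(1900 / 1000) = 2 output file groups
    group 2: 300 + 200 + 200 = 700
             ceil(700 / 1000) = 1 output file group

This assumes getNumberOfOutputFileGroups rounds the group size up by the target file size, which is what the asserted counts imply.
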
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
index 7d54959d2ea..b87c813a0fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
@@ -111,8 +111,7 @@ class TestLayoutOptimization extends HoodieClientTestBase {
       .option("hoodie.clustering.inline.max.commits", "1")
       .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
       .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
-      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
+      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", "2147483648")
       .option(DataSourceWriteOptions.ENABLE_ROW_WRITER.key(), clusteringAsRow)
       .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), layoutOptimizationStrategy)
       .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD.key(), curveCompositionStrategy)
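
Two adjustments here: the duplicated target.file.max.bytes option is dropped (it was already set to 1073741824 a few lines above, and the second setting silently overrode it with 64 MiB, i.e. 64 * 1024 * 1024 bytes), and the per-group cap becomes a finite 2147483648 bytes (2 GiB) instead of Long.MaxValue, presumably so the tightened totalSizeSoFar + currentSize check runs against a realistic bound.
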
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index c10c56f3350..c9fbb0f5d9f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -572,6 +572,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
            | hoodie.parquet.max.file.size=${avgSize * avgCount},
            | hoodie.parquet.small.file.limit=0,
            | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.clustering.plan.strategy.max.bytes.per.group=${4 * avgSize * avgCount},
            | hoodie.metadata.enable=true,
            | hoodie.metadata.index.column.stats.enable=true
            |")""".stripMargin)
@@ -588,6 +589,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
            | hoodie.parquet.max.file.size=${avgSize * avgCount},
            | hoodie.parquet.small.file.limit=0,
            | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.clustering.plan.strategy.max.bytes.per.group=${4 * avgSize * avgCount},
            | hoodie.metadata.enable=true,
            | hoodie.metadata.index.column.stats.enable=true
            |")""".stripMargin)