You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/10/05 10:59:26 UTC

[hudi] branch master updated: [HUDI-4980] Calculate avg record size using commit only (#6864)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f5d16529d [HUDI-4980] Calculate avg record size using commit only (#6864)
9f5d16529d is described below

commit 9f5d16529d3e347097518bc18caefad5fbe27ac5
Author: Shiyan Xu <27...@users.noreply.github.com>
AuthorDate: Wed Oct 5 18:59:19 2022 +0800

    [HUDI-4980] Calculate avg record size using commit only (#6864)
    
    Calculate average record size for Spark upsert partitioner
    based on commit instants only. Previously it's based on
    commit and replacecommit, of which the latter may be
    created by clustering which has inaccurately smaller
    average record sizes, which could result in OOM
    due to size underestimation.
---
 .../hudi/table/action/commit/UpsertPartitioner.java      | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index c2f5a43066..134cfd8d2c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -36,6 +37,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -54,6 +56,8 @@ import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+
 /**
  * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
  */
@@ -158,13 +162,17 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
   private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
     // for new inserts, compute buckets depending on how many records we have for each partition
     Set<String> partitionPaths = profile.getPartitionPaths();
-    long averageRecordSize =
-        averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-            config);
+    /*
+     * NOTE: we only use commit instants to calculate average record size because replacecommit can be
+     * created by clustering, which has smaller average record size, which affects assigning inserts and
+     * may result in OOM by making spark underestimate the actual input record sizes.
+     */
+    long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config);
     LOG.info("AvgRecordSize => " + averageRecordSize);
 
     Map<String, List<SmallFile>> partitionSmallFilesMap =
-        getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
+        getSmallFilesForPartitions(new ArrayList<>(partitionPaths), context);
 
     Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId = getPartitionPathToPendingClusteringFileGroupsId();