You are viewing a plain-text version of this content. The canonical version was linked as a hyperlink ("here"), which is not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/10/25 03:07:05 UTC

[hudi] branch master updated: [HUDI-4753] more accurate record size estimation for log writing and spillable map (#6632)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 927e5bf4ea [HUDI-4753] more accurate record size estimation for log writing and spillable map (#6632)
927e5bf4ea is described below

commit 927e5bf4ea8c34c8f729eb404d27d2a32a593900
Author: Yuwei XIAO <yw...@gmail.com>
AuthorDate: Tue Oct 25 11:06:57 2022 +0800

    [HUDI-4753] more accurate record size estimation for log writing and spillable map (#6632)
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  9 +++++++--
 .../util/collection/ExternalSpillableMap.java      | 22 ++++++++--------------
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8db927d569..418c221c1b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -84,6 +84,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
   private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
   // This acts as the sequenceID for records written
   private static final AtomicLong RECORD_COUNTER = new AtomicLong(1);
+  private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;
 
   protected final String fileId;
   // Buffer for holding records in memory before they are flushed to disk
@@ -559,12 +560,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
    * Checks if the number of records have reached the set threshold and then flushes the records to disk.
    */
   private void flushToDiskIfRequired(HoodieRecord record, boolean appendDeleteBlocks) {
+    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize) 
+        || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0) {
+      averageRecordSize = (long) (averageRecordSize * 0.8 + sizeEstimator.sizeEstimate(record) * 0.2);
+    }
+
     // Append if max number of records reached to achieve block size
     if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
       // Recompute averageRecordSize before writing a new block and update existing value with
       // avg of new and old
-      LOG.info("AvgRecordSize => " + averageRecordSize);
-      averageRecordSize = (averageRecordSize + sizeEstimator.sizeEstimate(record)) / 2;
+      LOG.info("Flush log block to disk, the current avgRecordSize => " + averageRecordSize);
       // Delete blocks will be appended after appending all the data blocks.
       appendDataAndDeleteBlocks(header, appendDeleteBlocks);
       estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 8d2707d604..218f0d9f16 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.util.collection;
 
-import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -78,8 +77,6 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
   private Long currentInMemoryMapSize;
   // An estimate of the size of each payload written to this map
   private volatile long estimatedPayloadSize = 0;
-  // Flag to determine whether to stop re-estimating payload size
-  private boolean shouldEstimatePayloadSize = true;
   // Base File Path
   private final String baseFilePath;
 
@@ -202,22 +199,19 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
 
   @Override
   public R put(T key, R value) {
+    if (this.currentInMemoryMapSize >= maxInMemorySizeInBytes || inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
+      this.estimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9 
+        + (keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value)) * 0.1);
+      this.currentInMemoryMapSize = this.inMemoryMap.size() * this.estimatedPayloadSize;
+      LOG.info("Update Estimated Payload size to => " + this.estimatedPayloadSize);
+    }
+
     if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) {
-      if (shouldEstimatePayloadSize && estimatedPayloadSize == 0) {
+      if (estimatedPayloadSize == 0) {
         // At first, use the sizeEstimate of a record being inserted into the spillable map.
         // Note, the converter may over estimate the size of a record in the JVM
         this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value);
         LOG.info("Estimated Payload size => " + estimatedPayloadSize);
-      } else if (shouldEstimatePayloadSize && !inMemoryMap.isEmpty()
-          && (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) {
-        // Re-estimate the size of a record by calculating the size of the entire map containing
-        // N entries and then dividing by the number of entries present (N). This helps to get a
-        // correct estimation of the size of each record in the JVM.
-        long totalMapSize = ObjectSizeCalculator.getObjectSize(inMemoryMap);
-        this.currentInMemoryMapSize = totalMapSize;
-        this.estimatedPayloadSize = totalMapSize / inMemoryMap.size();
-        shouldEstimatePayloadSize = false;
-        LOG.info("New Estimated Payload size => " + this.estimatedPayloadSize);
       }
       if (!inMemoryMap.containsKey(key)) {
         // TODO : Add support for adjusting payloadSize for updates to the same key