You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/10/25 03:07:05 UTC
[hudi] branch master updated: [HUDI-4753] more accurate record size estimation for log writing and spillable map (#6632)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 927e5bf4ea [HUDI-4753] more accurate record size estimation for log writing and spillable map (#6632)
927e5bf4ea is described below
commit 927e5bf4ea8c34c8f729eb404d27d2a32a593900
Author: Yuwei XIAO <yw...@gmail.com>
AuthorDate: Tue Oct 25 11:06:57 2022 +0800
[HUDI-4753] more accurate record size estimation for log writing and spillable map (#6632)
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 9 +++++++--
.../util/collection/ExternalSpillableMap.java | 22 ++++++++--------------
2 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8db927d569..418c221c1b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -84,6 +84,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
private static final AtomicLong RECORD_COUNTER = new AtomicLong(1);
+ private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;
protected final String fileId;
// Buffer for holding records in memory before they are flushed to disk
@@ -559,12 +560,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
* Checks if the number of records has reached the set threshold and then flushes the records to disk.
*/
private void flushToDiskIfRequired(HoodieRecord record, boolean appendDeleteBlocks) {
+ if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)
+ || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0) {
+ averageRecordSize = (long) (averageRecordSize * 0.8 + sizeEstimator.sizeEstimate(record) * 0.2);
+ }
+
// Append if max number of records reached to achieve block size
if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
// Recompute averageRecordSize before writing a new block and update existing value with
// avg of new and old
- LOG.info("AvgRecordSize => " + averageRecordSize);
- averageRecordSize = (averageRecordSize + sizeEstimator.sizeEstimate(record)) / 2;
+ LOG.info("Flush log block to disk, the current avgRecordSize => " + averageRecordSize);
// Delete blocks will be appended after appending all the data blocks.
appendDataAndDeleteBlocks(header, appendDeleteBlocks);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 8d2707d604..218f0d9f16 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.util.collection;
-import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.exception.HoodieIOException;
@@ -78,8 +77,6 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
private Long currentInMemoryMapSize;
// An estimate of the size of each payload written to this map
private volatile long estimatedPayloadSize = 0;
- // Flag to determine whether to stop re-estimating payload size
- private boolean shouldEstimatePayloadSize = true;
// Base File Path
private final String baseFilePath;
@@ -202,22 +199,19 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
@Override
public R put(T key, R value) {
+ if (this.currentInMemoryMapSize >= maxInMemorySizeInBytes || inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
+ this.estimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9
+ + (keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value)) * 0.1);
+ this.currentInMemoryMapSize = this.inMemoryMap.size() * this.estimatedPayloadSize;
+ LOG.info("Update Estimated Payload size to => " + this.estimatedPayloadSize);
+ }
+
if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) {
- if (shouldEstimatePayloadSize && estimatedPayloadSize == 0) {
+ if (estimatedPayloadSize == 0) {
// At first, use the sizeEstimate of a record being inserted into the spillable map.
// Note, the converter may overestimate the size of a record in the JVM
this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value);
LOG.info("Estimated Payload size => " + estimatedPayloadSize);
- } else if (shouldEstimatePayloadSize && !inMemoryMap.isEmpty()
- && (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) {
- // Re-estimate the size of a record by calculating the size of the entire map containing
- // N entries and then dividing by the number of entries present (N). This helps to get a
- // correct estimation of the size of each record in the JVM.
- long totalMapSize = ObjectSizeCalculator.getObjectSize(inMemoryMap);
- this.currentInMemoryMapSize = totalMapSize;
- this.estimatedPayloadSize = totalMapSize / inMemoryMap.size();
- shouldEstimatePayloadSize = false;
- LOG.info("New Estimated Payload size => " + this.estimatedPayloadSize);
}
if (!inMemoryMap.containsKey(key)) {
// TODO : Add support for adjusting payloadSize for updates to the same key