You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/03/04 03:36:12 UTC
[hudi] branch master updated: [HUDI-5812] Optimize the data size check in HoodieBaseParquetWriter (#7978)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2a52bc03d90 [HUDI-5812] Optimize the data size check in HoodieBaseParquetWriter (#7978)
2a52bc03d90 is described below
commit 2a52bc03d90d88c518d5ab377dc01e717813522b
Author: Rex(Hui) An <bo...@gmail.com>
AuthorDate: Sat Mar 4 11:36:06 2023 +0800
[HUDI-5812] Optimize the data size check in HoodieBaseParquetWriter (#7978)
Use an exponentially elastic algorithm to probe the .canWrite flag.
---
.../hudi/io/storage/HoodieBaseParquetWriter.java | 38 +++++--
.../io/storage/TestHoodieBaseParquetWriter.java | 122 +++++++++++++++++++++
2 files changed, 150 insertions(+), 10 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
index e38b41d422a..a82c26bae92 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
@@ -28,6 +29,9 @@ import org.apache.parquet.hadoop.api.WriteSupport;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+
/**
* Base class of Hudi's custom {@link ParquetWriter} implementations
*
@@ -36,11 +40,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
- private static final int WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK = 1000;
-
private final AtomicLong writtenRecordCount = new AtomicLong(0);
private final long maxFileSize;
- private long lastCachedDataSize = -1;
+ private long recordCountForNextSizeCheck;
public HoodieBaseParquetWriter(Path file,
HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
@@ -62,17 +64,28 @@ public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
+ this.recordCountForNextSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
}
public boolean canWrite() {
- // TODO we can actually do evaluation more accurately:
- // if we cache last data size check, since we account for how many records
- // were written we can accurately project avg record size, and therefore
- // estimate how many more records we can write before cut off
- if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
- lastCachedDataSize = getDataSize();
+ long writtenCount = getWrittenRecordCount();
+ if (writtenCount >= recordCountForNextSizeCheck) {
+ long dataSize = getDataSize();
+ // In some very extreme cases, like all records are same value, then it's possible
+ // the dataSize is much lower than the writtenRecordCount(high compression ratio),
+ // causing avgRecordSize to 0, we'll force the avgRecordSize to 1 for such cases.
+ long avgRecordSize = Math.max(dataSize / writtenCount, 1);
+ // Follow the parquet block size check logic here, return false
+ // if it is within ~2 records of the limit
+ if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+ return false;
+ }
+ recordCountForNextSizeCheck = writtenCount + Math.min(
+ // Do check it in the halfway
+ Math.max(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2),
+ DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
}
- return lastCachedDataSize < maxFileSize;
+ return true;
}
@Override
@@ -84,4 +97,9 @@ public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
protected long getWrittenRecordCount() {
return writtenRecordCount.get();
}
+
+ @VisibleForTesting
+ protected long getRecordCountForNextSizeCheck() {
+ return recordCountForNextSizeCheck;
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
new file mode 100644
index 00000000000..36bd0b5d4af
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieBaseParquetWriter {
+
+ private static class MockHoodieParquetWriter extends HoodieBaseParquetWriter<IndexedRecord> {
+
+ long writtenRecordCount = 0L;
+ long currentDataSize = 0L;
+
+ public MockHoodieParquetWriter(Path file, HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig) throws IOException {
+ super(file, (HoodieParquetConfig) parquetConfig);
+ }
+
+ @Override
+ public long getDataSize() {
+ return currentDataSize;
+ }
+
+ @Override
+ public long getWrittenRecordCount() {
+ return writtenRecordCount;
+ }
+
+ public void setWrittenRecordCount(long writtenCount) {
+ this.writtenRecordCount = writtenCount;
+ }
+
+ public void setCurrentDataSize(long currentDataSize) {
+ this.currentDataSize = currentDataSize;
+ }
+ }
+
+ @TempDir
+ public java.nio.file.Path tempDir;
+
+ @Test
+ public void testCanWrite() throws IOException {
+ BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
+ BloomFilterTypeCode.DYNAMIC_V0.name());
+ Configuration hadoopConf = new Configuration();
+
+ Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+ HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
+ schema, Option.of(filter));
+
+ long maxFileSize = 2 * 1024 * 1024;
+ HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
+ new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, hadoopConf, 0, true);
+
+ Path filePath = new Path(new Path(tempDir.toUri()), "test_fileSize.parquet");
+ try (MockHoodieParquetWriter writer = new MockHoodieParquetWriter(filePath, parquetConfig)) {
+ // doesn't start write, should return true
+ assertTrue(writer.canWrite());
+ // recordCountForNextSizeCheck should be DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK
+ assertEquals(writer.getRecordCountForNextSizeCheck(), DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK);
+ // 10 bytes per record
+ writer.setCurrentDataSize(1000);
+ writer.setWrittenRecordCount(writer.getRecordCountForNextSizeCheck());
+
+ assertTrue(writer.canWrite());
+ // Should check it with more DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK records
+ assertEquals(writer.getRecordCountForNextSizeCheck(), writer.getWrittenRecordCount() + DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
+
+ // 80 bytes per record
+ writer.setCurrentDataSize(808000);
+ writer.setWrittenRecordCount(writer.getRecordCountForNextSizeCheck());
+ assertTrue(writer.canWrite());
+ // Should check it half way, not DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK
+ long avgRecordSize = writer.getDataSize() / writer.getWrittenRecordCount();
+ long recordsDelta = (maxFileSize / avgRecordSize - writer.getWrittenRecordCount()) / 2;
+ assertEquals(writer.getRecordCountForNextSizeCheck(), writer.getWrittenRecordCount() + recordsDelta);
+
+ writer.setCurrentDataSize(maxFileSize);
+ writer.setWrittenRecordCount(writer.getRecordCountForNextSizeCheck());
+ assertFalse(writer.canWrite(),
+ "The writer stops write new records while the file doesn't reach the max file size limit");
+ }
+ }
+}