Posted to commits@hudi.apache.org by da...@apache.org on 2023/03/04 03:36:12 UTC

[hudi] branch master updated: [HUDI-5812] Optimize the data size check in HoodieBaseParquetWriter (#7978)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a52bc03d90 [HUDI-5812] Optimize the data size check in HoodieBaseParquetWriter (#7978)
2a52bc03d90 is described below

commit 2a52bc03d90d88c518d5ab377dc01e717813522b
Author: Rex(Hui) An <bo...@gmail.com>
AuthorDate: Sat Mar 4 11:36:06 2023 +0800

    [HUDI-5812] Optimize the data size check in HoodieBaseParquetWriter (#7978)
    
    Use an exponentially elastic algorithm to schedule the canWrite() size checks.
---
 .../hudi/io/storage/HoodieBaseParquetWriter.java   |  38 +++++--
 .../io/storage/TestHoodieBaseParquetWriter.java    | 122 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 10 deletions(-)
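
For readers skimming the diff, here is the idea as a minimal standalone sketch in plain Java
(illustrative class and constant names, not the actual Hudi/Parquet API; the real change is in
HoodieBaseParquetWriter below). Instead of calling getDataSize() on every fixed 1000-record
boundary, the writer re-checks at elastically spaced record counts: roughly halfway to the
projected size limit, clamped between Parquet's minimum and maximum record counts between checks.

    // Sketch only: standalone scheduling logic with no Hudi/Parquet dependencies.
    public class ElasticSizeCheckSketch {
      // These stand in for Parquet's DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK and
      // DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK.
      private static final long MIN_RECORDS_BETWEEN_CHECKS = 100;
      private static final long MAX_RECORDS_BETWEEN_CHECKS = 10_000;

      private final long maxFileSize;
      private long recordCountForNextSizeCheck = MIN_RECORDS_BETWEEN_CHECKS;

      public ElasticSizeCheckSketch(long maxFileSize) {
        this.maxFileSize = maxFileSize;
      }

      // Returns false once the data size is within ~2 average records of the limit;
      // otherwise schedules the next size check and returns true.
      public boolean canWrite(long writtenCount, long dataSize) {
        if (writtenCount >= recordCountForNextSizeCheck) {
          // writtenCount >= MIN_RECORDS_BETWEEN_CHECKS here, so the division is safe;
          // clamp avgRecordSize to at least 1 for extremely compressible data.
          long avgRecordSize = Math.max(dataSize / writtenCount, 1);
          if (dataSize > maxFileSize - 2 * avgRecordSize) {
            return false;
          }
          long projectedRecordsToLimit = maxFileSize / avgRecordSize - writtenCount;
          recordCountForNextSizeCheck = writtenCount
              + Math.min(Math.max(MIN_RECORDS_BETWEEN_CHECKS, projectedRecordsToLimit / 2),
                  MAX_RECORDS_BETWEEN_CHECKS);
        }
        return true;
      }
    }

Near the size limit this costs at most one data size probe per ~100 records, and far from the
limit at most one per 10,000 records, versus the previous fixed check every 1000 written records.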

diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
index e38b41d422a..a82c26bae92 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -28,6 +29,9 @@ import org.apache.parquet.hadoop.api.WriteSupport;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+
 /**
  * Base class of Hudi's custom {@link ParquetWriter} implementations
  *
@@ -36,11 +40,9 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
 
-  private static final int WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK = 1000;
-
   private final AtomicLong writtenRecordCount = new AtomicLong(0);
   private final long maxFileSize;
-  private long lastCachedDataSize = -1;
+  private long recordCountForNextSizeCheck;
 
   public HoodieBaseParquetWriter(Path file,
                                  HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
@@ -62,17 +64,28 @@ public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
     // stream and the actual file size reported by HDFS
     this.maxFileSize = parquetConfig.getMaxFileSize()
         + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
+    this.recordCountForNextSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
   }
 
   public boolean canWrite() {
-    // TODO we can actually do evaluation more accurately:
-    //      if we cache last data size check, since we account for how many records
-    //      were written we can accurately project avg record size, and therefore
-    //      estimate how many more records we can write before cut off
-    if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
-      lastCachedDataSize = getDataSize();
+    long writtenCount = getWrittenRecordCount();
+    if (writtenCount >= recordCountForNextSizeCheck) {
+      long dataSize = getDataSize();
+      // In some extreme cases, e.g. when all records hold the same value, the dataSize can be
+      // much lower than the writtenRecordCount (very high compression ratio), which would make
+      // avgRecordSize 0; force avgRecordSize to 1 in such cases.
+      long avgRecordSize = Math.max(dataSize / writtenCount, 1);
+      // Follow the parquet block size check logic here: return false
+      // if the data size is within ~2 records of the limit
+      if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+        return false;
+      }
+      recordCountForNextSizeCheck = writtenCount + Math.min(
+          // Check again roughly halfway to the projected limit
+          Math.max(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2),
+          DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
     }
-    return lastCachedDataSize < maxFileSize;
+    return true;
   }
 
   @Override
@@ -84,4 +97,9 @@ public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
   protected long getWrittenRecordCount() {
     return writtenRecordCount.get();
   }
+
+  @VisibleForTesting
+  protected long getRecordCountForNextSizeCheck() {
+    return recordCountForNextSizeCheck;
+  }
 }
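
To make the new scheduling concrete, this is the arithmetic canWrite() performs for the sizes
used in the test below (2 MiB max file size, zero compression-ratio padding, and Parquet's
default minimum/maximum record counts between checks of 100 and 10,000):

    writtenCount = 100,    dataSize = 1,000:     avgRecordSize = 10;  next check at 100 + min(max(100, (2,097,152/10 - 100)/2), 10,000) = 10,100
    writtenCount = 10,100, dataSize = 808,000:   avgRecordSize = 80;  next check at 10,100 + min(max(100, (2,097,152/80 - 10,100)/2), 10,000) = 18,157
    writtenCount = 18,157, dataSize = 2,097,152: avgRecordSize = 115; dataSize > 2,097,152 - 2 * 115, so canWrite() returns false
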
diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
new file mode 100644
index 00000000000..36bd0b5d4af
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieBaseParquetWriter {
+
+  private static class MockHoodieParquetWriter extends HoodieBaseParquetWriter<IndexedRecord> {
+
+    long writtenRecordCount = 0L;
+    long currentDataSize = 0L;
+
+    public MockHoodieParquetWriter(Path file, HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig) throws IOException {
+      super(file, (HoodieParquetConfig) parquetConfig);
+    }
+
+    @Override
+    public long getDataSize() {
+      return currentDataSize;
+    }
+
+    @Override
+    public long getWrittenRecordCount() {
+      return writtenRecordCount;
+    }
+
+    public void setWrittenRecordCount(long writtenCount) {
+      this.writtenRecordCount = writtenCount;
+    }
+
+    public void setCurrentDataSize(long currentDataSize) {
+      this.currentDataSize = currentDataSize;
+    }
+  }
+
+  @TempDir
+  public java.nio.file.Path tempDir;
+
+  @Test
+  public void testCanWrite() throws IOException {
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
+        BloomFilterTypeCode.DYNAMIC_V0.name());
+    Configuration hadoopConf = new Configuration();
+
+    Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
+        schema, Option.of(filter));
+
+    long maxFileSize = 2 * 1024 * 1024;
+    HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
+        new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
+            ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, hadoopConf, 0, true);
+
+    Path filePath = new Path(new Path(tempDir.toUri()), "test_fileSize.parquet");
+    try (MockHoodieParquetWriter writer = new MockHoodieParquetWriter(filePath, parquetConfig)) {
+      // no records written yet, should return true
+      assertTrue(writer.canWrite());
+      // recordCountForNextSizeCheck should be DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK
+      assertEquals(writer.getRecordCountForNextSizeCheck(), DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK);
+      // 10 bytes per record
+      writer.setCurrentDataSize(1000);
+      writer.setWrittenRecordCount(writer.getRecordCountForNextSizeCheck());
+
+      assertTrue(writer.canWrite());
+      // Should schedule the next check DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK records later
+      assertEquals(writer.getRecordCountForNextSizeCheck(), writer.getWrittenRecordCount() + DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
+
+      // 80 bytes per record
+      writer.setCurrentDataSize(808000);
+      writer.setWrittenRecordCount(writer.getRecordCountForNextSizeCheck());
+      assertTrue(writer.canWrite());
+      // Should schedule the next check at the halfway point, not DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK records later
+      long avgRecordSize = writer.getDataSize() / writer.getWrittenRecordCount();
+      long recordsDelta = (maxFileSize / avgRecordSize - writer.getWrittenRecordCount()) / 2;
+      assertEquals(writer.getRecordCountForNextSizeCheck(), writer.getWrittenRecordCount() + recordsDelta);
+
+      writer.setCurrentDataSize(maxFileSize);
+      writer.setWrittenRecordCount(writer.getRecordCountForNextSizeCheck());
+      assertFalse(writer.canWrite(),
+          "The writer should stop writing new records once the estimated data size reaches the max file size limit");
+    }
+  }
+}