Posted to commits@hudi.apache.org by "alexeykudinkin (via GitHub)" <gi...@apache.org> on 2023/02/21 17:39:03 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7978: [HUDI-5812] Optimize the data size check in HoodieBaseParquetWriter

alexeykudinkin commented on code in PR #7978:
URL: https://github.com/apache/hudi/pull/7978#discussion_r1113378703


##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java:
##########
@@ -56,23 +55,34 @@ public HoodieBaseParquetWriter(Path file,
         DEFAULT_WRITER_VERSION,
         FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
 
+    this.parquetConfig = parquetConfig;
+
     // We cannot accurately measure the snappy compressed output file size. We are choosing a
     // conservative 10%
     // TODO - compute this compression ratio dynamically by looking at the bytes written to the
     // stream and the actual file size reported by HDFS
     this.maxFileSize = parquetConfig.getMaxFileSize()
         + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
+
+    this.recordNumForNextCheck = parquetConfig.getMinRowCountForSizeCheck();
   }
 
   public boolean canWrite() {
-    // TODO we can actually do evaluation more accurately:
-    //      if we cache last data size check, since we account for how many records
-    //      were written we can accurately project avg record size, and therefore
-    //      estimate how many more records we can write before cut off
-    if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
-      lastCachedDataSize = getDataSize();
+    if (getWrittenRecordCount() >= recordNumForNextCheck) {
+      long dataSize = getDataSize();
+      long avgRecordSize = dataSize / getWrittenRecordCount();
+      // Follow the parquet block size check logic here, return false
+      // if it is within ~2 records of the limit
+      if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+        return false;
+      }
+      recordNumForNextCheck = Math.min(Math.max(
+          parquetConfig.getMinRowCountForSizeCheck(),
+          // Do check it in the halfway
+          (recordNumForNextCheck + maxFileSize / avgRecordSize) / 2),

Review Comment:
   With this halving schedule, the number of size checks grows roughly as log2 of the # of records in the file. We don't actually need that -- we should keep the frequency of the checks constant
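   
   To make the suggestion concrete, here is a minimal, self-contained sketch of a constant-frequency policy: probe the data size once every fixed number of records rather than on the halving schedule above. The class, field names, check interval, and file-size limit in it are all illustrative assumptions, not the PR's actual API.
   
   ```java
   class ConstantIntervalSizeCheck {
     private static final long CHECK_INTERVAL = 1_000;      // probe every 1k records (assumed value)
     private final long maxFileSize = 120L * 1024 * 1024;   // assumed target file size in bytes
     private long writtenRecords = 0;
     private long bytesWritten = 0;

     // Constant check frequency: one size probe every CHECK_INTERVAL records,
     // instead of the geometrically spaced probes of the halving schedule above.
     boolean canWrite() {
       if (writtenRecords > 0 && writtenRecords % CHECK_INTERVAL == 0) {
         return bytesWritten < maxFileSize;
       }
       return true;
     }

     void recordWritten(long recordSizeBytes) {
       writtenRecords++;
       bytesWritten += recordSizeBytes;
     }
   }
   ```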



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java:
##########
@@ -56,23 +55,34 @@ public HoodieBaseParquetWriter(Path file,
         DEFAULT_WRITER_VERSION,
         FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
 
+    this.parquetConfig = parquetConfig;
+
     // We cannot accurately measure the snappy compressed output file size. We are choosing a
     // conservative 10%
     // TODO - compute this compression ratio dynamically by looking at the bytes written to the
     // stream and the actual file size reported by HDFS
     this.maxFileSize = parquetConfig.getMaxFileSize()
         + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
+
+    this.recordNumForNextCheck = parquetConfig.getMinRowCountForSizeCheck();
   }
 
   public boolean canWrite() {
-    // TODO we can actually do evaluation more accurately:
-    //      if we cache last data size check, since we account for how many records
-    //      were written we can accurately project avg record size, and therefore
-    //      estimate how many more records we can write before cut off
-    if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
-      lastCachedDataSize = getDataSize();
+    if (getWrittenRecordCount() >= recordNumForNextCheck) {
+      long dataSize = getDataSize();
+      long avgRecordSize = dataSize / getWrittenRecordCount();
+      // Follow the parquet block size check logic here, return false
+      // if it is within ~2 records of the limit
+      if (dataSize > (maxFileSize - avgRecordSize * 2)) {
+        return false;
+      }
+      recordNumForNextCheck = Math.min(Math.max(
+          parquetConfig.getMinRowCountForSizeCheck(),

Review Comment:
   This value is a relative count, while the next one (in the max) is an absolute record count. Let's make sure these are aligned
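   
   One possible way to align them (a hedged sketch, not the PR's final code) is to turn the relative minimum into an absolute threshold by adding it to the current written-record count, so that both operands of Math.max are absolute record counts. The method and parameter names below are illustrative:
   
   ```java
   final class NextCheckThreshold {
     // Returns the absolute record count at which the next size check should run.
     static long compute(long writtenRecords,        // records written so far (absolute)
                         long currentThreshold,      // previous check point (absolute)
                         long minRowsBetweenChecks,  // minimum gap between checks (relative)
                         long maxFileSize,
                         long avgRecordSize) {
       // Projected total number of records that fit into maxFileSize (absolute).
       long projectedTotalRecords = maxFileSize / Math.max(1, avgRecordSize);
       return Math.max(
           writtenRecords + minRowsBetweenChecks,            // relative gap turned absolute
           (currentThreshold + projectedTotalRecords) / 2);  // midpoint, already absolute
     }
   }
   ```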



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java:
##########
@@ -52,6 +52,16 @@ public class HoodieStorageConfig extends HoodieConfig {
       .withDocumentation("Parquet page size in bytes. Page is the unit of read within a parquet file. "
           + "Within a block, pages are compressed separately.");
 
+  public static final ConfigProperty<Long> PARQUET_MIN_ROW_COUNT_FOR_SIZE_CHECK = ConfigProperty

Review Comment:
   I don't think we should be exposing these configs to the user:
    - These are extremely low-level, and
    - Instead we should just set sensible defaults for them
   
   On top of that, nobody should actually be controlling these
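   
   As a sketch of that alternative, the bounds could live as internal constants on the writer instead of user-facing ConfigProperty entries. The class name and the 100/10000 values below are assumptions for illustration, not the merged code:
   
   ```java
   final class SizeCheckDefaults {
     // Lower/upper bounds on how many records to write between size checks,
     // kept as internal constants rather than user-facing configs.
     static final long MINIMUM_RECORD_COUNT_FOR_CHECK = 100;    // illustrative value
     static final long MAXIMUM_RECORD_COUNT_FOR_CHECK = 10_000; // illustrative value

     private SizeCheckDefaults() {
       // Not meant to be instantiated or tuned by users.
     }
   }
   ```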


