Posted to commits@hudi.apache.org by "boneanxs (via GitHub)" <gi...@apache.org> on 2023/03/01 01:37:34 UTC

[GitHub] [hudi] boneanxs commented on a diff in pull request #7978: [HUDI-5812] Optimize the data size check in HoodieBaseParquetWriter

boneanxs commented on code in PR #7978:
URL: https://github.com/apache/hudi/pull/7978#discussion_r1121011620


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java:
##########
@@ -123,6 +125,45 @@ public void testProperWriting(boolean parquetWriteLegacyFormatEnabled) throws Ex
     });
   }
 
+  @Test
+  @Timeout(value = 60)
+  public void testFileSizeCheck() throws Exception {
+    HoodieWriteConfig.Builder writeConfigBuilder =
+        SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort);
+
+    HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, true);
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+    // Make sure the number of written records can exceed maxRowCountForSizeCheck
+    cfg.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(5 * 1024 * 1024));
+    HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
+        CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
+        writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
+
+    Path filePath = new Path(basePath + "/check_file_size.parquet");
+
+    try (HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig)) {
+
+      while (writer.canWrite()) {
+        Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, 100,
+            HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, false);
+        StructType schema = inputRows.schema();
+        List<InternalRow> rows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
+        for (InternalRow row : rows) {
+          if (writer.canWrite()) {
+            writer.writeRow(row.getUTF8String(schema.fieldIndex("record_key")), row);
+          } else {
+            break;
+          }
+        }
+      }
+
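+      // The writer should keep accepting rows until it is close to the limit: the final
+      // data size must land within about two average-sized records of the max file size.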
+      long avgRecordSize = writer.getDataSize() / writer.getWrittenRecordCount();
+      assertTrue(writer.getDataSize() > cfg.getParquetMaxFileSize() - avgRecordSize * 2,
+          "The writer stops write new records while the file doesn't hint the max file size limit");
+    }

Review Comment:
   yea, let me change it



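For context, here is a minimal, self-contained sketch of the adaptive size-check
pattern this test exercises. It is not the actual HoodieBaseParquetWriter
implementation; the class name, the 100-record initial interval, and the
halfway re-check heuristic are illustrative assumptions. The idea is that the
relatively expensive data-size probe (e.g. ParquetWriter#getDataSize()) is only
consulted at intervals derived from the observed average record size, which is
why the test above can only assert that the final size lands within about two
average records of the configured limit.

    import java.util.function.LongSupplier;

    public class AdaptiveSizeCheck {
      private final long maxFileSize;
      private final LongSupplier dataSizeProbe;   // stands in for ParquetWriter#getDataSize()
      private long writtenRecordCount = 0L;
      private long recordsUntilNextCheck = 100L;  // hypothetical initial interval, not Hudi's real constant
      private long lastObservedSize = 0L;

      public AdaptiveSizeCheck(long maxFileSize, LongSupplier dataSizeProbe) {
        this.maxFileSize = maxFileSize;
        this.dataSizeProbe = dataSizeProbe;
      }

      // Cheap per-record gate: the expensive dataSizeProbe is only consulted every
      // recordsUntilNextCheck records, and the interval is re-derived from the
      // observed average record size so the writer stops close to maxFileSize.
      public boolean canWrite() {
        if (writtenRecordCount >= recordsUntilNextCheck) {
          lastObservedSize = dataSizeProbe.getAsLong();
          long avgRecordSize = Math.max(1L, lastObservedSize / Math.max(1L, writtenRecordCount));
          long remainingBytes = Math.max(0L, maxFileSize - lastObservedSize);
          // Re-check roughly halfway through the remaining budget so the file cannot
          // overshoot the limit by more than a handful of records.
          recordsUntilNextCheck = writtenRecordCount + Math.max(1L, remainingBytes / avgRecordSize / 2L);
        }
        return lastObservedSize < maxFileSize;
      }

      public void onRecordWritten() {
        writtenRecordCount++;
      }
    }

Probing the underlying writer's data size on every record would be
prohibitively slow, which is why an interval-based check is used and why the
assertion in the test allows a tolerance of roughly two average-sized records.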