You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2021/04/20 11:02:32 UTC

[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #895: Parquet-2030: Expose page size row check configurations to ParquetWri…

gszadovszky commented on a change in pull request #895:
URL: https://github.com/apache/parquet-mr/pull/895#discussion_r616573508



##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
##########
@@ -279,4 +285,133 @@ public void testParquetFileWithBloomFilter() throws IOException {
         LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
     }
   }
+
+  @Test
+  public void testParquetFileNotFlushedWhenRowCountLimitsAreNotExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(4)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+    assertHasNotFlushed(file);
+  }
+
+  @Test
+  public void testParquetFileIsFlushedWhenMinRowCountIsExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(3)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 3, 1);
+  }
+
+  @Test
+  public void testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(4)
+      .withMaxRowCountForPageSizeCheck(2)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasNotFlushed(file);
+  }
+
+  @Test
+  public void testParquetFileIsFlushedAfterEachRecord() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(1)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 3, 3);
+  }
+
+  @Test
+  public void testParquetFileNotFlushingAllRows() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(2)
+      .withMaxRowCountForPageSizeCheck(3)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 2, 1);
+  }
+
+  private ExampleParquetWriter.Builder getParquetWriterBuilder(MessageType schema,
+                                                               TestOutputFile file) throws IOException {
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+    return ExampleParquetWriter.builder(file)
+      .withConf(conf)
+      // Set row group size to 1, to make sure we flush every time
+      // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
+      .withRowGroupSize(1);
+  }
+
+  private void writeRecords(ParquetWriter<Group> writer, MessageType schema) throws IOException {
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    writer.write(factory.newGroup().append("str", "foo"));
+    writer.write(factory.newGroup().append("str", "bar"));
+    writer.write(factory.newGroup().append("str", "baz"));
+  }
+
+  private void assertHasNotFlushed(TestOutputFile file) {
+    int emptyFileLength = ParquetFileWriter.MAGIC.length;
+    assertEquals(emptyFileLength, file.stats.getBytesWritten());
+  }
+
+  private void assertHasFlushed(TestOutputFile file, int numWrites, int numFlushes) {

Review comment:
       The method name suggests that it validates if the file was flushed `numFlushes` times. But it is executed only once after all writes so it only validates that it flushed once and all data is written.

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
##########
@@ -279,4 +285,133 @@ public void testParquetFileWithBloomFilter() throws IOException {
         LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
     }
   }
+
+  @Test
+  public void testParquetFileNotFlushedWhenRowCountLimitsAreNotExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(4)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+    assertHasNotFlushed(file);
+  }
+
+  @Test
+  public void testParquetFileIsFlushedWhenMinRowCountIsExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(3)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 3, 1);
+  }
+
+  @Test
+  public void testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(4)
+      .withMaxRowCountForPageSizeCheck(2)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasNotFlushed(file);
+  }
+
+  @Test
+  public void testParquetFileIsFlushedAfterEachRecord() throws IOException {

Review comment:
       This test does not validate each flush but only one at the end. See my last comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org