You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2021/04/20 12:52:20 UTC

[GitHub] [parquet-mr] Miksu82 commented on a change in pull request #895: Parquet-2030: Expose page size row check configurations to ParquetWri…

Miksu82 commented on a change in pull request #895:
URL: https://github.com/apache/parquet-mr/pull/895#discussion_r616653104



##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
##########
@@ -279,4 +285,133 @@ public void testParquetFileWithBloomFilter() throws IOException {
         LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
     }
   }
+
+  @Test
+  public void testParquetFileNotFlushedWhenRowCountLimitsAreNotExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(4)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+    assertHasNotFlushed(file);
+  }
+
+  @Test
+  public void testParquetFileIsFlushedWhenMinRowCountIsExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(3)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 3, 1);
+  }
+
+  @Test
+  public void testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(4)
+      .withMaxRowCountForPageSizeCheck(2)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasNotFlushed(file);
+  }
+
+  @Test
+  public void testParquetFileIsFlushedAfterEachRecord() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(1)
+      .withMaxRowCountForPageSizeCheck(4)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 3, 3);
+  }
+
+  @Test
+  public void testParquetFileNotFlushingAllRows() throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    TestOutputFile file = new TestOutputFile();
+    ParquetWriter<Group> writer = getParquetWriterBuilder(schema, file)
+      .withMinRowCountForPageSizeCheck(2)
+      .withMaxRowCountForPageSizeCheck(3)
+      .build();
+
+    writeRecords(writer, schema);
+
+    assertHasFlushed(file, 2, 1);
+  }
+
+  private ExampleParquetWriter.Builder getParquetWriterBuilder(MessageType schema,
+                                                               TestOutputFile file) throws IOException {
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+    return ExampleParquetWriter.builder(file)
+      .withConf(conf)
+      // Set row group size to 1, to make sure we flush every time
+      // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
+      .withRowGroupSize(1);
+  }
+
+  private void writeRecords(ParquetWriter<Group> writer, MessageType schema) throws IOException {
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    writer.write(factory.newGroup().append("str", "foo"));
+    writer.write(factory.newGroup().append("str", "bar"));
+    writer.write(factory.newGroup().append("str", "baz"));
+  }
+
+  private void assertHasNotFlushed(TestOutputFile file) {
+    int emptyFileLength = ParquetFileWriter.MAGIC.length;
+    assertEquals(emptyFileLength, file.stats.getBytesWritten());
+  }
+
+  private void assertHasFlushed(TestOutputFile file, int numWrites, int numFlushes) {

Review comment:
       Hmm..no. Everytime a record is flushed it writes the actual record + the metadata to indicate that a row group has ended (the writer is created by defining `rowGroupSize=1`).
   
   It works like this
   
   ```
   write("foo") // writes 7B
   flush() // writes 23B
   write("bar") // writes 7B
   flush() // writes 23B
   
   // Totally has written:  60B
   ```
   
   but
   
   ```
   write("foo") // writes 7B
   write("bar") // writes 7B
   flush()  // writes 23B
   
   // Totally has written: 37
   ```
   
   And by defining the `page.size.row.check.min` and `page.size.row.check.max` I can control how often the `InternalParquetRecordWriter` flushes the records, and I can assert that by calculating the number of bytes written in total.
   
   But you are absolutely right, that this is complicated and in the wrong place so I'll remove these test and just assert that the properties have been set.
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org