Posted to commits@parquet.apache.org by ga...@apache.org on 2021/04/22 07:32:25 UTC

[parquet-mr] branch master updated: PARQUET-2030: Expose page size row check configurations to ParquetWriter.Builder (#895)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e40e69  PARQUET-2030: Expose page size row check configurations to ParquetWriter.Builder (#895)
8e40e69 is described below

commit 8e40e69ce3e02cf4f8f0c624ff9e6173509961ee
Author: Miksu82 <22...@users.noreply.github.com>
AuthorDate: Thu Apr 22 10:32:13 2021 +0300

    PARQUET-2030: Expose page size row check configurations to ParquetWriter.Builder (#895)
---
 .../org/apache/parquet/hadoop/ParquetWriter.java   | 22 ++++++++++
 .../apache/parquet/hadoop/TestParquetWriter.java   | 47 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)
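
For reference, a minimal sketch of how a caller might use the two new builder
methods once this change is in. The schema, output path, and check bounds below
are illustrative only, not part of the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class PageSizeCheckExample {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message msg { required binary str (UTF8); }");
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);

    try (ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("/tmp/example.parquet")) // illustrative output path
        .withConf(conf)
        // Check the current page size no earlier than every 100 rows
        // and no later than every 10,000 rows (illustrative values).
        .withMinRowCountForPageSizeCheck(100)
        .withMaxRowCountForPageSizeCheck(10000)
        .build()) {
      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
      for (int i = 0; i < 1000000; i++) {
        writer.write(factory.newGroup().append("str", "row-" + i));
      }
    }
  }
}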

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index e1afaca..696fec3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -616,6 +616,28 @@ public class ParquetWriter<T> implements Closeable {
     }
 
     /**
+     * Sets the minimum number of rows to write before a page size check is done.
+     *
+     * @param min writes at least `min` rows before invoking a page size check
+     * @return this builder for method chaining
+     */
+    public SELF withMinRowCountForPageSizeCheck(int min) {
+      encodingPropsBuilder.withMinRowCountForPageSizeCheck(min);
+      return self();
+    }
+
+    /**
+     * Sets the maximum number of rows to write before a page size check is done.
+     *
+     * @param max makes a page size check after `max` rows have been written
+     * @return this builder for method chaining
+     */
+    public SELF withMaxRowCountForPageSizeCheck(int max) {
+      encodingPropsBuilder.withMaxRowCountForPageSizeCheck(max);
+      return self();
+    }
+
+    /**
      * Set a property that will be available to the read path. For writers that use a Hadoop
      * configuration, this is the recommended way to add configuration values.
      *
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index de53e96..9e9b735 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Callable;
 import net.openhft.hashing.LongHashFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
@@ -279,4 +280,50 @@ public class TestParquetWriter {
         LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
     }
   }
+
+  @Test
+  public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
+    testParquetFileNumberOfBlocks(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+                                  ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+                                  1);
+    testParquetFileNumberOfBlocks(1, 1, 3);
+
+  }
+
+  private void testParquetFileNumberOfBlocks(int minRowCountForPageSizeCheck,
+                                             int maxRowCountForPageSizeCheck,
+                                             int expectedNumberOfBlocks) throws IOException {
+    MessageType schema = Types
+      .buildMessage()
+      .required(BINARY)
+      .as(stringType())
+      .named("str")
+      .named("msg");
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    File file = temp.newFile();
+    temp.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+      .withConf(conf)
+      // Set row group size to 1, to make sure we flush every time
+      // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
+      .withRowGroupSize(1)
+      .withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck)
+      .withMaxRowCountForPageSizeCheck(maxRowCountForPageSizeCheck)
+      .build()) {
+
+      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+      writer.write(factory.newGroup().append("str", "foo"));
+      writer.write(factory.newGroup().append("str", "bar"));
+      writer.write(factory.newGroup().append("str", "baz"));
+    }
+
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+      ParquetMetadata footer = reader.getFooter();
+      assertEquals(expectedNumberOfBlocks, footer.getBlocks().size());
+    }
+  }
 }
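
Note: on the Hadoop/MapReduce path these same knobs were already reachable as
configuration keys on ParquetOutputFormat; this commit makes them settable
directly on ParquetWriter.Builder. A sketch of the pre-existing configuration
route, assuming the key names defined in ParquetOutputFormat in parquet-mr:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetOutputFormat;

public class RowCheckConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key names as defined in ParquetOutputFormat
    // ("parquet.page.size.row.check.min" / ".max"); values illustrative.
    conf.setInt(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, 100);
    conf.setInt(ParquetOutputFormat.MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK, 10000);
  }
}

The builder methods added above give the same control to code that constructs
a ParquetWriter directly, without going through a Hadoop job configuration.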