You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2021/04/22 07:32:25 UTC
[parquet-mr] branch master updated: PARQUET-2030: Expose page size
row check configurations to ParquetWriter.Builder (#895)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 8e40e69 PARQUET-2030: Expose page size row check configurations to ParquetWriter.Builder (#895)
8e40e69 is described below
commit 8e40e69ce3e02cf4f8f0c624ff9e6173509961ee
Author: Miksu82 <22...@users.noreply.github.com>
AuthorDate: Thu Apr 22 10:32:13 2021 +0300
PARQUET-2030: Expose page size row check configurations to ParquetWriter.Builder (#895)
---
.../org/apache/parquet/hadoop/ParquetWriter.java | 22 ++++++++++
.../apache/parquet/hadoop/TestParquetWriter.java | 47 ++++++++++++++++++++++
2 files changed, 69 insertions(+)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index e1afaca..696fec3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -616,6 +616,28 @@ public class ParquetWriter<T> implements Closeable {
}
/**
+ * Sets the minimum number of rows to write before a page size check is done.
+ *
+ * @param min writes at least {@code min} rows before invoking a page size check
+ * @return this builder for method chaining
+ */
+ public SELF withMinRowCountForPageSizeCheck(int min) {
+ encodingPropsBuilder.withMinRowCountForPageSizeCheck(min); // pure delegation; no validation here
+ return self();
+ }
+
+ /**
+ * Sets the maximum number of rows to write before a page size check is done.
+ *
+ * @param max makes a page size check after {@code max} rows have been written
+ * @return this builder for method chaining
+ */
+ public SELF withMaxRowCountForPageSizeCheck(int max) {
+ encodingPropsBuilder.withMaxRowCountForPageSizeCheck(max); // pure delegation; no validation here
+ return self();
+ }
+
+ /**
* Set a property that will be available to the read path. For writers that use a Hadoop
* configuration, this is the recommended way to add configuration values.
*
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index de53e96..9e9b735 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Callable;
import net.openhft.hashing.LongHashFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
@@ -279,4 +280,50 @@ public class TestParquetWriter {
LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
}
}
+
+ @Test
+ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
+ testParquetFileNumberOfBlocks(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+ ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+ 1);
+ testParquetFileNumberOfBlocks(1, 1, 3);
+
+ }
+
+ /**
+ * Writes three one-column rows using the given page size check bounds (passed to
+ * withMinRowCountForPageSizeCheck / withMaxRowCountForPageSizeCheck) and asserts that
+ * the written file's footer reports expectedNumberOfBlocks blocks.
+ */
+ private void testParquetFileNumberOfBlocks(int minRowCountForPageSizeCheck,
+ int maxRowCountForPageSizeCheck,
+ int expectedNumberOfBlocks) throws IOException {
+ MessageType schema = Types.buildMessage()
+ .required(BINARY).as(stringType()).named("str").named("msg");
+
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ File file = temp.newFile();
+ file.delete(); // delete only the placeholder file, leaving `temp` itself intact; the writer then creates the path
+ Path path = new Path(file.getAbsolutePath());
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+ .withConf(conf)
+ // Set row group size to 1, to make sure we flush every time
+ // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded
+ .withRowGroupSize(1)
+ .withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck)
+ .withMaxRowCountForPageSizeCheck(maxRowCountForPageSizeCheck)
+ .build()) {
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ writer.write(factory.newGroup().append("str", "foo"));
+ writer.write(factory.newGroup().append("str", "bar"));
+ writer.write(factory.newGroup().append("str", "baz"));
+ }
+
+ try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+ ParquetMetadata footer = reader.getFooter();
+ assertEquals(expectedNumberOfBlocks, footer.getBlocks().size());
+ }
+ }
}