Posted to commits@parquet.apache.org by sh...@apache.org on 2022/10/09 18:53:34 UTC
[parquet-mr] branch master updated: PARQUET-2176: Column index/statistics truncation in ParquetWriter (#989)
This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 704ef93ff PARQUET-2176: Column index/statistics truncation in ParquetWriter (#989)
704ef93ff is described below
commit 704ef93ff6db938f6f693f8d662380de91b94a65
Author: patchwork01 <11...@users.noreply.github.com>
AuthorDate: Sun Oct 9 19:53:27 2022 +0100
PARQUET-2176: Column index/statistics truncation in ParquetWriter (#989)
* PARQUET-2176 Set column index truncate length
* PARQUET-2176 Set statistics truncate length
* PARQUET-2176 Refactor creating test temp files
---
.../apache/parquet/hadoop/ParquetFileWriter.java | 8 +-
.../org/apache/parquet/hadoop/ParquetWriter.java | 22 ++++
.../hadoop/TestParquetWriterTruncation.java | 125 +++++++++++++++++++++
3 files changed, 150 insertions(+), 5 deletions(-)
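In short, this change exposes two existing truncation settings on the ParquetWriter builder: withColumnIndexTruncateLength bounds the binary min/max values stored in the column index, and withStatisticsTruncateLength bounds the min/max values stored in the row-group statistics. A minimal sketch of the new builder calls, modelled on the tests added below (the output path is a placeholder; imports match the test file):

    MessageType schema = Types.buildMessage()
        .required(BINARY).as(stringType()).named("name")
        .named("msg");
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);

    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/truncation-example.parquet"))
        .withConf(conf)
        .withColumnIndexTruncateLength(10)  // cap column-index min/max at 10 bytes
        .withStatisticsTruncateLength(10)   // cap row-group statistics min/max at 10 bytes
        .build()) {
      writer.write(new SimpleGroupFactory(schema).newGroup()
          .append("name", "1234567890abcdefghijklmnopqrstuvwxyz"));
    }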
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index e2df7af33..a05a8898f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1193,7 +1193,7 @@ public class ParquetFileWriter {
serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
LOG.debug("{}: end", out.getPos());
this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
- serializeFooter(footer, out, fileEncryptor);
+ serializeFooter(footer, out, fileEncryptor, metadataConverter);
out.close();
}
@@ -1322,9 +1322,7 @@ public class ParquetFileWriter {
}
private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
- InternalFileEncryptor fileEncryptor) throws IOException {
-
- ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+ InternalFileEncryptor fileEncryptor, ParquetMetadataConverter metadataConverter) throws IOException {
// Unencrypted file
if (null == fileEncryptor) {
@@ -1499,7 +1497,7 @@ public class ParquetFileWriter {
throws IOException {
PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
metadata.write(MAGIC);
- serializeFooter(metadataFooter, metadata, null);
+ serializeFooter(metadataFooter, metadata, null, new ParquetMetadataConverter());
metadata.close();
}
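The ParquetFileWriter change above threads the writer's own ParquetMetadataConverter into serializeFooter instead of constructing a fresh converter with default settings there; a fresh converter would silently drop the statistics truncate length configured on the writer. A sketch of the intent, assuming the converter is built once with the configured length in ParquetFileWriter's constructor:

    // Assumption: the int constructor carries the statistics truncate length.
    this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
    // ... later, when closing the file, reuse that converter for the footer:
    serializeFooter(footer, out, fileEncryptor, metadataConverter);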
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index d1056320d..f18d507ba 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -654,6 +654,28 @@ public class ParquetWriter<T> implements Closeable {
return self();
}
+ /**
+ * Sets the length to be used for truncating binary values in a binary column index.
+ *
+ * @param length the length to truncate to
+ * @return this builder for method chaining
+ */
+ public SELF withColumnIndexTruncateLength(int length) {
+ encodingPropsBuilder.withColumnIndexTruncateLength(length);
+ return self();
+ }
+
+ /**
+ * Sets the length to which the min/max binary values in row-group statistics are truncated.
+ *
+ * @param length the length to truncate to
+ * @return this builder for method chaining
+ */
+ public SELF withStatisticsTruncateLength(int length) {
+ encodingPropsBuilder.withStatisticsTruncateLength(length);
+ return self();
+ }
+
/**
* Set a property that will be available to the read path. For writers that use a Hadoop
* configuration, this is the recommended way to add configuration values.
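Before this change, these two lengths were reachable from ParquetWriter mainly through Hadoop configuration rather than the builder. A hedged sketch of that configuration route, assuming the key names defined by ParquetOutputFormat at the time of this commit:

    Configuration conf = new Configuration();
    // Assumed constants: "parquet.columnindex.truncate.length" and
    // "parquet.statistics.truncate.length" in ParquetOutputFormat.
    conf.setInt(ParquetOutputFormat.COLUMN_INDEX_TRUNCATE_LENGTH, 10);
    conf.setInt(ParquetOutputFormat.STATISTICS_TRUNCATE_LENGTH, 10);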
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterTruncation.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterTruncation.java
new file mode 100644
index 000000000..dadc55cbc
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterTruncation.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.junit.Assert.assertEquals;
+
+public class TestParquetWriterTruncation {
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Test
+ public void testTruncateColumnIndex() throws IOException {
+ MessageType schema = Types.buildMessage().
+ required(BINARY).as(stringType()).named("name").named("msg");
+
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ GroupFactory factory = new SimpleGroupFactory(schema);
+ Path path = newTempPath();
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+ .withPageRowCountLimit(10)
+ .withConf(conf)
+ .withDictionaryEncoding(false)
+ .withColumnIndexTruncateLength(10)
+ .build()) {
+
+ writer.write(factory.newGroup().append("name", "1234567890abcdefghijklmnopqrstuvwxyz"));
+ }
+
+ try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+
+ ColumnChunkMetaData column = reader.getFooter().getBlocks().get(0).getColumns().get(0);
+ ColumnIndex index = reader.readColumnIndex(column);
+ assertEquals(Collections.singletonList("1234567890"), asStrings(index.getMinValues()));
+ assertEquals(Collections.singletonList("1234567891"), asStrings(index.getMaxValues()));
+ }
+ }
+
+ @Test
+ public void testTruncateStatistics() throws IOException {
+ MessageType schema = Types.buildMessage().
+ required(BINARY).as(stringType()).named("name").named("msg");
+
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ GroupFactory factory = new SimpleGroupFactory(schema);
+ Path path = newTempPath();
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+ .withPageRowCountLimit(10)
+ .withConf(conf)
+ .withDictionaryEncoding(false)
+ .withStatisticsTruncateLength(10)
+ .build()) {
+
+ writer.write(factory.newGroup().append("name", "1234567890abcdefghijklmnopqrstuvwxyz"));
+ }
+
+ try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+
+ ColumnChunkMetaData column = reader.getFooter().getBlocks().get(0).getColumns().get(0);
+ Statistics<?> statistics = column.getStatistics();
+ assertEquals("1234567890", new String(statistics.getMinBytes()));
+ assertEquals("1234567891", new String(statistics.getMaxBytes()));
+ }
+ }
+
+ private Path newTempPath() throws IOException {
+ File file = temp.newFile();
+ Preconditions.checkArgument(file.delete(), "Could not remove temp file");
+ return new Path(file.getAbsolutePath());
+ }
+
+ private static List<String> asStrings(List<ByteBuffer> buffers) {
+ return buffers.stream()
+ .map(buffer -> new String(buffer.array()))
+ .collect(Collectors.toList());
+ }
+
+}
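A note on the assertions in both tests: truncating a minimum can only make it smaller, so the min is simply cut to ten bytes ("1234567890"). A truncated maximum must remain an upper bound for the original value, so the last byte of the truncated prefix is incremented, which is why the expected max is "1234567891" rather than "1234567890". An illustrative sketch of that truncate-and-increment rule (not the library's exact code, which lives in org.apache.parquet.internal.column.columnindex.BinaryTruncator and additionally respects UTF-8 character boundaries):

    // Illustrative only: shorten a max value while keeping it a valid upper bound.
    static byte[] truncateMax(byte[] value, int length) {
      if (value.length <= length) {
        return value; // already short enough
      }
      byte[] truncated = java.util.Arrays.copyOf(value, length);
      // Increment the rightmost byte that can be incremented; drop everything after it.
      for (int i = length - 1; i >= 0; i--) {
        if (truncated[i] != (byte) 0xFF) {
          truncated[i]++;
          return java.util.Arrays.copyOf(truncated, i + 1);
        }
      }
      return value; // every byte is 0xFF: fall back to the untruncated value
    }

For the 36-byte test value, the first ten bytes are "1234567890" and the incremented result is "1234567891", matching the column-index and statistics assertions above.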