You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by sh...@apache.org on 2022/10/09 18:53:34 UTC

[parquet-mr] branch master updated: PARQUET-2176: Column index/statistics truncation in ParquetWriter (#989)

This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 704ef93ff PARQUET-2176: Column index/statistics truncation in ParquetWriter (#989)
704ef93ff is described below

commit 704ef93ff6db938f6f693f8d662380de91b94a65
Author: patchwork01 <11...@users.noreply.github.com>
AuthorDate: Sun Oct 9 19:53:27 2022 +0100

    PARQUET-2176: Column index/statistics truncation in ParquetWriter (#989)
    
    * PARQUET-2176 Set column index truncate length
    
    * PARQUET-2176 Set statistics truncate length
    
    * PARQUET-2176 Refactor creating test temp files
---
 .../apache/parquet/hadoop/ParquetFileWriter.java   |   8 +-
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  22 ++++
 .../hadoop/TestParquetWriterTruncation.java        | 125 +++++++++++++++++++++
 3 files changed, 150 insertions(+), 5 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index e2df7af33..a05a8898f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1193,7 +1193,7 @@ public class ParquetFileWriter {
     serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
     LOG.debug("{}: end", out.getPos());
     this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
-    serializeFooter(footer, out, fileEncryptor);
+    serializeFooter(footer, out, fileEncryptor, metadataConverter);
     out.close();
   }
 
@@ -1322,9 +1322,7 @@ public class ParquetFileWriter {
   }
 
   private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
-      InternalFileEncryptor fileEncryptor) throws IOException {
-
-    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+      InternalFileEncryptor fileEncryptor, ParquetMetadataConverter metadataConverter) throws IOException {
 
     // Unencrypted file
     if (null == fileEncryptor) {
@@ -1499,7 +1497,7 @@ public class ParquetFileWriter {
       throws IOException {
     PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
     metadata.write(MAGIC);
-    serializeFooter(metadataFooter, metadata, null);
+    serializeFooter(metadataFooter, metadata, null, new ParquetMetadataConverter());
     metadata.close();
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index d1056320d..f18d507ba 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -654,6 +654,28 @@ public class ParquetWriter<T> implements Closeable {
       return self();
     }
 
+    /**
+     * Sets the length that binary values in a binary column index are truncated to.
+     *
+     * @param length the truncate length, forwarded unchanged to the encoding properties builder
+     * @return this builder for method chaining
+     */
+    public SELF withColumnIndexTruncateLength(int length) {
+      encodingPropsBuilder.withColumnIndexTruncateLength(length);
+      return self();
+    }
+
+    /**
+     * Sets the length that the min/max binary values of row group statistics are truncated to.
+     *
+     * @param length the truncate length, forwarded unchanged to the encoding properties builder
+     * @return this builder for method chaining
+     */
+    public SELF withStatisticsTruncateLength(int length) {
+      encodingPropsBuilder.withStatisticsTruncateLength(length);
+      return self();
+    }
+
     /**
      * Set a property that will be available to the read path. For writers that use a Hadoop
      * configuration, this is the recommended way to add configuration values.
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterTruncation.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterTruncation.java
new file mode 100644
index 000000000..dadc55cbc
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterTruncation.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that the truncate lengths set through the {@link ParquetWriter} builder are applied
+ * to the column index and to the column chunk statistics of the written file.
+ */
+public class TestParquetWriterTruncation {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testTruncateColumnIndex() throws IOException {
+    MessageType schema = Types.buildMessage().
+      required(BINARY).as(stringType()).named("name").named("msg");
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    Path path = newTempPath();
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+      .withPageRowCountLimit(10)
+      .withConf(conf)
+      .withDictionaryEncoding(false)
+      .withColumnIndexTruncateLength(10)
+      .build()) {
+      writer.write(factory.newGroup().append("name", "1234567890abcdefghijklmnopqrstuvwxyz"));
+    }
+
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+      ColumnChunkMetaData column = reader.getFooter().getBlocks().get(0).getColumns().get(0);
+      ColumnIndex index = reader.readColumnIndex(column);
+      // Min is the bare 10-character prefix; max is the prefix with its last character incremented ('0' -> '1').
+      assertEquals(Collections.singletonList("1234567890"), asStrings(index.getMinValues()));
+      assertEquals(Collections.singletonList("1234567891"), asStrings(index.getMaxValues()));
+    }
+  }
+
+  @Test
+  public void testTruncateStatistics() throws IOException {
+    MessageType schema = Types.buildMessage().
+      required(BINARY).as(stringType()).named("name").named("msg");
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    Path path = newTempPath();
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+      .withPageRowCountLimit(10)
+      .withConf(conf)
+      .withDictionaryEncoding(false)
+      .withStatisticsTruncateLength(10)
+      .build()) {
+      writer.write(factory.newGroup().append("name", "1234567890abcdefghijklmnopqrstuvwxyz"));
+    }
+
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+      ColumnChunkMetaData column = reader.getFooter().getBlocks().get(0).getColumns().get(0);
+      Statistics<?> statistics = column.getStatistics();
+      // Decode explicitly as UTF-8 so the comparison does not depend on the platform default charset.
+      assertEquals("1234567890", new String(statistics.getMinBytes(), java.nio.charset.StandardCharsets.UTF_8));
+      assertEquals("1234567891", new String(statistics.getMaxBytes(), java.nio.charset.StandardCharsets.UTF_8));
+    }
+  }
+
+  // Creates a file in the temporary folder and deletes it again, leaving a path that does not exist yet.
+  private Path newTempPath() throws IOException {
+    File file = temp.newFile();
+    Preconditions.checkArgument(file.delete(), "Could not remove temp file");
+    return new Path(file.getAbsolutePath());
+  }
+
+  // Decodes each buffer's remaining bytes as UTF-8; array() would ignore position/limit and the charset.
+  private static List<String> asStrings(List<ByteBuffer> buffers) {
+    return buffers.stream()
+      .map(buffer -> java.nio.charset.StandardCharsets.UTF_8.decode(buffer.duplicate()).toString())
+      .collect(Collectors.toList());
+  }
+}