You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2021/06/11 08:23:58 UTC

[parquet-mr] branch master updated: PARQUET-1633: Fix integer overflow (#902)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 98ddadf  PARQUET-1633: Fix integer overflow (#902)
98ddadf is described below

commit 98ddadf0b8f283dec7c45937e01233869eac4467
Author: Edward Wright <ea...@gmail.com>
AuthorDate: Fri Jun 11 09:23:48 2021 +0100

    PARQUET-1633: Fix integer overflow (#902)
    
    Unit test:
    - Updated ParquetWriter to support setting row group size in long
    - Removed Xmx settings in the pom to allow more memory for the tests
    
    Co-authored-by: Gabor Szadovszky <ga...@apache.org>
---
 .../apache/parquet/hadoop/ParquetFileReader.java   |  14 +-
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  16 ++-
 .../parquet/hadoop/TestLargeColumnChunk.java       | 142 +++++++++++++++++++++
 pom.xml                                            |   3 +-
 4 files changed, 164 insertions(+), 11 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 1fa9c1f..3a68e01 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -944,7 +944,7 @@ public class ParquetFileReader implements Closeable {
           currentParts = new ConsecutivePartList(startingPos);
           allParts.add(currentParts);
         }
-        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
+        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
       }
     }
     // actually read all the chunks
@@ -1066,7 +1066,7 @@ public class ParquetFileReader implements Closeable {
             allParts.add(currentParts);
           }
           ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
-              Math.toIntExact(range.getLength()));
+              range.getLength());
           currentParts.addChunk(chunkDescriptor);
           builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
         }
@@ -1691,7 +1691,7 @@ public class ParquetFileReader implements Closeable {
     private final ColumnDescriptor col;
     private final ColumnChunkMetaData metadata;
     private final long fileOffset;
-    private final int size;
+    private final long size;
 
     /**
      * @param col column this chunk is part of
@@ -1703,7 +1703,7 @@ public class ParquetFileReader implements Closeable {
         ColumnDescriptor col,
         ColumnChunkMetaData metadata,
         long fileOffset,
-        int size) {
+        long size) {
       super();
       this.col = col;
       this.metadata = metadata;
@@ -1735,7 +1735,7 @@ public class ParquetFileReader implements Closeable {
   private class ConsecutivePartList {
 
     private final long offset;
-    private int length;
+    private long length;
     private final List<ChunkDescriptor> chunks = new ArrayList<>();
 
     /**
@@ -1763,8 +1763,8 @@ public class ParquetFileReader implements Closeable {
     public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
       f.seek(offset);
 
-      int fullAllocations = length / options.getMaxAllocationSize();
-      int lastAllocationSize = length % options.getMaxAllocationSize();
+      int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
+      int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());
 
       int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
       List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 696fec3..b9953a5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -269,7 +269,7 @@ public class ParquetWriter<T> implements Closeable {
       ParquetFileWriter.Mode mode,
       WriteSupport<T> writeSupport,
       CompressionCodecName compressionCodecName,
-      int rowGroupSize,
+      long rowGroupSize,
       boolean validating,
       Configuration conf,
       int maxPaddingSize,
@@ -355,7 +355,7 @@ public class ParquetWriter<T> implements Closeable {
     private Configuration conf = new Configuration();
     private ParquetFileWriter.Mode mode;
     private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
-    private int rowGroupSize = DEFAULT_BLOCK_SIZE;
+    private long rowGroupSize = DEFAULT_BLOCK_SIZE;
     private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
     private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
     private ParquetProperties.Builder encodingPropsBuilder =
@@ -432,8 +432,20 @@ public class ParquetWriter<T> implements Closeable {
      *
      * @param rowGroupSize an integer size in bytes
      * @return this builder for method chaining.
+     * @deprecated Use {@link #withRowGroupSize(long)} instead
      */
+    @Deprecated
     public SELF withRowGroupSize(int rowGroupSize) {
+      return withRowGroupSize((long) rowGroupSize);
+    }
+
+    /**
+     * Set the Parquet format row group size used by the constructed writer.
+     *
+     * @param rowGroupSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public SELF withRowGroupSize(long rowGroupSize) {
       this.rowGroupSize = rowGroupSize;
       return self();
     }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
new file mode 100644
index 0000000..90015f5
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Types.buildMessage;
+import static org.apache.parquet.schema.Types.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test is to test parquet-mr working with potential int overflows (when the sizes are greater than
+ * Integer.MAX_VALUE). The test requires ~3GB memory so it is likely to fail in the CI environment, so these
+ * tests are flagged to be ignored.
+ */
+@Ignore
+public class TestLargeColumnChunk {
+  private static final MessageType SCHEMA = buildMessage().addFields(
+      required(INT64).named("id"),
+      required(BINARY).named("data"))
+      .named("schema");
+  private static final int DATA_SIZE = 256;
+  // Ensure that the size of the column chunk would overflow an int
+  private static final int ROW_COUNT = Integer.MAX_VALUE / DATA_SIZE + 1000;
+  private static final long RANDOM_SEED = 42;
+  private static final int ID_INDEX = SCHEMA.getFieldIndex("id");
+  private static final int DATA_INDEX = SCHEMA.getFieldIndex("data");
+
+  private static final long ID_OF_FILTERED_DATA = ROW_COUNT / 2;
+  private static Binary VALUE_IN_DATA;
+  private static Binary VALUE_NOT_IN_DATA;
+  private static Path file;
+
+  @ClassRule
+  public static TemporaryFolder folder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void createFile() throws IOException {
+    file = new Path(folder.newFile().getAbsolutePath());
+
+    GroupFactory factory = new SimpleGroupFactory(SCHEMA);
+    Random random = new Random(RANDOM_SEED);
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(SCHEMA, conf);
+    try (ParquetWriter<Group> writer = ExampleParquetWriter
+        .builder(HadoopOutputFile.fromPath(file, conf))
+        .withWriteMode(OVERWRITE)
+        .withConf(conf)
+        .withCompressionCodec(UNCOMPRESSED)
+        .withRowGroupSize(4L * 1024 * 1024 * 1024) // 4G to ensure all data goes to one row group
+        .withBloomFilterEnabled(true)
+        .build()) {
+      for (long id = 0; id < ROW_COUNT; ++id) {
+        Group group = factory.newGroup();
+        group.add(ID_INDEX, id);
+        Binary data = nextBinary(random);
+        group.add(DATA_INDEX, data);
+        writer.write(group);
+        if (id == ID_OF_FILTERED_DATA) {
+          VALUE_IN_DATA = data;
+        }
+      }
+    }
+    VALUE_NOT_IN_DATA = nextBinary(random);
+  }
+
+  private static Binary nextBinary(Random random) {
+    byte[] bytes = new byte[DATA_SIZE];
+    random.nextBytes(bytes);
+    return Binary.fromConstantByteArray(bytes);
+  }
+
+  @Test
+  public void validateAllData() throws IOException {
+    Random random = new Random(RANDOM_SEED);
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+      for (long id = 0; id < ROW_COUNT; ++id) {
+        Group group = reader.read();
+        assertEquals(id, group.getLong(ID_INDEX, 0));
+        assertEquals(nextBinary(random), group.getBinary(DATA_INDEX, 0));
+      }
+      assertNull("No more record should be read", reader.read());
+    }
+  }
+
+  @Test
+  public void validateFiltering() throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_IN_DATA)))
+        .build()) {
+      Group group = reader.read();
+      assertEquals(ID_OF_FILTERED_DATA, group.getLong(ID_INDEX, 0));
+      assertEquals(VALUE_IN_DATA, group.getBinary(DATA_INDEX, 0));
+      assertNull("No more record should be read", reader.read());
+    }
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_NOT_IN_DATA)))
+        .build()) {
+      assertNull("No record should be read", reader.read());
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 738b527..090ae96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@
     <commons-text.version>1.8</commons-text.version>
 
     <!-- properties for the profiles -->
-    <surefire.argLine>-Xmx512m</surefire.argLine>
     <surefire.logLevel>INFO</surefire.logLevel>
   </properties>
 
@@ -562,7 +561,7 @@
       <id>ci-test</id>
       <properties>
         <surefire.logLevel>WARN</surefire.logLevel>
-        <surefire.argLine>-Xmx512m -XX:MaxJavaStackTraceDepth=10</surefire.argLine>
+        <surefire.argLine>-XX:MaxJavaStackTraceDepth=10</surefire.argLine>
       </properties>
     </profile>