Posted to commits@parquet.apache.org by ga...@apache.org on 2021/04/26 13:28:47 UTC

[parquet-mr] 02/02: PARQUET-2027: Fix calculating directory offset for merge (#896)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch parquet-1.12.x
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git

commit 76f35944d5994817a1f60fac5af791e7f515d3cf
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Fri Apr 23 19:05:34 2021 +0200

    PARQUET-2027: Fix calculating directory offset for merge (#896)
    
    (cherry picked from commit 2ce35c73746cf091ed223da150daefd323a9ad3a)
---
 .../java/org/apache/parquet/hadoop/Offsets.java    |   8 ++-
 .../hadoop/TestParquetWriterAppendBlocks.java      |  63 ++++++++++++++++++++-
 .../src/test/resources/test-append_1.parquet       | Bin 0 -> 7375 bytes
 .../src/test/resources/test-append_2.parquet       | Bin 0 -> 7374 bytes
 4 files changed, 65 insertions(+), 6 deletions(-)
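
The root cause: readDictionaryPageSize previously measured the page-header size from the stream's current position and never seeked to the dictionary page itself, so it returned a correct size only when the caller happened to leave the stream at the header's first byte. The fix passes the ColumnChunkMetaData into the helper and seeks to chunk.getStartingPos() before parsing, so the returned size is exactly the page header plus the compressed dictionary page.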

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
index fa25943..2edc585 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
@@ -55,7 +55,7 @@ class Offsets {
          *   (0 cannot be a valid offset because of the MAGIC bytes)
          * - The firstDataPageOffset might point to the dictionary page
          */
-        dictionaryPageSize = readDictionaryPageSize(input, newChunkStart);
+        dictionaryPageSize = readDictionaryPageSize(input, chunk);
       } else {
         dictionaryPageSize = chunk.getFirstDataPageOffset() - chunk.getDictionaryPageOffset();
       }
@@ -68,12 +68,14 @@ class Offsets {
     return new Offsets(firstDataPageOffset, dictionaryPageOffset);
   }
 
-  private static long readDictionaryPageSize(SeekableInputStream in, long pos) throws IOException {
+  private static long readDictionaryPageSize(SeekableInputStream in, ColumnChunkMetaData chunk) throws IOException {
     long origPos = -1;
     try {
       origPos = in.getPos();
+      in.seek(chunk.getStartingPos());
+      long headerStart = in.getPos();
       PageHeader header = Util.readPageHeader(in);
-      long headerSize = in.getPos() - origPos;
+      long headerSize = in.getPos() - headerStart;
       return headerSize + header.getCompressed_page_size();
     } finally {
       if (origPos != -1) {
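
Reassembled from the hunk above, the patched helper reads as follows. This is a sketch for readability only (the surrounding Offsets class is unchanged), with the imports the method relies on spelled out:

    import java.io.IOException;

    import org.apache.parquet.format.PageHeader;
    import org.apache.parquet.format.Util;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.io.SeekableInputStream;

    // Consolidated view of the patched helper; mirrors the +/- lines above.
    private static long readDictionaryPageSize(SeekableInputStream in, ColumnChunkMetaData chunk)
        throws IOException {
      long origPos = -1;
      try {
        origPos = in.getPos();           // remember the caller's position
        in.seek(chunk.getStartingPos()); // jump to the first byte of the dictionary page header
        long headerStart = in.getPos();
        PageHeader header = Util.readPageHeader(in);
        long headerSize = in.getPos() - headerStart;          // bytes consumed by the header itself
        return headerSize + header.getCompressed_page_size(); // header + compressed page body
      } finally {
        if (origPos != -1) {
          in.seek(origPos);              // restore the stream for the caller
        }
      }
    }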
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
index bda5333..82d48f4 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -68,6 +69,17 @@ public class TestParquetWriterAppendBlocks {
   public static final SimpleGroupFactory GROUP_FACTORY =
       new SimpleGroupFactory(FILE_SCHEMA);
 
+  private static final Path STATIC_FILE_1 = createPathFromCP("/test-append_1.parquet");
+  private static final Path STATIC_FILE_2 = createPathFromCP("/test-append_2.parquet");
+
+  private static Path createPathFromCP(String path) {
+    try {
+      return new Path(TestParquetWriterAppendBlocks.class.getResource(path).toURI());
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public Path file1;
   public List<Group> file1content = new ArrayList<Group>();
   public Path file2;
@@ -134,6 +146,51 @@ public class TestParquetWriterAppendBlocks {
     Assert.assertEquals("All records should be present", 0, expected.size());
   }
 
+  /**
+   * This test is similar to {@link #testBasicBehavior()} only that it uses static files generated by a previous release
+   * (1.11.1). This test is to validate the fix of PARQUET-2027.
+   */
+  @Test
+  public void testBasicBehaviorWithStaticFiles() throws IOException {
+    List<Group> expected = new ArrayList<>();
+    readAll(STATIC_FILE_1, expected);
+    readAll(STATIC_FILE_2, expected);
+
+    Path combinedFile = newTemp();
+    ParquetFileWriter writer = new ParquetFileWriter(
+        CONF, FILE_SCHEMA, combinedFile);
+    writer.start();
+    writer.appendFile(CONF, STATIC_FILE_1);
+    writer.appendFile(CONF, STATIC_FILE_2);
+    writer.end(EMPTY_METADATA);
+
+    try (ParquetReader<Group> reader = ParquetReader
+        .builder(new GroupReadSupport(), combinedFile)
+        .build()) {
+
+      for (Group expectedNext : expected) {
+        Group next = reader.read();
+        // check each value; equals is not supported for simple records
+        Assert.assertEquals("Each id should match",
+            expectedNext.getInteger("id", 0), next.getInteger("id", 0));
+        Assert.assertEquals("Each string should match",
+            expectedNext.getString("string", 0), next.getString("string", 0));
+      }
+      Assert.assertNull("No extra records should be present", reader.read());
+    }
+
+  }
+
+  private void readAll(Path file, List<Group> values) throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader
+        .builder(new GroupReadSupport(), file)
+        .build()) {
+      for (Group g = reader.read(); g != null; g = reader.read()) {
+        values.add(g);
+      }
+    }
+  }
+
   @Test
   public void testMergedMetadata() throws IOException {
     Path combinedFile = newTemp();
diff --git a/parquet-hadoop/src/test/resources/test-append_1.parquet b/parquet-hadoop/src/test/resources/test-append_1.parquet
new file mode 100644
index 0000000..a255f86
Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-append_1.parquet differ
diff --git a/parquet-hadoop/src/test/resources/test-append_2.parquet b/parquet-hadoop/src/test/resources/test-append_2.parquet
new file mode 100644
index 0000000..3081f89
Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-append_2.parquet differ
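
The two binary resources above are fixtures written by parquet-mr 1.11.1, as the test javadoc notes. A hypothetical generator along the following lines could produce files of the same shape; the schema here is inferred from the assertions in the test (an int32 "id" and a UTF-8 "string"), and the exact row contents and writer settings of the committed files are not recorded in this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    // Hypothetical fixture generator (illustrative only; the committed files
    // were produced by parquet-mr 1.11.1 and their contents differ).
    public class GenerateAppendFixtures {
      public static void main(String[] args) throws Exception {
        // Schema inferred from the test's assertions; the real FILE_SCHEMA
        // lives in TestParquetWriterAppendBlocks.
        MessageType schema = MessageTypeParser.parseMessageType(
            "message AppendTest { required int32 id; required binary string (UTF8); }");
        SimpleGroupFactory factory = new SimpleGroupFactory(schema);
        try (ParquetWriter<Group> writer = ExampleParquetWriter
            .builder(new Path("test-append_1.parquet"))
            .withConf(new Configuration())
            .withType(schema)
            .build()) {
          for (int i = 0; i < 1000; i++) { // arbitrary row count
            writer.write(factory.newGroup()
                .append("id", i)
                .append("string", "string-" + i));
          }
        }
      }
    }

The new test then exercises ParquetFileWriter.appendFile over these pre-generated files to confirm the offset calculation no longer breaks on data written by the earlier release.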