You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by sh...@apache.org on 2021/04/23 17:05:43 UTC
[parquet-mr] branch master updated: PARQUET-2027: Fix calculating
directory offset for merge (#896)
This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 2ce35c7 PARQUET-2027: Fix calculating directory offset for merge (#896)
2ce35c7 is described below
commit 2ce35c73746cf091ed223da150daefd323a9ad3a
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Fri Apr 23 19:05:34 2021 +0200
PARQUET-2027: Fix calculating directory offset for merge (#896)
---
.../java/org/apache/parquet/hadoop/Offsets.java | 8 ++-
.../hadoop/TestParquetWriterAppendBlocks.java | 63 ++++++++++++++++++++-
.../src/test/resources/test-append_1.parquet | Bin 0 -> 7375 bytes
.../src/test/resources/test-append_2.parquet | Bin 0 -> 7374 bytes
4 files changed, 65 insertions(+), 6 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
index fa25943..2edc585 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
@@ -55,7 +55,7 @@ class Offsets {
* (0 cannot be a valid offset because of the MAGIC bytes)
* - The firstDataPageOffset might point to the dictionary page
*/
- dictionaryPageSize = readDictionaryPageSize(input, newChunkStart);
+ dictionaryPageSize = readDictionaryPageSize(input, chunk);
} else {
dictionaryPageSize = chunk.getFirstDataPageOffset() - chunk.getDictionaryPageOffset();
}
@@ -68,12 +68,14 @@ class Offsets {
return new Offsets(firstDataPageOffset, dictionaryPageOffset);
}
- private static long readDictionaryPageSize(SeekableInputStream in, long pos) throws IOException {
+ private static long readDictionaryPageSize(SeekableInputStream in, ColumnChunkMetaData chunk) throws IOException {
long origPos = -1;
try {
origPos = in.getPos();
+ in.seek(chunk.getStartingPos());
+ long headerStart = in.getPos();
PageHeader header = Util.readPageHeader(in);
- long headerSize = in.getPos() - origPos;
+ long headerSize = in.getPos() - headerStart;
return headerSize + header.getCompressed_page_size();
} finally {
if (origPos != -1) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
index bda5333..82d48f4 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterAppendBlocks.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -68,6 +69,17 @@ public class TestParquetWriterAppendBlocks {
public static final SimpleGroupFactory GROUP_FACTORY =
new SimpleGroupFactory(FILE_SCHEMA);
+ private static final Path STATIC_FILE_1 = createPathFromCP("/test-append_1.parquet");
+ private static final Path STATIC_FILE_2 = createPathFromCP("/test-append_2.parquet");
+
+ private static Path createPathFromCP(String path) {
+ try {
+ return new Path(TestParquetWriterAppendBlocks.class.getResource(path).toURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public Path file1;
public List<Group> file1content = new ArrayList<Group>();
public Path file2;
@@ -134,6 +146,51 @@ public class TestParquetWriterAppendBlocks {
Assert.assertEquals("All records should be present", 0, expected.size());
}
+ /**
+ * This test is similar to {@link #testBasicBehavior()} only that it uses static files generated by a previous release
+ * (1.11.1). This test is to validate the fix of PARQUET-2027.
+ */
+ @Test
+ public void testBasicBehaviorWithStaticFiles() throws IOException {
+ List<Group> expected = new ArrayList<>();
+ readAll(STATIC_FILE_1, expected);
+ readAll(STATIC_FILE_2, expected);
+
+ Path combinedFile = newTemp();
+ ParquetFileWriter writer = new ParquetFileWriter(
+ CONF, FILE_SCHEMA, combinedFile);
+ writer.start();
+ writer.appendFile(CONF, STATIC_FILE_1);
+ writer.appendFile(CONF, STATIC_FILE_2);
+ writer.end(EMPTY_METADATA);
+
+ try (ParquetReader<Group> reader = ParquetReader
+ .builder(new GroupReadSupport(), combinedFile)
+ .build()) {
+
+ for (Group expectedNext : expected) {
+ Group next = reader.read();
+ // check each value; equals is not supported for simple records
+ Assert.assertEquals("Each id should match",
+ expectedNext.getInteger("id", 0), next.getInteger("id", 0));
+ Assert.assertEquals("Each string should match",
+ expectedNext.getString("string", 0), next.getString("string", 0));
+ }
+ Assert.assertNull("No extra records should be present", reader.read());
+ }
+
+ }
+
+ private void readAll(Path file, List<Group> values) throws IOException {
+ try (ParquetReader<Group> reader = ParquetReader
+ .builder(new GroupReadSupport(), file)
+ .build()) {
+ for (Group g = reader.read(); g != null; g = reader.read()) {
+ values.add(g);
+ }
+ }
+ }
+
@Test
public void testMergedMetadata() throws IOException {
Path combinedFile = newTemp();
diff --git a/parquet-hadoop/src/test/resources/test-append_1.parquet b/parquet-hadoop/src/test/resources/test-append_1.parquet
new file mode 100644
index 0000000..a255f86
Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-append_1.parquet differ
diff --git a/parquet-hadoop/src/test/resources/test-append_2.parquet b/parquet-hadoop/src/test/resources/test-append_2.parquet
new file mode 100644
index 0000000..3081f89
Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-append_2.parquet differ