Posted to commits@parquet.apache.org by sh...@apache.org on 2021/09/09 18:17:09 UTC

[parquet-mr] branch master updated: PARQUET-2078: Failed to read parquet file after writing with the same … (#925)

This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f40350  PARQUET-2078: Failed to read parquet file after writing with the same … (#925)
5f40350 is described below

commit 5f403501e9de05b6aa48f028191b4e78bb97fb12
Author: loudongfeng <ne...@qq.com>
AuthorDate: Fri Sep 10 02:17:04 2021 +0800

    PARQUET-2078: Failed to read parquet file after writing with the same … (#925)
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    Read path fix that makes use of this invariant (a hedged read-path sketch follows this commit message):
    RowGroup[n].file_offset = RowGroup[n-1].file_offset + RowGroup[n-1].total_compressed_size
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    addressing review comments: more checks on the writer side.
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    taking alignment padding and the summary file into account
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    only throw an exception when: 1. the footer (first column of the block metadata) is encrypted and 2. file_offset is corrupted
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    only check firstColumnChunk.isSetMeta_data() for the first block
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    address review comments: empty lines
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    check the first row group's file_offset too (SPARK-36696)
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    Use Preconditions.checkState instead of assert in the write path (a hedged writer-side sketch also follows this commit message)
    remove the summary-file footers check in the read path (which will never happen)
    
    * PARQUET-2078 Failed to read parquet file after writing with the same parquet version
    
    more special-case handling for the first row group
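
For illustration, here is a minimal standalone sketch of the read-path recovery described above.
The names used here (RowGroupInfo, recoverStartOffsets, FIRST_ROW_GROUP_OFFSET) are hypothetical
and are not the actual parquet-mr API; the real logic lives in ParquetMetadataConverter.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FileOffsetRecoverySketch {

  /** Hypothetical stand-in for the two footer fields the fix relies on. */
  static final class RowGroupInfo {
    final long fileOffset;            // RowGroup.file_offset as written in the footer
    final long totalCompressedSize;   // RowGroup.total_compressed_size

    RowGroupInfo(long fileOffset, long totalCompressedSize) {
      this.fileOffset = fileOffset;
      this.totalCompressedSize = totalCompressedSize;
    }
  }

  // A Parquet file starts with the 4-byte magic "PAR1", so the first row group
  // begins at offset 4 (assuming no leading padding).
  static final long FIRST_ROW_GROUP_OFFSET = 4;

  /**
   * Re-derives usable start offsets when file_offset values written by an affected
   * writer are corrupted. The invariant from the commit message is
   * RowGroup[n].file_offset >= RowGroup[n-1].file_offset + RowGroup[n-1].total_compressed_size,
   * with ">=" rather than "=" because alignment padding may sit between row groups.
   */
  static List<Long> recoverStartOffsets(List<RowGroupInfo> rowGroups) {
    List<Long> starts = new ArrayList<>();
    long prevStart = 0;
    long prevCompressedSize = 0;
    for (RowGroupInfo rg : rowGroups) {
      long start = rg.fileOffset;
      boolean invalid = (prevStart == 0)
          ? start != FIRST_ROW_GROUP_OFFSET            // first row group must start right after the magic
          : start < prevStart + prevCompressedSize;    // later ones must not overlap the previous one
      if (invalid) {
        // Fall back to the lower bound implied by the previous row group. This is
        // imprecise when padding was inserted, but good enough for range filtering;
        // a stricter reader can throw instead, as the patch does in filterFileMetaDataByStart.
        start = (prevStart == 0) ? FIRST_ROW_GROUP_OFFSET : prevStart + prevCompressedSize;
      }
      starts.add(start);
      prevStart = start;
      prevCompressedSize = rg.totalCompressedSize;
    }
    return starts;
  }

  public static void main(String[] args) {
    // The second row group carries a corrupted file_offset that overlaps the first one.
    List<RowGroupInfo> groups = Arrays.asList(
        new RowGroupInfo(4, 1000),
        new RowGroupInfo(500, 800));
    System.out.println(recoverStartOffsets(groups)); // prints [4, 1004]
  }
}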
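
A similarly hedged sketch of the writer-side guard mentioned in the commit message: a state
check that fails loudly whether or not JVM assertions (-ea) are enabled, which is the point of
preferring Preconditions.checkState over assert. The class and method names below are made up
for illustration and do not correspond to parquet-mr signatures.

final class BlockStartValidatorSketch {
  private long prevStart = 0;
  private long prevCompressedSize = 0;

  void onBlockStart(long blockStart, long blockCompressedSize) {
    // Equivalent in spirit to the Preconditions.checkState(...) call in the patch:
    // always active, unlike `assert`, which is disabled by default in production JVMs.
    if (prevStart != 0 && blockStart < prevStart + prevCompressedSize) {
      throw new IllegalStateException("Invalid block starting position: " + blockStart);
    }
    prevStart = blockStart;
    prevCompressedSize = blockCompressedSize;
  }
}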
---
 .../parquet/io/InvalidFileOffsetException.java     | 29 +++++++
 .../format/converter/ParquetMetadataConverter.java | 94 ++++++++++++++++++++--
 .../apache/parquet/hadoop/ParquetFileWriter.java   |  1 +
 .../parquet/hadoop/TestParquetFileWriter.java      | 83 +++++++++++++++++++
 4 files changed, 200 insertions(+), 7 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java b/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java
new file mode 100644
index 0000000..9c0cbe3
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+public class InvalidFileOffsetException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public InvalidFileOffsetException(String message) {
+    super(message);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3a10b1c..3c6e32c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.statistics.BinaryStatistics;
@@ -119,10 +120,10 @@ import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.internal.hadoop.metadata.IndexReference;
+import org.apache.parquet.io.InvalidFileOffsetException;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
-import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
 import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
@@ -201,8 +202,22 @@ public class ParquetMetadataConverter {
     List<BlockMetaData> blocks = parquetMetadata.getBlocks();
     List<RowGroup> rowGroups = new ArrayList<RowGroup>();
     long numRows = 0;
+    long preBlockStartPos = 0;
+    long preBlockCompressedSize = 0;
     for (BlockMetaData block : blocks) {
       numRows += block.getRowCount();
+      long blockStartPos = block.getStartingPos();
+      // first block
+      if (blockStartPos == 4) {
+        preBlockStartPos = 0;
+        preBlockCompressedSize = 0;
+      }
+      if (preBlockStartPos != 0) {
+        Preconditions.checkState(blockStartPos >= preBlockStartPos + preBlockCompressedSize,
+          "Invalid block starting position:" + blockStartPos);
+      }
+      preBlockStartPos = blockStartPos;
+      preBlockCompressedSize = block.getCompressedSize();
       addRowGroup(parquetMetadata, rowGroups, block, fileEncryptor);
     }
     FileMetaData fileMetaData = new FileMetaData(
@@ -1226,14 +1241,36 @@ public class ParquetMetadataConverter {
   static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
     List<RowGroup> rowGroups = metaData.getRow_groups();
     List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
+    long preStartIndex = 0;
+    long preCompressedSize = 0;
+    boolean firstColumnWithMetadata = true;
+    if (rowGroups != null && rowGroups.size() > 0) {
+      firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data();
+    }
     for (RowGroup rowGroup : rowGroups) {
       long totalSize = 0;
       long startIndex;
+      ColumnChunk columnChunk = rowGroup.getColumns().get(0);
+      if (firstColumnWithMetadata) {
+        startIndex = getOffset(columnChunk);
+      } else {
+        assert rowGroup.isSetFile_offset();
+        assert rowGroup.isSetTotal_compressed_size();
 
-      if (rowGroup.isSetFile_offset()) {
+        //the file_offset of first block always holds the truth, while other blocks don't :
+        //see PARQUET-2078 for details
         startIndex = rowGroup.getFile_offset();
-      } else {
-        startIndex = getOffset(rowGroup.getColumns().get(0));
+        if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) {
+          //first row group's offset is always 4
+          if (preStartIndex == 0) {
+            startIndex = 4;
+          } else {
+            // use minStartIndex(imprecise in case of padding, but good enough for filtering)
+            startIndex = preStartIndex + preCompressedSize;
+          }
+        }
+        preStartIndex = startIndex;
+        preCompressedSize = rowGroup.getTotal_compressed_size();
       }
 
       if (rowGroup.isSetTotal_compressed_size()) {
@@ -1254,16 +1291,59 @@ public class ParquetMetadataConverter {
     return metaData;
   }
 
+  private static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) {
+    boolean invalid = false;
+    assert preStartIndex <= startIndex;
+    // checking the first rowGroup
+    if (preStartIndex == 0 && startIndex != 4) {
+      invalid = true;
+      return invalid;
+    }
+
+    //calculate start index for other blocks
+    long minStartIndex = preStartIndex + preCompressedSize;
+    if (startIndex < minStartIndex) {
+      // a bad offset detected, try first column's offset
+      // can not use minStartIndex in case of padding
+      invalid = true;
+    }
+
+    return invalid;
+  }
+
   // Visible for testing
   static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetadataFilter filter) {
     List<RowGroup> rowGroups = metaData.getRow_groups();
     List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
+    long preStartIndex = 0;
+    long preCompressedSize = 0;
+    boolean firstColumnWithMetadata = true;
+    if (rowGroups != null && rowGroups.size() > 0) {
+      firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data();
+    }
     for (RowGroup rowGroup : rowGroups) {
       long startIndex;
-      if (rowGroup.isSetFile_offset()) {
-        startIndex = rowGroup.getFile_offset();
+      ColumnChunk columnChunk = rowGroup.getColumns().get(0);
+      if (firstColumnWithMetadata) {
+        startIndex = getOffset(columnChunk);
       } else {
-        startIndex = getOffset(rowGroup.getColumns().get(0));
+        assert rowGroup.isSetFile_offset();
+        assert rowGroup.isSetTotal_compressed_size();
+
+        //the file_offset of first block always holds the truth, while other blocks don't :
+        //see PARQUET-2078 for details
+        startIndex = rowGroup.getFile_offset();
+        if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) {
+          //first row group's offset is always 4
+          if (preStartIndex == 0) {
+            startIndex = 4;
+          } else {
+            throw new InvalidFileOffsetException("corrupted RowGroup.file_offset found, " +
+              "please use file range instead of block offset for split.");
+          }
+        }
+        preStartIndex = startIndex;
+        preCompressedSize = rowGroup.getTotal_compressed_size();
       }
 
       if (filter.contains(startIndex)) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index afbdf76..2e5d55c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -852,6 +852,7 @@ public class ParquetFileWriter {
     this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
     this.uncompressedLength = 0;
     this.compressedLength = 0;
+    this.currentChunkDictionaryPageOffset = 0;
     columnIndexBuilder = null;
     offsetIndexBuilder = null;
   }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 5b8c5ed..8dcbf4a 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
@@ -30,6 +34,8 @@ import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -67,6 +73,7 @@ import java.util.concurrent.Callable;
 
 import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics;
 import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.hadoop.ParquetInputFormat.READ_SUPPORT_CLASS;
 import static org.junit.Assert.*;
 import static org.apache.parquet.column.Encoding.BIT_PACKED;
 import static org.apache.parquet.column.Encoding.PLAIN;
@@ -75,6 +82,7 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_S
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.Type.Repetition.*;
 import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroup;
@@ -240,6 +248,81 @@ public class TestParquetFileWriter {
   }
 
   @Test
+  public void testWriteReadWithRecordReader() throws Exception {
+    File testFile = temp.newFile();
+    testFile.delete();
+
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(C1, 5, CODEC);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 6, CODEC);
+    long c2Starts = w.getPos();
+    w.writeDictionaryPage(new DictionaryPage(BytesInput.from(BYTES2), 4, RLE_DICTIONARY));
+    long c2p1Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c2Ends = w.getPos();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(C1, 7, CODEC);
+    long c1Bock2Starts = w.getPos();
+    long c1p1Bock2Starts = w.getPos();
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c1Block2Ends = w.getPos();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
+    BlockMetaData rowGroup = readFooter.getBlocks().get(0);
+    assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize());
+
+    assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset());
+    assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos());
+    assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset());
+    assertEquals(c2p1Starts, rowGroup.getColumns().get(1).getFirstDataPageOffset());
+
+    BlockMetaData rowGroup2 = readFooter.getBlocks().get(1);
+    assertEquals(0, rowGroup2.getColumns().get(0).getDictionaryPageOffset());
+    assertEquals(c1Bock2Starts, rowGroup2.getColumns().get(0).getStartingPos());
+    assertEquals(c1p1Bock2Starts, rowGroup2.getColumns().get(0).getFirstDataPageOffset());
+    assertEquals(c1Block2Ends - c1Bock2Starts, rowGroup2.getColumns().get(0).getTotalSize());
+
+    HashSet<Encoding> expectedEncoding=new HashSet<Encoding>();
+    expectedEncoding.add(PLAIN);
+    expectedEncoding.add(BIT_PACKED);
+    assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings());
+
+    ParquetInputSplit split = new ParquetInputSplit(path, 0, w.getPos(),null,
+      readFooter.getBlocks(), SCHEMA.toString(),
+      readFooter.getFileMetaData().getSchema().toString(),
+      readFooter.getFileMetaData().getKeyValueMetaData(),
+      null);
+    ParquetInputFormat input = new ParquetInputFormat();
+    configuration.set(READ_SUPPORT_CLASS, GroupReadSupport.class.getName());
+    TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(configuration, taskAttemptID);
+    RecordReader<Void, ArrayWritable> reader = input.createRecordReader(split, taskContext);
+    assertTrue(reader instanceof ParquetRecordReader);
+    //RowGroup.file_offset is checked here
+    reader.initialize(split, taskContext);
+    reader.close();
+  }
+
+  @Test
   public void testWriteEmptyBlock() throws Exception {
     File testFile = temp.newFile();
     testFile.delete();