Posted to commits@parquet.apache.org by ge...@apache.org on 2022/06/29 19:21:52 UTC

[parquet-mr] branch master updated: PARQUET-2161: Fix row index generation in combination with range filtering (#978)

This is an automated email from the ASF dual-hosted git repository.

gershinsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 5290bd5e0 PARQUET-2161: Fix row index generation in combination with range filtering (#978)
5290bd5e0 is described below

commit 5290bd5e0ee5dc30db0576e2bfc6eea335c465cf
Author: Ala Luszczak <al...@databricks.com>
AuthorDate: Wed Jun 29 21:21:45 2022 +0200

    PARQUET-2161: Fix row index generation in combination with range filtering (#978)
    
    * PARQUET-2161: Fix row index generation
    
    The row indexes introduced in PARQUET-2117 are not computed correctly
    when:
    (1) a range or offset metadata filter is applied, and
    (2) the first row group is eliminated by the filter.
    
    For example, if a file has two row groups with 10 rows each, and we
    attempt to read only the 2nd row group, we produce row indexes
    0, 1, 2, ..., 9 instead of the expected 10, 11, ..., 19.
    
    This happens because the functions `filterFileMetaDataByStart`
    and `filterFileMetaDataByMidpoint` modify their input `FileMetaData`
    in place. To return the correct result, `generateRowGroupOffsets` has
    to be computed before these filters are applied.
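    
    Conceptually, the offsets map assigns each row group the total row
    count of all row groups that precede it in the *unfiltered* footer.
    A minimal sketch of that computation (assuming the Thrift-generated
    accessors `getRow_groups()` and `getNum_rows()`; not the literal
    implementation in this patch):
    
        import java.util.HashMap;
        import java.util.Map;
        import org.apache.parquet.format.FileMetaData;
        import org.apache.parquet.format.RowGroup;
    
        static Map<RowGroup, Long> rowGroupOffsetsSketch(FileMetaData fileMetadata) {
          Map<RowGroup, Long> offsets = new HashMap<>();
          long rowIndexOffset = 0;
          for (RowGroup rowGroup : fileMetadata.getRow_groups()) {
            // Each row group starts at the cumulative row count of its
            // predecessors; filtering first would drop predecessors and
            // shift every surviving row group back to offset 0.
            offsets.put(rowGroup, rowIndexOffset);
            rowIndexOffset += rowGroup.getNum_rows();
          }
          return offsets;
        }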
    
    * Adjust assert message
---
 .../format/converter/ParquetMetadataConverter.java  |  8 ++++++--
 .../filter2/recordlevel/PhoneBookWriter.java        |  2 +-
 .../apache/parquet/hadoop/TestParquetReader.java    | 21 +++++++++++++++++++--
 3 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 77856eea5..4409d5f68 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1450,15 +1450,19 @@ public class ParquetMetadataConverter {
       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
         FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        // We must generate the map *before* filtering because it modifies `fileMetadata`.
+        Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
         FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
-        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
+        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);
       }
 
       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
         FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        // We must generate the map *before* filtering because it modifies `fileMetadata`.
+        Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
         FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
-        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
+        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);
       }
     });
     FileMetaData fileMetaData = fileMetaDataAndRowGroupInfo.fileMetadata;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 1e74353e2..f68dce425 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -359,7 +359,7 @@ public class PhoneBookWriter {
       User u = userFromGroup(group);
       users.add(u);
       if (validateRowIndexes) {
-        assertEquals(reader.getCurrentRowIndex(), u.id);
+        assertEquals("Row index should be equal to User id", u.id, reader.getCurrentRowIndex());
       }
     }
     return users;
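
The assert change above also swaps the argument order: the JUnit 4
overload is assertEquals(String message, long expected, long actual),
so passing the expected value (u.id) second makes a failure report the
expected and actual values the right way around. For illustration, the
corrected call from the diff:

    // JUnit 4: assertEquals(message, expected, actual)
    assertEquals("Row index should be equal to User id", u.id, reader.getCurrentRowIndex());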
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
index 86f14a862..2cb1f5452 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
@@ -51,6 +51,7 @@ public class TestParquetReader {
   private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(makeUsers(1000));
 
   private final Path file;
+  private final long fileSize;
 
   private static Path createPathFromCP(String path) {
     try {
@@ -60,8 +61,9 @@ public class TestParquetReader {
     }
   }
 
-  public TestParquetReader(Path file) {
+  public TestParquetReader(Path file) throws IOException {
     this.file = file;
+    this.fileSize = file.getFileSystem(new Configuration()).getFileStatus(file).getLen();
   }
 
   @Parameterized.Parameters
@@ -126,13 +128,19 @@ public class TestParquetReader {
   }
 
   private List<PhoneBookWriter.User> readUsers(FilterCompat.Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+    throws IOException {
+    return readUsers(filter, useOtherFiltering, useColumnIndexFilter, 0, this.fileSize);
+  }
+
+  private List<PhoneBookWriter.User> readUsers(FilterCompat.Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter, long rangeStart, long rangeEnd)
     throws IOException {
     return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
       .withFilter(filter)
       .useDictionaryFilter(useOtherFiltering)
       .useStatsFilter(useOtherFiltering)
       .useRecordFilter(useOtherFiltering)
-      .useColumnIndexFilter(useColumnIndexFilter), true);
+      .useColumnIndexFilter(useColumnIndexFilter)
+      .withFileRange(rangeStart, rangeEnd), true);
   }
 
   @Test
@@ -157,6 +165,15 @@ public class TestParquetReader {
     assertEquals(reader.getCurrentRowIndex(), -1);
   }
 
+  @Test
+  public void testRangeFiltering() throws Exception {
+    // The readUsers also validates the rowIndex for each returned row.
+    readUsers(FilterCompat.NOOP, false, false, this.fileSize / 2, this.fileSize);
+    readUsers(FilterCompat.NOOP, true, false, this.fileSize / 3, this.fileSize * 3 / 4);
+    readUsers(FilterCompat.NOOP, false, true, this.fileSize / 4, this.fileSize / 2);
+    readUsers(FilterCompat.NOOP, true, true, this.fileSize * 3 / 4, this.fileSize);
+  }
+
   @Test
   public void testSimpleFiltering() throws Exception {
     Set<Long> idSet = new HashSet<>();
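
For reference, the new testRangeFiltering exercises the reader's
file-range API. A minimal standalone usage sketch (the path and byte
range are caller-supplied placeholders; withFileRange limits reading to
the row groups whose midpoints fall inside the given byte range, and
getCurrentRowIndex() must still report the absolute row index within
the file, which is what this fix guarantees):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    static void readRangeSketch(Path file, long rangeStart, long rangeEnd) throws IOException {
      try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
          .withFilter(FilterCompat.NOOP)
          .withFileRange(rangeStart, rangeEnd)
          .build()) {
        for (Group group = reader.read(); group != null; group = reader.read()) {
          // After PARQUET-2161, this is the row's position in the whole
          // file, even when earlier row groups were skipped by the range.
          long rowIndex = reader.getCurrentRowIndex();
        }
      }
    }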