You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by sh...@apache.org on 2022/03/20 00:00:30 UTC

[parquet-mr] branch master updated: PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader (#945)

This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new c7bff51  PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader (#945)
c7bff51 is described below

commit c7bff519094920a8609df6cbd98821a43ed779e3
Author: Prakhar Jain <pr...@gmail.com>
AuthorDate: Sat Mar 19 17:00:24 2022 -0700

    PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader (#945)
    
    * PARQUET-2117: Changes to generate row index in InternalParquetRecordReader, also expose the row index via ParquetReader or ParquetRecordReader
    
     - Add and populate rowIndexOffset field in BlockMetaData
     - Changes to generate row index in InternalParquetRecordReader, also expose the row index via ParquetReader or ParquetRecordReader
     - Add new unit tests and extend all the ColumnIndexFiltering and BloomFiltering unit tests to validate row indexes also.
    
    * address review comments
    
    * add test based on old parquet file without column indexes
    
    * address review comments - Return -1 when row index info not available, document the same, Return -1 when rowIndexOffset info not available in BlockMetadata
    
    * address review comments - Fix java doc style
    
    * address review comments from ggershinsky - early return and reduce indentation
    
    * fix build
---
 .../apache/parquet/column/page/PageReadStore.java  |   8 +
 .../format/converter/ParquetMetadataConverter.java |  63 +++++--
 .../parquet/hadoop/ColumnChunkPageReadStore.java   |  18 +-
 .../hadoop/InternalParquetRecordReader.java        |  55 ++++++-
 .../apache/parquet/hadoop/ParquetFileReader.java   |   4 +-
 .../org/apache/parquet/hadoop/ParquetReader.java   |  10 ++
 .../apache/parquet/hadoop/ParquetRecordReader.java |   7 +
 .../parquet/hadoop/metadata/BlockMetaData.java     |  19 ++-
 .../filter2/recordlevel/PhoneBookWriter.java       |  19 ++-
 .../apache/parquet/hadoop/TestBloomFiltering.java  |   2 +-
 .../parquet/hadoop/TestColumnIndexFiltering.java   |   4 +-
 .../apache/parquet/hadoop/TestParquetReader.java   | 181 +++++++++++++++++++++
 .../test-file-with-no-column-indexes-1.parquet     | Bin 0 -> 35855 bytes
 13 files changed, 370 insertions(+), 20 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
index 753bda8..796cf17 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@@ -44,6 +44,14 @@ public interface PageReadStore {
   long getRowCount();
 
   /**
+   * @return the optional of the long representing the row index offset of this row group or an empty optional if the
+   *         related data is not available
+   */
+  default Optional<Long> getRowIndexOffset() {
+    return Optional.empty();
+  }
+
+  /**
    * Returns the indexes of the rows to be read/built if the related data is available. All the rows which index is not
    * returned shall be skipped.
    *
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 96980a4..0ea75f3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1400,6 +1400,31 @@ public class ParquetMetadataConverter {
     return readParquetMetadata(from, filter, null, false, 0);
   }
 
+  private Map<RowGroup, Long> generateRowGroupOffsets(FileMetaData metaData) {
+    Map<RowGroup, Long> rowGroupOrdinalToRowIdx = new HashMap<>();
+    List<RowGroup> rowGroups = metaData.getRow_groups();
+    if (rowGroups != null) {
+      long rowIdxSum = 0;
+      for (int i = 0; i < rowGroups.size(); i++) {
+        rowGroupOrdinalToRowIdx.put(rowGroups.get(i), rowIdxSum);
+        rowIdxSum += rowGroups.get(i).getNum_rows();
+      }
+    }
+    return rowGroupOrdinalToRowIdx;
+  }
+
+  /**
+   * A container for a {@link FileMetaData} and its {@link RowGroup} to row index offset map.
+   */
+  private class FileMetaDataAndRowGroupOffsetInfo {
+    final FileMetaData fileMetadata;
+    final Map<RowGroup, Long> rowGroupToRowIndexOffsetMap;
+    public FileMetaDataAndRowGroupOffsetInfo(FileMetaData fileMetadata, Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) {
+      this.fileMetadata = fileMetadata;
+      this.rowGroupToRowIndexOffsetMap = rowGroupToRowIndexOffsetMap;
+    }
+  }
+
   public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
       final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter,
       final int combinedFooterLength) throws IOException {
@@ -1407,27 +1432,35 @@ public class ParquetMetadataConverter {
     final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null);
     final byte[] encryptedFooterAAD = (encryptedFooter? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
 
-    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+    FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
       @Override
-      public FileMetaData visit(NoFilter filter) throws IOException {
-        return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+      public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
       }
 
       @Override
-      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-        return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+      public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
       }
 
       @Override
-      public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+      public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
+        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
       }
 
       @Override
-      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByMidpoint(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+      public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
+        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
       }
     });
+    FileMetaData fileMetaData = fileMetaDataAndRowGroupInfo.fileMetadata;
+    Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = fileMetaDataAndRowGroupInfo.rowGroupToRowIndexOffsetMap;
     LOG.debug("{}", fileMetaData);
 
     if (!encryptedFooter && null != fileDecryptor) {
@@ -1447,7 +1480,7 @@ public class ParquetMetadataConverter {
       }
     }
 
-    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter);
+    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter, rowGroupToRowIndexOffsetMap);
     if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
     return parquetMetadata;
   }
@@ -1476,6 +1509,13 @@ public class ParquetMetadataConverter {
 
   public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
       InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException {
+    return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, new HashMap<RowGroup, Long>());
+  }
+
+  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
+                                             InternalFileDecryptor fileDecryptor,
+                                             boolean encryptedFooter,
+                                             Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) throws IOException {
     MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
     List<RowGroup> row_groups = parquetMetadata.getRow_groups();
@@ -1485,6 +1525,9 @@ public class ParquetMetadataConverter {
         BlockMetaData blockMetaData = new BlockMetaData();
         blockMetaData.setRowCount(rowGroup.getNum_rows());
         blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+        if (rowGroupToRowIndexOffsetMap.containsKey(rowGroup)) {
+          blockMetaData.setRowIndexOffset(rowGroupToRowIndexOffsetMap.get(rowGroup));
+        }
         // not set in legacy files
         if (rowGroup.isSetOrdinal()) {
           blockMetaData.setOrdinal(rowGroup.getOrdinal());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 3d1bafe..85ba98c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -248,15 +248,26 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
 
   private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
   private final long rowCount;
+  private final long rowIndexOffset;
   private final RowRanges rowRanges;
 
   public ColumnChunkPageReadStore(long rowCount) {
+    this(rowCount, -1);
+  }
+
+  ColumnChunkPageReadStore(RowRanges rowRanges) {
+    this(rowRanges, -1);
+  }
+
+  ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) {
     this.rowCount = rowCount;
+    this.rowIndexOffset = rowIndexOffset;
     rowRanges = null;
   }
 
-  ColumnChunkPageReadStore(RowRanges rowRanges) {
+  ColumnChunkPageReadStore(RowRanges rowRanges, long rowIndexOffset) {
     this.rowRanges = rowRanges;
+    this.rowIndexOffset = rowIndexOffset;
     rowCount = rowRanges.rowCount();
   }
 
@@ -266,6 +277,11 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
   }
 
   @Override
+  public Optional<Long> getRowIndexOffset() {
+    return rowIndexOffset < 0 ? Optional.empty() : Optional.of(rowIndexOffset);
+  }
+
+  @Override
   public PageReader getPageReader(ColumnDescriptor path) {
     final PageReader pageReader = readers.get(path);
     if (pageReader == null) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 8ffe19f..8203e90 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
+import java.util.PrimitiveIterator;
 import java.util.Set;
+import java.util.stream.LongStream;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -69,6 +71,8 @@ class InternalParquetRecordReader<T> {
   private long current = 0;
   private int currentBlock = -1;
   private ParquetFileReader reader;
+  private long currentRowIdx = -1;
+  private PrimitiveIterator.OfLong rowIdxInFileItr;
   private org.apache.parquet.io.RecordReader<T> recordReader;
   private boolean strictTypeChecking;
 
@@ -127,6 +131,7 @@ class InternalParquetRecordReader<T> {
       if (pages == null) {
         throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
       }
+      resetRowIndexIterator(pages);
       long timeSpentReading = System.currentTimeMillis() - t0;
       totalTimeSpentReadingBytes += timeSpentReading;
       BenchmarkCounter.incrementTime(timeSpentReading);
@@ -227,6 +232,11 @@ class InternalParquetRecordReader<T> {
 
         try {
           currentValue = recordReader.read();
+          if (rowIdxInFileItr != null && rowIdxInFileItr.hasNext()) {
+            currentRowIdx = rowIdxInFileItr.next();
+          } else {
+            currentRowIdx = -1;
+          }
         } catch (RecordMaterializationException e) {
           // this might throw, but it's fatal if it does.
           unmaterializableRecordCounter.incErrors(e);
@@ -265,4 +275,47 @@ class InternalParquetRecordReader<T> {
     return Collections.unmodifiableMap(setMultiMap);
   }
 
+  /**
+   * Returns the row index of the current row. If no row has been processed or if the
+   * row index information is unavailable from the underlying {@link PageReadStore}, returns -1.
+   */
+  public long getCurrentRowIndex() {
+    if (current == 0L || rowIdxInFileItr == null) {
+      return -1;
+    }
+    return currentRowIdx;
+  }
+
+  /**
+   * Resets the row index iterator based on the current processed row group.
+   */
+  private void resetRowIndexIterator(PageReadStore pages) {
+    Optional<Long> rowGroupRowIdxOffset = pages.getRowIndexOffset();
+    if (!rowGroupRowIdxOffset.isPresent()) {
+      this.rowIdxInFileItr = null;
+      return;
+    }
+
+    currentRowIdx = -1;
+    final PrimitiveIterator.OfLong rowIdxInRowGroupItr;
+    if (pages.getRowIndexes().isPresent()) {
+      rowIdxInRowGroupItr = pages.getRowIndexes().get();
+    } else {
+      rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator();
+    }
+    // Wrap `rowIdxInRowGroupItr` so that every index it yields is shifted by the row group's offset in the file.
+    this.rowIdxInFileItr = new PrimitiveIterator.OfLong() {
+      public long nextLong() {
+        return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.nextLong();
+      }
+
+      public boolean hasNext() {
+        return rowIdxInRowGroupItr.hasNext();
+      }
+
+      public Long next() {
+        return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.next();
+      }
+    };
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 63a22d1..97fe86d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -929,7 +929,7 @@ public class ParquetFileReader implements Closeable {
     if (block.getRowCount() == 0) {
       throw new RuntimeException("Illegal row group of 0 rows");
     }
-    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount());
+    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
     // prepare the list of consecutive parts to read them in one scan
     List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
     ConsecutivePartList currentParts = null;
@@ -1044,7 +1044,7 @@ public class ParquetFileReader implements Closeable {
   }
 
   private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException {
-    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges);
+    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges, block.getRowIndexOffset());
     // prepare the list of consecutive parts to read them in one scan
     ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
     List<ConsecutivePartList> allParts = new ArrayList<>();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index c215f5e..6d76723 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -140,6 +140,16 @@ public class ParquetReader<T> implements Closeable {
     }
   }
 
+  /**
+   * @return the row index of the last read row, or -1 if no row has been processed or the row index is unavailable.
+   */
+  public long getCurrentRowIndex() {
+    if (reader == null) {
+      return -1;
+    }
+    return reader.getCurrentRowIndex();
+  }
+
   private void initReader() throws IOException {
     if (reader != null) {
       reader.close();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 4653410..e46ccdd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -207,6 +207,13 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
     return internalReader.nextKeyValue();
   }
 
+  /**
+   * @return the row index of the current row, or -1 if no row has been processed or the row index is unavailable.
+   */
+  public long getCurrentRowIndex() throws IOException {
+    return internalReader.getCurrentRowIndex();
+  }
+
   private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
     if (split instanceof ParquetInputSplit) {
       return (ParquetInputSplit) split;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
index ce204dc..4f9fd14 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -33,6 +33,7 @@ public class BlockMetaData {
   private long totalByteSize;
   private String path;
   private int ordinal;
+  private long rowIndexOffset = -1;
 
   public BlockMetaData() {
   }
@@ -66,6 +67,18 @@ public class BlockMetaData {
   }
 
   /**
+   * @return -1 if the rowIndexOffset for the {@link BlockMetaData} is unavailable else returns the actual rowIndexOffset
+   */
+  public long getRowIndexOffset() { return rowIndexOffset; }
+
+  /**
+   * @param rowIndexOffset the rowIndexOffset to set
+   */
+  public void setRowIndexOffset(long rowIndexOffset) {
+    this.rowIndexOffset = rowIndexOffset;
+  }
+
+  /**
    * @return the totalByteSize
    */
   public long getTotalByteSize() {
@@ -105,7 +118,11 @@ public class BlockMetaData {
   
   @Override
   public String toString() {
-    return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
+    String rowIndexOffsetStr = "";
+    if (rowIndexOffset != -1) {
+      rowIndexOffsetStr = ", rowIndexOffset = " + rowIndexOffset;
+    }
+    return "BlockMetaData{" + rowCount + ", " + totalByteSize + rowIndexOffsetStr + " " + columns + "}";
   }
 
   /**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 6355f35..1e74353 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.filter2.recordlevel;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -315,7 +317,7 @@ public class PhoneBookWriter {
     }
   }
 
-  private static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
+  public static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
     Configuration conf = new Configuration();
     GroupWriteSupport.setSchema(schema, conf);
 
@@ -341,11 +343,24 @@ public class PhoneBookWriter {
   }
 
   public static List<User> readUsers(ParquetReader.Builder<Group> builder) throws IOException {
+    return readUsers(builder, false);
+  }
+
+  /**
+   * Returns a list of users from the underlying {@link ParquetReader} builder.
+   * If {@code validateRowIndexes} is set to true, this method will also validate the row indexes of the
+   * rows read from the ParquetReader - the row index of a row should be the same as the underlying user id.
+   */
+  public static List<User> readUsers(ParquetReader.Builder<Group> builder, boolean validateRowIndexes) throws IOException {
     ParquetReader<Group> reader = builder.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()).build();
 
     List<User> users = new ArrayList<>();
     for (Group group = reader.read(); group != null; group = reader.read()) {
-      users.add(userFromGroup(group));
+      User u = userFromGroup(group);
+      users.add(u);
+      if (validateRowIndexes) {
+        assertEquals(reader.getCurrentRowIndex(), u.id);
+      }
     }
     return users;
   }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index b07fccd..68a4e34 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -200,7 +200,7 @@ public class TestBloomFiltering {
       .useStatsFilter(useOtherFiltering)
       .useRecordFilter(useOtherFiltering)
       .useBloomFilter(useBloomFilter)
-      .useColumnIndexFilter(useOtherFiltering));
+      .useColumnIndexFilter(useOtherFiltering), true);
   }
 
   // Assumes that both lists are in the same order
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index 5e18105..0678cbf 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -248,7 +248,7 @@ public class TestColumnIndexFiltering {
         .useDictionaryFilter(useOtherFiltering)
         .useStatsFilter(useOtherFiltering)
         .useRecordFilter(useOtherFiltering)
-        .useColumnIndexFilter(useColumnIndexFilter));
+        .useColumnIndexFilter(useColumnIndexFilter), true);
   }
 
   private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering,
@@ -261,7 +261,7 @@ public class TestColumnIndexFiltering {
         .useStatsFilter(useOtherFiltering)
         .useRecordFilter(useOtherFiltering)
         .useColumnIndexFilter(useColumnIndexFilter)
-        .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
+        .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()), true);
   }
 
   private FileDecryptionProperties getFileDecryptionProperties() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
new file mode 100644
index 0000000..86f14a8
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestParquetReader {
+
+  private static final Path FILE_V1 = createTempFile();
+  private static final Path FILE_V2 = createTempFile();
+  private static final Path STATIC_FILE_WITHOUT_COL_INDEXES = createPathFromCP("/test-file-with-no-column-indexes-1.parquet");
+  private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(makeUsers(1000));
+
+  private final Path file;
+
+  private static Path createPathFromCP(String path) {
+    try {
+      return new Path(TestParquetReader.class.getResource(path).toURI());
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public TestParquetReader(Path file) {
+    this.file = file;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] {
+      { FILE_V1 },
+      { FILE_V2 },
+      { STATIC_FILE_WITHOUT_COL_INDEXES } };
+    return Arrays.asList(data);
+  }
+
+  @BeforeClass
+  public static void createFiles() throws IOException {
+    writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0);
+    writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
+  @AfterClass
+  public static void deleteFiles() throws IOException {
+    deleteFile(FILE_V1);
+    deleteFile(FILE_V2);
+  }
+
+  private static void deleteFile(Path file) throws IOException {
+    file.getFileSystem(new Configuration()).delete(file, false);
+  }
+
+  public static List<PhoneBookWriter.User> makeUsers(int rowCount) {
+    List<PhoneBookWriter.User> users = new ArrayList<>();
+    for (int i = 0; i < rowCount; i++) {
+      PhoneBookWriter.Location location = null;
+      if (i % 3 == 1) {
+        location = new PhoneBookWriter.Location((double)i, (double) i * 2);
+      }
+      if (i % 3 == 2) {
+        location = new PhoneBookWriter.Location((double)i, null);
+      }
+      // row index of each row in the file is same as the user id.
+      users.add(new PhoneBookWriter.User(i, "p" + i, Arrays.asList(new PhoneBookWriter.PhoneNumber(i, "cell")), location));
+    }
+    return users;
+  }
+
+  private static Path createTempFile() {
+    try {
+      return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVersion parquetVersion) throws IOException {
+    int pageSize = DATA.size() / 10;     // Ensure that several pages will be created
+    int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
+
+    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withWriterVersion(parquetVersion),
+      DATA);
+  }
+
+  private List<PhoneBookWriter.User> readUsers(FilterCompat.Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+    throws IOException {
+    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+      .withFilter(filter)
+      .useDictionaryFilter(useOtherFiltering)
+      .useStatsFilter(useOtherFiltering)
+      .useRecordFilter(useOtherFiltering)
+      .useColumnIndexFilter(useColumnIndexFilter), true);
+  }
+
+  @Test
+  public void testCurrentRowIndex() throws Exception {
+    ParquetReader<Group> reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP);
+    // Fetch row index without processing any row.
+    assertEquals(reader.getCurrentRowIndex(), -1);
+    reader.read();
+    assertEquals(reader.getCurrentRowIndex(), 0);
+    // calling the same API again and again should return same result.
+    assertEquals(reader.getCurrentRowIndex(), 0);
+
+    reader.read();
+    assertEquals(reader.getCurrentRowIndex(), 1);
+    assertEquals(reader.getCurrentRowIndex(), 1);
+    long expectedCurrentRowIndex = 2L;
+    while(reader.read() != null) {
+      assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
+      expectedCurrentRowIndex++;
+    }
+    // reader.read() returned null and so reader doesn't have any more rows.
+    assertEquals(reader.getCurrentRowIndex(), -1);
+  }
+
+  @Test
+  public void testSimpleFiltering() throws Exception {
+    Set<Long> idSet = new HashSet<>();
+    idSet.add(123l);
+    idSet.add(567l);
+    // The readUsers also validates the rowIndex for each returned row.
+    List<PhoneBookWriter.User> filteredUsers1 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true);
+    assertEquals(filteredUsers1.size(), 2L);
+    List<PhoneBookWriter.User> filteredUsers2 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false);
+    assertEquals(filteredUsers2.size(), 2L);
+    List<PhoneBookWriter.User> filteredUsers3 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false);
+    assertEquals(filteredUsers3.size(), 1000L);
+  }
+
+  @Test
+  public void testNoFiltering() throws Exception {
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, false, false));
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, true, false));
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, false, true));
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, true, true));
+  }
+}
diff --git a/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet
new file mode 100644
index 0000000..722e687
Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet differ