Posted to commits@parquet.apache.org by ga...@apache.org on 2021/04/19 07:53:22 UTC

[parquet-mr] branch master updated: PARQUET-1982: Random access to row groups in ParquetFileReader (#871)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f54ba0  PARQUET-1982: Random access to row groups in ParquetFileReader (#871)
3f54ba0 is described below

commit 3f54ba09c36fec835d91d0ba1abfc4fd6e7fef3f
Author: fschmalzel <79...@users.noreply.github.com>
AuthorDate: Mon Apr 19 09:53:11 2021 +0200

    PARQUET-1982: Random access to row groups in ParquetFileReader (#871)
    
    Adds a readRowGroup method to ParquetFileReader that allows random
    access to the PageReadStore of any of the row groups returned by
    the getRowGroups() method.

    This is similar to the existing getDictionaryReader(BlockMetaData)
    method.
    
    With random access the reader can be reused if, for example, a caller
    needs to go back to a previous row group. This improves performance
    because the file does not have to be reopened and its metadata re-read.
    
    Also adds a test for filtered random access that reads all pages of a
    row group and checks all columns of each page.
---
 .../apache/parquet/hadoop/ParquetFileReader.java   | 157 +++++++--
 .../hadoop/TestParquetReaderRandomAccess.java      | 387 +++++++++++++++++++++
 .../parquet/statistics/DataGenerationContext.java  |  85 +++++
 .../apache/parquet/statistics/RandomValues.java    |   4 +-
 .../apache/parquet/statistics/TestStatistics.java  |  56 ---
 5 files changed, 595 insertions(+), 94 deletions(-)

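Before the full diff, a minimal usage sketch of the new random-access API (not part of the commit): the file path and the surrounding class are placeholders, while ParquetFileReader, getRowGroups() and the new readRowGroup(int) are the pieces this change exercises.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class RowGroupRandomAccessSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ParquetReadOptions options = ParquetReadOptions.builder().build();
        // "/tmp/data.parquet" is a placeholder; assumes the file has at least one row group.
        try (ParquetFileReader reader = new ParquetFileReader(
            HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), conf), options)) {
          int rowGroupCount = reader.getRowGroups().size();
          // Jump straight to the last row group ...
          PageReadStore last = reader.readRowGroup(rowGroupCount - 1);
          // ... then go back to the first one without reopening the file.
          PageReadStore first = reader.readRowGroup(0);
          // An out-of-range index returns null instead of throwing.
          PageReadStore none = reader.readRowGroup(rowGroupCount);
          System.out.println(first.getRowCount() + " rows in the first row group; "
              + (none == null ? "out-of-range index returned null" : "unexpected"));
        }
      }
    }
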
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 791f9ef..7edec33 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -889,19 +889,46 @@ public class ParquetFileReader implements Closeable {
   }
 
   /**
+   * Reads all the columns requested from the row group at the specified block index.
+   *
+   * @param blockIndex the index of the requested block
+   * @throws IOException if an error occurs while reading
+   * @return the PageReadStore which can provide PageReaders for each column, or null if the index is out of range
+   */
+  public PageReadStore readRowGroup(int blockIndex) throws IOException {
+    return internalReadRowGroup(blockIndex);
+  }
+
+  /**
    * Reads all the columns requested from the row group at the current file position.
    * @throws IOException if an error occurs while reading
    * @return the PageReadStore which can provide PageReaders for each column.
    */
   public PageReadStore readNextRowGroup() throws IOException {
-    if (currentBlock == blocks.size()) {
+    ColumnChunkPageReadStore rowGroup = internalReadRowGroup(currentBlock);
+    if (rowGroup == null) {
       return null;
     }
-    BlockMetaData block = blocks.get(currentBlock);
+    this.currentRowGroup = rowGroup;
+    // avoid re-reading bytes if the dictionary reader is used after this call
+    if (nextDictionaryReader != null) {
+      nextDictionaryReader.setRowGroup(currentRowGroup);
+    }
+
+    advanceToNextBlock();
+
+    return currentRowGroup;
+  }
+
+  private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException {
+    if (blockIndex < 0 || blockIndex >= blocks.size()) {
+      return null;
+    }
+    BlockMetaData block = blocks.get(blockIndex);
     if (block.getRowCount() == 0) {
       throw new RuntimeException("Illegal row group of 0 rows");
     }
-    this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount());
+    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount());
     // prepare the list of consecutive parts to read them in one scan
     List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
     ConsecutivePartList currentParts = null;
@@ -920,22 +947,54 @@ public class ParquetFileReader implements Closeable {
       }
     }
     // actually read all the chunks
-    ChunkListBuilder builder = new ChunkListBuilder();
+    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
     for (ConsecutivePartList consecutiveChunks : allParts) {
       consecutiveChunks.readAll(f, builder);
     }
     for (Chunk chunk : builder.build()) {
-      readChunkPages(chunk, block);
+      readChunkPages(chunk, block, rowGroup);
     }
 
-    // avoid re-reading bytes the dictionary reader is used after this call
-    if (nextDictionaryReader != null) {
-      nextDictionaryReader.setRowGroup(currentRowGroup);
+    return rowGroup;
+  }
+
+  /**
+   * Reads all the columns requested from the specified row group. It may skip specific pages based on the column
+   * indexes according to the actual filter. As the rows are not aligned among the pages of the different columns,
+   * row synchronization might be required. See the documentation of the class SynchronizingColumnReader for details.
+   *
+   * @param blockIndex the index of the requested block
+   * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
+   * @throws IOException if an error occurs while reading
+   */
+  public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {
+    if (blockIndex < 0 || blockIndex >= blocks.size()) {
+      return null;
     }
 
-    advanceToNextBlock();
+    // Filtering not required -> fall back to the non-filtering path
+    if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
+      return internalReadRowGroup(blockIndex);
+    }
 
-    return currentRowGroup;
+    BlockMetaData block = blocks.get(blockIndex);
+    if (block.getRowCount() == 0) {
+      throw new RuntimeException("Illegal row group of 0 rows");
+    }
+
+    RowRanges rowRanges = getRowRanges(blockIndex);
+    long rowCount = rowRanges.rowCount();
+    if (rowCount == 0) {
+      // There are no matching rows -> returning null
+      return null;
+    }
+
+    if (rowCount == block.getRowCount()) {
+      // All rows are matching -> fall back to the non-filtering path
+      return internalReadRowGroup(blockIndex);
+    }
+
+    return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex));
   }
 
   /**
@@ -945,13 +1004,13 @@ public class ParquetFileReader implements Closeable {
    * details.
    *
    * @return the PageReadStore which can provide PageReaders for each column
-   * @throws IOException
-   *           if any I/O error occurs while reading
+   * @throws IOException if an error occurs while reading
    */
   public PageReadStore readNextFilteredRowGroup() throws IOException {
     if (currentBlock == blocks.size()) {
       return null;
     }
+    // Filtering not required -> fall back to the non-filtering path
     if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
       return readNextRowGroup();
     }
@@ -959,7 +1018,6 @@ public class ParquetFileReader implements Closeable {
     if (block.getRowCount() == 0) {
       throw new RuntimeException("Illegal row group of 0 rows");
     }
-    ColumnIndexStore ciStore = getColumnIndexStore(currentBlock);
     RowRanges rowRanges = getRowRanges(currentBlock);
     long rowCount = rowRanges.rowCount();
     if (rowCount == 0) {
@@ -972,9 +1030,22 @@ public class ParquetFileReader implements Closeable {
       return readNextRowGroup();
     }
 
-    this.currentRowGroup = new ColumnChunkPageReadStore(rowRanges);
+    this.currentRowGroup = internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(currentBlock));
+
+    // avoid re-reading bytes if the dictionary reader is used after this call
+    if (nextDictionaryReader != null) {
+      nextDictionaryReader.setRowGroup(currentRowGroup);
+    }
+
+    advanceToNextBlock();
+
+    return this.currentRowGroup;
+  }
+
+  private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException {
+    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges);
     // prepare the list of consecutive parts to read them in one scan
-    ChunkListBuilder builder = new ChunkListBuilder();
+    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
     List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
     ConsecutivePartList currentParts = null;
     for (ColumnChunkMetaData mc : block.getColumns()) {
@@ -1005,31 +1076,24 @@ public class ParquetFileReader implements Closeable {
       consecutiveChunks.readAll(f, builder);
     }
     for (Chunk chunk : builder.build()) {
-      readChunkPages(chunk, block);
-    }
-
-    // avoid re-reading bytes the dictionary reader is used after this call
-    if (nextDictionaryReader != null) {
-      nextDictionaryReader.setRowGroup(currentRowGroup);
+      readChunkPages(chunk, block, rowGroup);
     }
 
-    advanceToNextBlock();
-
-    return currentRowGroup;
+    return rowGroup;
   }
 
-  private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException {
+  private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageReadStore rowGroup) throws IOException {
     if (null == fileDecryptor || fileDecryptor.plaintextFile()) {
-      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+      rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
       return;
     }
     // Encrypted file
     ColumnPath columnPath = ColumnPath.get(chunk.descriptor.col.getPath());
     InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(columnPath);
     if (!columnDecryptionSetup.isEncrypted()) { // plaintext column
-      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+      rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
     }  else { // encrypted column
-      currentRowGroup.addColumn(chunk.descriptor.col,
+      rowGroup.addColumn(chunk.descriptor.col,
           chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(),
               fileDecryptor.getFileAAD(), block.getOrdinal(), columnDecryptionSetup.getOrdinal()));
     }
@@ -1080,12 +1144,19 @@ public class ParquetFileReader implements Closeable {
    * @return a DictionaryPageReadStore for the next row group
    */
   public DictionaryPageReadStore getNextDictionaryReader() {
-    if (nextDictionaryReader == null && currentBlock < blocks.size()) {
-      this.nextDictionaryReader = getDictionaryReader(blocks.get(currentBlock));
+    if (nextDictionaryReader == null) {
+      this.nextDictionaryReader = getDictionaryReader(currentBlock);
     }
     return nextDictionaryReader;
   }
 
+  public DictionaryPageReader getDictionaryReader(int blockIndex) {
+    if (blockIndex < 0 || blockIndex >= blocks.size()) {
+      return null;
+    }
+    return new DictionaryPageReader(this, blocks.get(blockIndex));
+  }
+
   public DictionaryPageReader getDictionaryReader(BlockMetaData block) {
     return new DictionaryPageReader(this, block);
   }
@@ -1167,6 +1238,13 @@ public class ParquetFileReader implements Closeable {
         converter.getEncoding(dictHeader.getEncoding()));
   }
 
+  public BloomFilterReader getBloomFilterDataReader(int blockIndex) {
+    if (blockIndex < 0 || blockIndex >= blocks.size()) {
+      return null;
+    }
+    return new BloomFilterReader(this, blocks.get(blockIndex));
+  }
+
   public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
     return new BloomFilterReader(this, block);
   }
@@ -1315,8 +1393,13 @@ public class ParquetFileReader implements Closeable {
 
     private final Map<ChunkDescriptor, ChunkData> map = new HashMap<>();
     private ChunkDescriptor lastDescriptor;
+    private final long rowCount;
     private SeekableInputStream f;
 
+    public ChunkListBuilder(long rowCount) {
+      this.rowCount = rowCount;
+    }
+
     void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
       ChunkData data = map.get(descriptor);
       if (data == null) {
@@ -1345,9 +1428,9 @@ public class ParquetFileReader implements Closeable {
         ChunkData data = entry.getValue();
         if (descriptor.equals(lastDescriptor)) {
           // because of a bug, the last chunk might be larger than descriptor.size
-          chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex));
+          chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount));
         } else {
-          chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex));
+          chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex, rowCount));
         }
       }
       return chunks;
@@ -1362,16 +1445,18 @@ public class ParquetFileReader implements Closeable {
     protected final ChunkDescriptor descriptor;
     protected final ByteBufferInputStream stream;
     final OffsetIndex offsetIndex;
+    final long rowCount;
 
     /**
      * @param descriptor descriptor for the chunk
      * @param buffers ByteBuffers that contain the chunk
      * @param offsetIndex the offset index for this column; might be null
      */
-    public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, OffsetIndex offsetIndex) {
+    public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, OffsetIndex offsetIndex, long rowCount) {
       this.descriptor = descriptor;
       this.stream = ByteBufferInputStream.wrap(buffers);
       this.offsetIndex = offsetIndex;
+      this.rowCount = rowCount;
     }
 
     protected PageHeader readPageHeader() throws IOException {
@@ -1518,7 +1603,7 @@ public class ParquetFileReader implements Closeable {
       }
       BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
       return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
-          blocks.get(currentBlock).getRowCount(), pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal);
+        rowCount, pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal);
     }
 
     private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
@@ -1556,8 +1641,8 @@ public class ParquetFileReader implements Closeable {
      * @param descriptor the descriptor of the chunk
      * @param f the file stream positioned at the end of this chunk
      */
-    private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f, OffsetIndex offsetIndex) {
-      super(descriptor, buffers, offsetIndex);
+    private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f, OffsetIndex offsetIndex, long rowCount) {
+      super(descriptor, buffers, offsetIndex, rowCount);
       this.f = f;
     }
 
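For the filtered path, the sketch below shows how readFilteredRowGroup(int) could be combined with a record filter and column-index filtering and how the resulting PageReadStore might be materialized; the column name "i64_flip", the path, and the class are illustrative assumptions, mirroring what the new test added below does.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.io.ColumnIOFactory;
    import org.apache.parquet.io.MessageColumnIO;
    import org.apache.parquet.io.RecordReader;
    import org.apache.parquet.schema.MessageType;

    import static org.apache.parquet.filter2.predicate.FilterApi.eq;
    import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;

    public class FilteredRowGroupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FilterCompat.Filter filter = FilterCompat.get(eq(longColumn("i64_flip"), 1L));
        ParquetReadOptions options = ParquetReadOptions.builder()
            .withRecordFilter(filter)
            .useColumnIndexFilter(true)
            .build();
        try (ParquetFileReader reader = new ParquetFileReader(
            HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), conf), options)) {
          MessageType schema = reader.getFooter().getFileMetaData().getSchema();
          // Read only the pages of row group 2 that can contain matching rows;
          // null means the index is out of range or no rows match at all.
          PageReadStore pages = reader.readFilteredRowGroup(2);
          if (pages != null) {
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            RecordReader<Group> records =
                columnIO.getRecordReader(pages, new GroupRecordConverter(schema), filter);
            for (long i = 0, n = pages.getRowCount(); i < n; i++) {
              Group group = records.read();
              if (group != null && !records.shouldSkipCurrentRecord()) {
                System.out.println(group);
              }
            }
          }
        }
      }
    }
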
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
new file mode 100644
index 0000000..c2b5986
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.statistics.DataGenerationContext;
+import org.apache.parquet.statistics.RandomValues;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.*;
+
+/**
+ * This tests the random access methods of the ParquetFileReader, specifically:
+ * <ul>
+ *   <li>{@link ParquetFileReader#readRowGroup(int)}</li>
+ *   <li>{@link ParquetFileReader#readFilteredRowGroup(int)}</li>
+ * </ul>
+ *
+ *  For this we use two columns:
+ *  Column "i64" starts at value 0 and counts up.
+ *  Column "i64_flip" starts at value 1 and flips between 1 and 0.
+ *
+ *  With these two columns we can validate the read data without holding the written data in memory.
+ *  The "i64_flip" column is mainly used to test the filtering.
+ *  We filter "i64_flip" to be equal to 1, which means all matching values in "i64" have to be even.
+ */
+public class TestParquetReaderRandomAccess {
+  private static final int KILOBYTE = 1 << 10;
+  private static final long RANDOM_SEED = 7174252115631550700L;
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void test() throws IOException {
+    Random random = new Random(RANDOM_SEED);
+
+    File file = temp.newFile("test_file.parquet");
+    file.delete();
+
+    int blockSize = 50 * KILOBYTE;
+    int pageSize = 2 * KILOBYTE;
+
+    List<DataContext> contexts = new ArrayList<>();
+
+    for (boolean enableDictionary : new boolean[]{false, true}) {
+      for (WriterVersion writerVersion : new WriterVersion[]{WriterVersion.PARQUET_1_0, WriterVersion.PARQUET_2_0}) {
+       contexts.add(
+         new DataContextRandom(random.nextLong(), file, blockSize,
+           pageSize, enableDictionary, writerVersion));
+       contexts.add(
+         new DataContextRandomAndSequential(random.nextLong(), file, blockSize,
+           pageSize, enableDictionary, writerVersion));
+      }
+    }
+
+    for (DataContext context : contexts) {
+      DataGenerationContext.writeAndTest(context);
+    }
+  }
+
+  public static class SequentialLongGenerator extends RandomValues.RandomValueGenerator<Long> {
+    private long value = 0;
+
+    protected SequentialLongGenerator() {
+      super(0L);
+    }
+
+    @Override
+    public Long nextValue() {
+      return value++;
+    }
+  }
+
+  public static class SequentialFlippingLongGenerator extends RandomValues.RandomValueGenerator<Long> {
+    private long value = 0;
+
+    protected SequentialFlippingLongGenerator() {
+      super(0L);
+    }
+
+    @Override
+    public Long nextValue() {
+      value = value == 0 ? 1 : 0;
+      return value;
+    }
+  }
+
+  public static abstract class DataContext extends DataGenerationContext.WriteContext {
+
+    private static final int recordCount = 1_000_000;
+
+    private final List<RandomValues.RandomValueGenerator<?>> randomGenerators;
+    private final Random random;
+    private final FilterCompat.Filter filter;
+
+    public DataContext(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException {
+      super(path, buildSchema(), blockSize, pageSize, enableDictionary, true, version);
+
+      this.random = new Random(seed);
+      this.randomGenerators = Arrays.asList(
+        new SequentialLongGenerator(),
+        new SequentialFlippingLongGenerator());
+
+      this.filter = FilterCompat.get(eq(longColumn("i64_flip"), 1L));
+    }
+
+    private static MessageType buildSchema() {
+      return new MessageType("schema",
+        new PrimitiveType(REQUIRED, INT64, "i64"),
+        new PrimitiveType(REQUIRED, INT64, "i64_flip"));
+    }
+
+    @Override
+    public void write(ParquetWriter<Group> writer) throws IOException {
+      for (int index = 0; index < recordCount; index++) {
+        Group group = new SimpleGroup(super.schema);
+
+        for (int column = 0, columnCnt = schema.getFieldCount(); column < columnCnt; ++column) {
+          Type type = schema.getType(column);
+          RandomValues.RandomValueGenerator<?> generator = randomGenerators.get(column);
+          if (type.isRepetition(OPTIONAL) && generator.shouldGenerateNull()) {
+            continue;
+          }
+          group.append(type.getName(), (Long) generator.nextValue());
+        }
+        writer.write(group);
+      }
+    }
+
+    @Override
+    public void test() throws IOException {
+      Configuration configuration = new Configuration();
+      ParquetReadOptions options = ParquetReadOptions.builder().build();
+
+      ParquetReadOptions filterOptions = ParquetReadOptions.builder()
+        .copy(options)
+        .withRecordFilter(filter)
+        .useDictionaryFilter(true)
+        .useStatsFilter(true)
+        .useRecordFilter(true)
+        .useColumnIndexFilter(true)
+        .build();
+
+      List<Long> fromNumber = new ArrayList<>();
+      List<Long> toNumber = new ArrayList<>();
+      int blockAmount;
+
+      try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) {
+        blockAmount = reader.getRowGroups().size();
+        PageReadStore pages;
+        while ((pages = reader.readNextRowGroup()) != null) {
+          MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema);
+          RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema));
+          long rowCount = pages.getRowCount();
+          long from = recordReader.read().getLong("i64", 0);
+          for (int i = 1; i < rowCount - 1; i++) {
+            recordReader.read();
+          }
+          Group group = recordReader.read();
+          long to;
+          if (group == null) {
+            to = from;
+          } else {
+            to = group.getLong("i64", 0);
+          }
+          fromNumber.add(from);
+          toNumber.add(to);
+        }
+      }
+
+      // Randomize indexes
+      List<Integer> indexes = new ArrayList<>();
+      for (int j = 0; j < 4; j++) {
+        for (int i = 0; i < blockAmount; i++) {
+          indexes.add(i);
+        }
+        indexes.add(-1);
+        indexes.add(blockAmount);
+        indexes.add(blockAmount + 1);
+      }
+
+      Collections.shuffle(indexes, random);
+
+      try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) {
+        test(reader, indexes, fromNumber, toNumber, blockAmount);
+      }
+
+      try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), filterOptions)) {
+        testFiltered(reader, indexes, fromNumber, toNumber, blockAmount);
+      }
+    }
+
+    public void assertValues(PageReadStore pages, List<Long> fromNumber, List<Long> toNumber, int index, int blockAmount) {
+      if (index < 0 || index >= blockAmount) {
+        assertNull(pages);
+        return;
+      }
+
+      long firstValue = fromNumber.get(index);
+      long lastValue = toNumber.get(index);
+
+      MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema);
+      RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema));
+      for (long i = firstValue; i <= lastValue; i++) {
+        Group group = recordReader.read();
+        assertEquals(i, group.getLong("i64", 0));
+        assertEquals((i % 2) == 0 ? 1 : 0, group.getLong("i64_flip", 0));
+      }
+      boolean exceptionThrown = false;
+      try {
+        recordReader.read();
+      } catch (ParquetDecodingException e) {
+        exceptionThrown = true;
+      }
+      assertTrue(exceptionThrown);
+    }
+
+    public void assertFilteredValues(PageReadStore pages, List<Long> fromNumber, List<Long> toNumber, int index, int blockAmount) {
+      if (index < 0 || index >= blockAmount) {
+        assertNull(pages);
+        return;
+      }
+
+      long firstValue = fromNumber.get(index);
+      long lastValue = toNumber.get(index);
+
+      MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema);
+      RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema), filter);
+
+      for (long i = firstValue; i <= lastValue; i++) {
+        Group group = recordReader.read();
+        if ((i % 2) == 0) {
+          assertEquals(i, group.getLong("i64", 0));
+          assertEquals(1, group.getLong("i64_flip", 0));
+        } else {
+          assertTrue(group == null || recordReader.shouldSkipCurrentRecord());
+        }
+      }
+
+      boolean exceptionThrown = false;
+      try {
+        recordReader.read();
+      } catch (ParquetDecodingException e) {
+        exceptionThrown = true;
+      }
+      assertTrue(exceptionThrown);
+    }
+
+    protected abstract void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException;
+    protected abstract void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException;
+  }
+
+  public static class DataContextRandom extends DataContext {
+
+    public DataContextRandom(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException {
+      super(seed, path, blockSize, pageSize, enableDictionary, version);
+    }
+
+    @Override
+    protected void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException {
+      for (int index: indexes) {
+        PageReadStore pages = reader.readRowGroup(index);
+        assertValues(pages, fromNumber, toNumber, index, blockAmount);
+      }
+    }
+
+    @Override
+    protected void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException {
+      for (int index: indexes) {
+        PageReadStore pages = reader.readFilteredRowGroup(index);
+        assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount);
+      }
+    }
+  }
+
+  public static class DataContextRandomAndSequential extends DataContext {
+
+    public DataContextRandomAndSequential(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException {
+      super(seed, path, blockSize, pageSize, enableDictionary, version);
+    }
+
+    @Override
+    protected void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException {
+      int splitPoint = indexes.size()/2;
+
+      {
+        PageReadStore pages = reader.readNextRowGroup();
+        assertValues(pages, fromNumber, toNumber, 0, blockAmount);
+      }
+      for (int i = 0; i < splitPoint; i++) {
+        int index = indexes.get(i);
+        PageReadStore pages = reader.readRowGroup(index);
+        assertValues(pages, fromNumber, toNumber, index, blockAmount);
+      }
+      {
+        PageReadStore pages = reader.readNextRowGroup();
+        assertValues(pages, fromNumber, toNumber, 1, blockAmount);
+      }
+      for (int i = splitPoint; i < indexes.size(); i++) {
+        int index = indexes.get(i);
+        PageReadStore pages = reader.readRowGroup(index);
+        assertValues(pages, fromNumber, toNumber, index, blockAmount);
+      }
+      {
+        PageReadStore pages = reader.readNextRowGroup();
+        assertValues(pages, fromNumber, toNumber, 2, blockAmount);
+      }
+    }
+
+    @Override
+    protected void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException {
+      int splitPoint = indexes.size()/2;
+
+      {
+        PageReadStore pages = reader.readNextFilteredRowGroup();
+        assertFilteredValues(pages, fromNumber, toNumber, 0, blockAmount);
+      }
+      for (int i = 0; i < splitPoint; i++) {
+        int index = indexes.get(i);
+        PageReadStore pages = reader.readFilteredRowGroup(index);
+        assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount);
+      }
+      {
+        PageReadStore pages = reader.readNextFilteredRowGroup();
+        assertFilteredValues(pages, fromNumber, toNumber, 1, blockAmount);
+      }
+      for (int i = splitPoint; i < indexes.size(); i++) {
+        int index = indexes.get(i);
+        PageReadStore pages = reader.readFilteredRowGroup(index);
+        assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount);
+      }
+      {
+        PageReadStore pages = reader.readNextFilteredRowGroup();
+        assertFilteredValues(pages, fromNumber, toNumber, 2, blockAmount);
+      }
+    }
+  }
+
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java
new file mode 100644
index 0000000..c3c61c4
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.statistics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.File;
+import java.io.IOException;
+
+public class DataGenerationContext {
+  public static abstract class WriteContext {
+    protected final File path;
+    protected final Path fsPath;
+    protected final MessageType schema;
+    protected final int blockSize;
+    protected final int pageSize;
+    protected final boolean enableDictionary;
+    protected final boolean enableValidation;
+    protected final ParquetProperties.WriterVersion version;
+
+    public WriteContext(File path, MessageType schema, int blockSize, int pageSize, boolean enableDictionary, boolean enableValidation, ParquetProperties.WriterVersion version) throws IOException {
+      this.path = path;
+      this.fsPath = new Path(path.toString());
+      this.schema = schema;
+      this.blockSize = blockSize;
+      this.pageSize = pageSize;
+      this.enableDictionary = enableDictionary;
+      this.enableValidation = enableValidation;
+      this.version = version;
+    }
+
+    public abstract void write(ParquetWriter<Group> writer) throws IOException;
+
+    public abstract void test() throws IOException;
+  }
+
+  public static void writeAndTest(WriteContext context) throws IOException {
+    // Create the configuration, and then apply the schema to our configuration.
+    Configuration configuration = new Configuration();
+    GroupWriteSupport.setSchema(context.schema, configuration);
+    GroupWriteSupport groupWriteSupport = new GroupWriteSupport();
+
+    // Create the writer properties
+    final int blockSize = context.blockSize;
+    final int pageSize = context.pageSize;
+    final int dictionaryPageSize = pageSize;
+    final boolean enableDictionary = context.enableDictionary;
+    final boolean enableValidation = context.enableValidation;
+    ParquetProperties.WriterVersion writerVersion = context.version;
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+    try (ParquetWriter<Group> writer = new ParquetWriter<Group>(context.fsPath,
+      groupWriteSupport, codec, blockSize, pageSize, dictionaryPageSize,
+      enableDictionary, enableValidation, writerVersion, configuration)) {
+      context.write(writer);
+    }
+
+    context.test();
+
+    context.path.delete();
+  }
+}
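
The extracted DataGenerationContext can now be reused outside TestStatistics. A hypothetical minimal subclass might look like the sketch below; the schema, sizes, and the body of test() are illustrative assumptions, while the constructor arguments, write(), test() and writeAndTest() correspond to the class added above.

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroup;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.statistics.DataGenerationContext;

    import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
    import static org.apache.parquet.schema.Type.Repetition.REQUIRED;

    // Minimal, hypothetical WriteContext: writes a single INT64 column in write()
    // and re-reads the file in test().
    public class MinimalWriteContext extends DataGenerationContext.WriteContext {
      public MinimalWriteContext(File file) throws IOException {
        super(file,
            new MessageType("schema", new PrimitiveType(REQUIRED, INT64, "value")),
            50 * 1024,   // block size
            2 * 1024,    // page size
            false,       // enableDictionary
            true,        // enableValidation
            ParquetProperties.WriterVersion.PARQUET_2_0);
      }

      @Override
      public void write(ParquetWriter<Group> writer) throws IOException {
        for (long i = 0; i < 10_000; i++) {
          writer.write(new SimpleGroup(schema).append("value", i));
        }
      }

      @Override
      public void test() throws IOException {
        // Re-open the freshly written file and touch the first row group;
        // a real test would verify the values here.
        try (ParquetFileReader reader = new ParquetFileReader(
            HadoopInputFile.fromPath(fsPath, new Configuration()),
            ParquetReadOptions.builder().build())) {
          reader.readRowGroup(0);
        }
      }
    }

    // Driven the same way the tests above do it:
    // DataGenerationContext.writeAndTest(new MinimalWriteContext(new File("/tmp/minimal.parquet")));
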
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
index 191e397..9545aef 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
@@ -33,7 +33,7 @@ import org.apache.parquet.io.api.Binary;
 public class RandomValues {
   private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890";
 
-  static abstract class RandomValueGenerator<T extends Comparable<T>> implements Supplier<T> {
+  public static abstract class RandomValueGenerator<T extends Comparable<T>> implements Supplier<T> {
     private final Random random;
 
     protected RandomValueGenerator(long seed) {
@@ -94,7 +94,7 @@ public class RandomValues {
     }
   }
 
-  static abstract class RandomBinaryBase<T extends Comparable<T>> extends RandomValueGenerator<T> {
+  public static abstract class RandomBinaryBase<T extends Comparable<T>> extends RandomValueGenerator<T> {
     protected final int bufferLength;
     protected final byte[] buffer;
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java
index 91e1735..bd4ae4e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java
@@ -20,7 +20,6 @@
 package org.apache.parquet.statistics;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnReaderImpl;
@@ -36,8 +35,6 @@ import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.PrimitiveConverter;
@@ -72,59 +69,6 @@ public class TestStatistics {
   private static final int MEGABYTE = 1 << 20;
   private static final long RANDOM_SEED = 1441990701846L; //System.currentTimeMillis();
 
-  public static class DataGenerationContext {
-    public static abstract class WriteContext {
-      protected final File path;
-      protected final Path fsPath;
-      protected final MessageType schema;
-      protected final int blockSize;
-      protected final int pageSize;
-      protected final boolean enableDictionary;
-      protected final boolean enableValidation;
-      protected final ParquetProperties.WriterVersion version;
-
-      public WriteContext(File path, MessageType schema, int blockSize, int pageSize, boolean enableDictionary, boolean enableValidation, ParquetProperties.WriterVersion version) throws IOException {
-        this.path = path;
-        this.fsPath = new Path(path.toString());
-        this.schema = schema;
-        this.blockSize = blockSize;
-        this.pageSize = pageSize;
-        this.enableDictionary = enableDictionary;
-        this.enableValidation = enableValidation;
-        this.version = version;
-      }
-
-      public abstract void write(ParquetWriter<Group> writer) throws IOException;
-      public abstract void test() throws IOException;
-    }
-
-    public static void writeAndTest(WriteContext context) throws IOException {
-      // Create the configuration, and then apply the schema to our configuration.
-      Configuration configuration = new Configuration();
-      GroupWriteSupport.setSchema(context.schema, configuration);
-      GroupWriteSupport groupWriteSupport = new GroupWriteSupport();
-
-      // Create the writer properties
-      final int blockSize = context.blockSize;
-      final int pageSize = context.pageSize;
-      final int dictionaryPageSize = pageSize;
-      final boolean enableDictionary = context.enableDictionary;
-      final boolean enableValidation = context.enableValidation;
-      ParquetProperties.WriterVersion writerVersion = context.version;
-      CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
-
-      try (ParquetWriter<Group> writer = new ParquetWriter<Group>(context.fsPath,
-          groupWriteSupport, codec, blockSize, pageSize, dictionaryPageSize,
-          enableDictionary, enableValidation, writerVersion, configuration)) {
-        context.write(writer);
-      }
-
-      context.test();
-
-      context.path.delete();
-    }
-  }
-
   public static class SingletonPageReader implements PageReader {
     private final DictionaryPage dict;
     private final DataPage data;