Posted to commits@parquet.apache.org by ga...@apache.org on 2018/08/17 15:06:52 UTC

[parquet-mr] branch column-indexes updated: PARQUET-1310: Column indexes: Filtering (#509)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/column-indexes by this push:
     new d8e78eb  PARQUET-1310: Column indexes: Filtering (#509)
d8e78eb is described below

commit d8e78ebd8ea6fe9fc2a9a7e61d1eaa7121637ebd
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Fri Aug 17 17:06:48 2018 +0200

    PARQUET-1310: Column indexes: Filtering (#509)
---
 .../org/apache/parquet/column/ColumnReader.java    |   3 +
 .../parquet/column/impl/ColumnReadStoreImpl.java   |  11 +-
 ...ColumnReaderImpl.java => ColumnReaderBase.java} |  74 ++-
 .../parquet/column/impl/ColumnReaderImpl.java      | 676 +--------------------
 .../column/impl/SynchronizingColumnReader.java     |  93 +++
 .../org/apache/parquet/column/page/DataPage.java   |  35 ++
 .../org/apache/parquet/column/page/DataPageV1.java |  21 +
 .../org/apache/parquet/column/page/DataPageV2.java |  48 +-
 ...e.java => NotInPageFilteringModeException.java} |  34 +-
 .../apache/parquet/column/page/PageReadStore.java  |  25 +-
 .../columnindex/BinaryColumnIndexBuilder.java      |  31 +-
 .../columnindex/BooleanColumnIndexBuilder.java     |  36 +-
 .../internal/column/columnindex/BoundaryOrder.java | 330 +++++++++-
 .../internal/column/columnindex/ColumnIndex.java   |   9 +-
 .../column/columnindex/ColumnIndexBuilder.java     | 290 +++++++--
 .../columnindex/DoubleColumnIndexBuilder.java      |  35 +-
 .../columnindex/FloatColumnIndexBuilder.java       |  35 +-
 .../internal/column/columnindex/IndexIterator.java |  98 +++
 .../column/columnindex/IntColumnIndexBuilder.java  |  35 +-
 .../column/columnindex/LongColumnIndexBuilder.java |  35 +-
 .../internal/column/columnindex/OffsetIndex.java   |  12 +
 .../filter2/columnindex/ColumnIndexFilter.java     | 194 ++++++
 .../filter2/columnindex/ColumnIndexStore.java      |  55 ++
 .../internal/filter2/columnindex/RowRanges.java    | 251 ++++++++
 .../column/columnindex/TestBoundaryOrder.java      | 487 +++++++++++++++
 .../column/columnindex/TestColumnIndexBuilder.java | 488 ++++++++++++++-
 .../column/columnindex/TestIndexIterator.java      |  63 ++
 .../column/columnindex/TestOffsetIndexBuilder.java |   2 +
 .../filter2/columnindex/TestColumnIndexFilter.java | 464 ++++++++++++++
 .../filter2/columnindex/TestRowRanges.java         | 155 +++++
 .../java/org/apache/parquet/HadoopReadOptions.java |   9 +-
 .../org/apache/parquet/ParquetReadOptions.java     |  20 +-
 .../parquet/hadoop/ColumnChunkPageReadStore.java   | 123 +++-
 .../parquet/hadoop/ColumnIndexFilterUtils.java     | 157 +++++
 .../parquet/hadoop/ColumnIndexStoreImpl.java       | 155 +++++
 .../hadoop/InternalParquetRecordReader.java        |   6 +-
 .../apache/parquet/hadoop/ParquetFileReader.java   | 293 +++++++--
 .../apache/parquet/hadoop/ParquetInputFormat.java  |   5 +
 .../org/apache/parquet/hadoop/ParquetReader.java   |  10 +
 .../hadoop/metadata/ColumnChunkMetaData.java       |   2 -
 .../filter2/recordlevel/PhoneBookWriter.java       | 105 +++-
 .../parquet/hadoop/TestColumnIndexFiltering.java   | 442 ++++++++++++++
 42 files changed, 4539 insertions(+), 913 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
index 52d269e..6d93eee 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
@@ -41,7 +41,10 @@ public interface ColumnReader {
 
   /**
    * @return the totalCount of values to be consumed
+   * @deprecated will be removed in 2.0.0; the total number of values might not be countable before actually reading
+   *             the values (e.g. in case of column index based filtering)
    */
+  @Deprecated
   long getTotalValueCount();
 
   /**
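A note on the deprecation above (illustrative, not part of the commit): with column index based filtering whole pages
may be dropped, so the total number of values cannot be known up front. Callers should be driven by the
record-assembly layer rather than by this count; a minimal sketch of the consumption pattern that stays valid, assuming
a reader obtained from a ColumnReadStore and an externally supplied triplet count:

    import org.apache.parquet.column.ColumnReader;

    final class TripletConsumer {
      // Consumes 'count' triplets from the reader; 'count' is illustrative and
      // would come from the record-assembly layer, not getTotalValueCount().
      static void consume(ColumnReader reader, long count) {
        int maxDl = reader.getDescriptor().getMaxDefinitionLevel();
        for (long i = 0; i < count; i++) {
          if (reader.getCurrentDefinitionLevel() == maxDl) {
            reader.writeCurrentValueToConverter(); // materialize a non-null value
          } else {
            reader.skip();                         // null: nothing to materialize
          }
          reader.consume();                        // advance to the next triplet
        }
      }
    }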
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index 3784596..8066564 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -72,12 +72,13 @@ public class ColumnReadStoreImpl implements ColumnReadStore {
 
   @Override
   public ColumnReader getColumnReader(ColumnDescriptor path) {
-    return newMemColumnReader(path, pageReadStore.getPageReader(path));
-  }
-
-  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
     PrimitiveConverter converter = getPrimitiveConverter(path);
-    return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
+    PageReader pageReader = pageReadStore.getPageReader(path);
+    if (pageReadStore.isInPageFilteringMode()) {
+      return new SynchronizingColumnReader(path, pageReader, converter, writerVersion, pageReadStore.getRowIndexes());
+    } else {
+      return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
+    }
   }
 
   private PrimitiveConverter getPrimitiveConverter(ColumnDescriptor path) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
similarity index 92%
copy from parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
copy to parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
index 8c85b37..0af85c7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
@@ -52,10 +52,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * ColumnReader implementation
+ * Base superclass for {@link ColumnReader} implementations.
  */
-public class ColumnReaderImpl implements ColumnReader {
-  private static final Logger LOG = LoggerFactory.getLogger(ColumnReaderImpl.class);
+abstract class ColumnReaderBase implements ColumnReader {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnReaderBase.class);
 
   /**
    * binds the lower level page decoder to the record converter materializing the records
@@ -148,6 +148,7 @@ public class ColumnReaderImpl implements ColumnReader {
 
   private final PrimitiveConverter converter;
   private Binding binding;
+  private final int maxDefinitionLevel;
 
   // this is needed because we will attempt to read the value twice when filtering
   // TODO: rework that
@@ -328,11 +329,12 @@ public class ColumnReaderImpl implements ColumnReader {
    * @param converter a converter that materializes the values in this column in the current record
    * @param writerVersion writer version string from the Parquet file being read
    */
-  public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter, ParsedVersion writerVersion) {
+  ColumnReaderBase(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter, ParsedVersion writerVersion) {
     this.path = checkNotNull(path, "path");
     this.pageReader = checkNotNull(pageReader, "pageReader");
     this.converter = checkNotNull(converter, "converter");
     this.writerVersion = writerVersion;
+    this.maxDefinitionLevel = path.getMaxDefinitionLevel();
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
     if (dictionaryPage != null) {
       try {
@@ -350,10 +352,9 @@ public class ColumnReaderImpl implements ColumnReader {
     if (totalValueCount <= 0) {
       throw new ParquetDecodingException("totalValueCount '" + totalValueCount + "' <= 0");
     }
-    consume();
   }
 
-  private boolean isFullyConsumed() {
+  boolean isFullyConsumed() {
     return readValues >= totalValueCount;
   }
 
@@ -508,25 +509,33 @@ public class ColumnReaderImpl implements ColumnReader {
     return definitionLevel;
   }
 
-  // TODO: change the logic around read() to not tie together reading from the 3 columns
-  private void readRepetitionAndDefinitionLevels() {
-    repetitionLevel = repetitionLevelColumn.nextInt();
-    definitionLevel = definitionLevelColumn.nextInt();
-    ++readValues;
-  }
-
   private void checkRead() {
-    if (isPageFullyConsumed()) {
-      if (isFullyConsumed()) {
-        LOG.debug("end reached");
-        repetitionLevel = 0; // the next repetition level
-        return;
+    int rl, dl;
+    for (;;) {
+      if (isPageFullyConsumed()) {
+        if (isFullyConsumed()) {
+          LOG.debug("end reached");
+          repetitionLevel = 0; // the next repetition level
+          return;
+        }
+        readPage();
+      }
+      rl = repetitionLevelColumn.nextInt();
+      dl = definitionLevelColumn.nextInt();
+      ++readValues;
+      if (!skipLevels(rl, dl)) {
+        break;
+      }
+      if (dl == maxDefinitionLevel) {
+        binding.skip();
       }
-      readPage();
     }
-    readRepetitionAndDefinitionLevels();
+    repetitionLevel = rl;
+    definitionLevel = dl;
   }
 
+  abstract boolean skipLevels(int rl, int dl);
+
   private void readPage() {
     LOG.debug("loading page");
     DataPage page = pageReader.readPage();
@@ -585,32 +594,42 @@ public class ColumnReaderImpl implements ColumnReader {
     ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
     this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
     this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+    int valueCount = page.getValueCount();
     try {
       BytesInput bytes = page.getBytes();
-      LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
+      LOG.debug("page size {} bytes and {} values", bytes.size(), valueCount);
       LOG.debug("reading repetition levels at 0");
       ByteBufferInputStream in = bytes.toInputStream();
-      rlReader.initFromPage(pageValueCount, in);
+      rlReader.initFromPage(valueCount, in);
       LOG.debug("reading definition levels at {}", in.position());
-      dlReader.initFromPage(pageValueCount, in);
+      dlReader.initFromPage(valueCount, in);
       LOG.debug("reading data at {}", in.position());
-      initDataReader(page.getValueEncoding(), in, page.getValueCount());
+      initDataReader(page.getValueEncoding(), in, valueCount);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
+    newPageInitialized(page);
   }
 
   private void readPageV2(DataPageV2 page) {
     this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
     this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
-    LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
+    int valueCount = page.getValueCount();
+    LOG.debug("page data size {} bytes and {} values", page.getData().size(), valueCount);
     try {
-      initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
+      initDataReader(page.getDataEncoding(), page.getData().toInputStream(), valueCount);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
+    newPageInitialized(page);
+  }
+
+  final int getPageValueCount() {
+    return pageValueCount;
   }
 
+  abstract void newPageInitialized(DataPage page);
+
   private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
     try {
       if (maxLevel == 0) {
@@ -625,7 +644,7 @@ public class ColumnReaderImpl implements ColumnReader {
     }
   }
 
-  private boolean isPageFullyConsumed() {
+  boolean isPageFullyConsumed() {
     return readValues >= endOfPageValueCount;
   }
 
@@ -643,6 +662,7 @@ public class ColumnReaderImpl implements ColumnReader {
    * {@inheritDoc}
    * @see org.apache.parquet.column.ColumnReader#getTotalValueCount()
    */
+  @Deprecated
   @Override
   public long getTotalValueCount() {
     return totalValueCount;
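The loop introduced in checkRead() above is the core of the change: repetition and definition levels are always
decoded, but values whose row was filtered out are skipped before the reader surfaces a triplet. A standalone model of
that loop, with hypothetical types, for readers who want the control flow without the page plumbing:

    import java.util.function.BiPredicate;

    final class SkipLoopModel {
      interface Levels { boolean hasNext(); int nextRl(); int nextDl(); }

      // Returns the first (rl, dl) pair not skipped, or null when exhausted.
      // 'skipValue' stands in for binding.skip(): a value is only present (and
      // must be skipped) when dl reaches the maximum definition level.
      static int[] nextKeptLevels(Levels levels, BiPredicate<Integer, Integer> skipLevels,
          int maxDefinitionLevel, Runnable skipValue) {
        while (levels.hasNext()) {
          int rl = levels.nextRl();
          int dl = levels.nextDl();
          if (!skipLevels.test(rl, dl)) {
            return new int[] { rl, dl };
          }
          if (dl == maxDefinitionLevel) {
            skipValue.run();
          }
        }
        return null;
      }
    }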
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
index 8c85b37..b32fb0c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,675 +18,41 @@
  */
 package org.apache.parquet.column.impl;
 
-import static java.lang.String.format;
-import static org.apache.parquet.Preconditions.checkNotNull;
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
-import java.io.IOException;
-
-import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.VersionParser.ParsedVersion;
-import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.RequiresPreviousReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * ColumnReader implementation
+ * ColumnReader implementation for the scenario when column indexes are not used (all values are read)
  */
-public class ColumnReaderImpl implements ColumnReader {
-  private static final Logger LOG = LoggerFactory.getLogger(ColumnReaderImpl.class);
-
-  /**
-   * binds the lower level page decoder to the record converter materializing the records
-   */
-  private static abstract class Binding {
-
-    /**
-     * read one value from the underlying page
-     */
-    abstract void read();
-
-    /**
-     * skip one value from the underlying page
-     */
-    abstract void skip();
-
-    /**
-     * write current value to converter
-     */
-    abstract void writeValue();
-
-    /**
-     * @return current value
-     */
-    public int getDictionaryId() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public int getInteger() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public boolean getBoolean() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public long getLong() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public Binary getBinary() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public float getFloat() {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @return current value
-     */
-    public double getDouble() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private final ParsedVersion writerVersion;
-  private final ColumnDescriptor path;
-  private final long totalValueCount;
-  private final PageReader pageReader;
-  private final Dictionary dictionary;
-
-  private IntIterator repetitionLevelColumn;
-  private IntIterator definitionLevelColumn;
-  protected ValuesReader dataColumn;
-  private Encoding currentEncoding;
-
-  private int repetitionLevel;
-  private int definitionLevel;
-  private int dictionaryId;
-
-  private long endOfPageValueCount;
-  private long readValues = 0;
-  private int pageValueCount = 0;
-
-  private final PrimitiveConverter converter;
-  private Binding binding;
-
-  // this is needed because we will attempt to read the value twice when filtering
-  // TODO: rework that
-  private boolean valueRead;
-
-  private void bindToDictionary(final Dictionary dictionary) {
-    binding =
-        new Binding() {
-          void read() {
-            dictionaryId = dataColumn.readValueDictionaryId();
-          }
-          public void skip() {
-            dataColumn.skip();
-          }
-          public int getDictionaryId() {
-            return dictionaryId;
-          }
-          void writeValue() {
-            converter.addValueFromDictionary(dictionaryId);
-          }
-          public int getInteger() {
-            return dictionary.decodeToInt(dictionaryId);
-          }
-          public boolean getBoolean() {
-            return dictionary.decodeToBoolean(dictionaryId);
-          }
-          public long getLong() {
-            return dictionary.decodeToLong(dictionaryId);
-          }
-          public Binary getBinary() {
-            return dictionary.decodeToBinary(dictionaryId);
-          }
-          public float getFloat() {
-            return dictionary.decodeToFloat(dictionaryId);
-          }
-          public double getDouble() {
-            return dictionary.decodeToDouble(dictionaryId);
-          }
-        };
-  }
-
-  private void bind(PrimitiveTypeName type) {
-    binding = type.convert(new PrimitiveTypeNameConverter<Binding, RuntimeException>() {
-      @Override
-      public Binding convertFLOAT(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          float current;
-          void read() {
-            current = dataColumn.readFloat();
-          }
-          public void skip() {
-            current = 0;
-            dataColumn.skip();
-          }
-          public float getFloat() {
-            return current;
-          }
-          void writeValue() {
-            converter.addFloat(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertDOUBLE(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          double current;
-          void read() {
-            current = dataColumn.readDouble();
-          }
-          public void skip() {
-            current = 0;
-            dataColumn.skip();
-          }
-          public double getDouble() {
-            return current;
-          }
-          void writeValue() {
-            converter.addDouble(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertINT32(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          int current;
-          void read() {
-            current = dataColumn.readInteger();
-          }
-          public void skip() {
-            current = 0;
-            dataColumn.skip();
-          }
-          @Override
-          public int getInteger() {
-            return current;
-          }
-          void writeValue() {
-            converter.addInt(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertINT64(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          long current;
-          void read() {
-            current = dataColumn.readLong();
-          }
-          public void skip() {
-            current = 0;
-            dataColumn.skip();
-          }
-          @Override
-          public long getLong() {
-            return current;
-          }
-          void writeValue() {
-            converter.addLong(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertINT96(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return this.convertBINARY(primitiveTypeName);
-      }
-      @Override
-      public Binding convertFIXED_LEN_BYTE_ARRAY(
-          PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return this.convertBINARY(primitiveTypeName);
-      }
-      @Override
-      public Binding convertBOOLEAN(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          boolean current;
-          void read() {
-            current = dataColumn.readBoolean();
-          }
-          public void skip() {
-            current = false;
-            dataColumn.skip();
-          }
-          @Override
-          public boolean getBoolean() {
-            return current;
-          }
-          void writeValue() {
-            converter.addBoolean(current);
-          }
-        };
-      }
-      @Override
-      public Binding convertBINARY(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        return new Binding() {
-          Binary current;
-          void read() {
-            current = dataColumn.readBytes();
-          }
-          public void skip() {
-            current = null;
-            dataColumn.skip();
-          }
-          @Override
-          public Binary getBinary() {
-            return current;
-          }
-          void writeValue() {
-            converter.addBinary(current);
-          }
-        };
-      }
-    });
-  }
+public class ColumnReaderImpl extends ColumnReaderBase {
 
   /**
    * creates a reader for triplets
-   * @param path the descriptor for the corresponding column
-   * @param pageReader the underlying store to read from
-   * @param converter a converter that materializes the values in this column in the current record
-   * @param writerVersion writer version string from the Parquet file being read
-   */
-  public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter, ParsedVersion writerVersion) {
-    this.path = checkNotNull(path, "path");
-    this.pageReader = checkNotNull(pageReader, "pageReader");
-    this.converter = checkNotNull(converter, "converter");
-    this.writerVersion = writerVersion;
-    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
-    if (dictionaryPage != null) {
-      try {
-        this.dictionary = dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
-        if (converter.hasDictionarySupport()) {
-          converter.setDictionary(dictionary);
-        }
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not decode the dictionary for " + path, e);
-      }
-    } else {
-      this.dictionary = null;
-    }
-    this.totalValueCount = pageReader.getTotalValueCount();
-    if (totalValueCount <= 0) {
-      throw new ParquetDecodingException("totalValueCount '" + totalValueCount + "' <= 0");
-    }
+   * 
+   * @param path
+   *          the descriptor for the corresponding column
+   * @param pageReader
+   *          the underlying store to read from
+   * @param converter
+   *          a converter that materializes the values in this column in the current record
+   * @param writerVersion
+   *          writer version string from the Parquet file being read
+   */
+  public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter,
+      ParsedVersion writerVersion) {
+    super(path, pageReader, converter, writerVersion);
     consume();
   }
 
-  private boolean isFullyConsumed() {
-    return readValues >= totalValueCount;
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#writeCurrentValueToConverter()
-   */
-  @Override
-  public void writeCurrentValueToConverter() {
-    readValue();
-    this.binding.writeValue();
-  }
-
-  @Override
-  public int getCurrentValueDictionaryID() {
-    readValue();
-    return binding.getDictionaryId();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getInteger()
-   */
-  @Override
-  public int getInteger() {
-    readValue();
-    return this.binding.getInteger();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getBoolean()
-   */
-  @Override
-  public boolean getBoolean() {
-    readValue();
-    return this.binding.getBoolean();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getLong()
-   */
-  @Override
-  public long getLong() {
-    readValue();
-    return this.binding.getLong();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getBinary()
-   */
-  @Override
-  public Binary getBinary() {
-    readValue();
-    return this.binding.getBinary();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getFloat()
-   */
-  @Override
-  public float getFloat() {
-    readValue();
-    return this.binding.getFloat();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getDouble()
-   */
-  @Override
-  public double getDouble() {
-    readValue();
-    return this.binding.getDouble();
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getCurrentRepetitionLevel()
-   */
-  @Override
-  public int getCurrentRepetitionLevel() {
-    return repetitionLevel;
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getDescriptor()
-   */
-  @Override
-  public ColumnDescriptor getDescriptor() {
-    return path;
-  }
-
-  /**
-   * Reads the value into the binding.
-   */
-  public void readValue() {
-    try {
-      if (!valueRead) {
-        binding.read();
-        valueRead = true;
-      }
-    } catch (RuntimeException e) {
-      if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, currentEncoding) &&
-          e instanceof ArrayIndexOutOfBoundsException) {
-        // this is probably PARQUET-246, which may happen if reading data with
-        // MR because this can't be detected without reading all footers
-        throw new ParquetDecodingException("Read failure possibly due to " +
-            "PARQUET-246: try setting parquet.split.files to false",
-            new ParquetDecodingException(
-                format("Can't read value in column %s at value %d out of %d, " +
-                        "%d out of %d in currentPage. repetition level: " +
-                        "%d, definition level: %d",
-                    path, readValues, totalValueCount,
-                    readValues - (endOfPageValueCount - pageValueCount),
-                    pageValueCount, repetitionLevel, definitionLevel),
-                e));
-      }
-      throw new ParquetDecodingException(
-          format("Can't read value in column %s at value %d out of %d, " +
-                  "%d out of %d in currentPage. repetition level: " +
-                  "%d, definition level: %d",
-              path, readValues, totalValueCount,
-              readValues - (endOfPageValueCount - pageValueCount),
-              pageValueCount, repetitionLevel, definitionLevel),
-          e);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#skip()
-   */
-  @Override
-  public void skip() {
-    if (!valueRead) {
-      binding.skip();
-      valueRead = true;
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getCurrentDefinitionLevel()
-   */
-  @Override
-  public int getCurrentDefinitionLevel() {
-    return definitionLevel;
-  }
-
-  // TODO: change the logic around read() to not tie together reading from the 3 columns
-  private void readRepetitionAndDefinitionLevels() {
-    repetitionLevel = repetitionLevelColumn.nextInt();
-    definitionLevel = definitionLevelColumn.nextInt();
-    ++readValues;
-  }
-
-  private void checkRead() {
-    if (isPageFullyConsumed()) {
-      if (isFullyConsumed()) {
-        LOG.debug("end reached");
-        repetitionLevel = 0; // the next repetition level
-        return;
-      }
-      readPage();
-    }
-    readRepetitionAndDefinitionLevels();
-  }
-
-  private void readPage() {
-    LOG.debug("loading page");
-    DataPage page = pageReader.readPage();
-    page.accept(new DataPage.Visitor<Void>() {
-      @Override
-      public Void visit(DataPageV1 dataPageV1) {
-        readPageV1(dataPageV1);
-        return null;
-      }
-      @Override
-      public Void visit(DataPageV2 dataPageV2) {
-        readPageV2(dataPageV2);
-        return null;
-      }
-    });
-  }
-
-  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
-    ValuesReader previousReader = this.dataColumn;
-
-    this.currentEncoding = dataEncoding;
-    this.pageValueCount = valueCount;
-    this.endOfPageValueCount = readValues + pageValueCount;
-
-    if (dataEncoding.usesDictionary()) {
-      if (dictionary == null) {
-        throw new ParquetDecodingException(
-            "could not read page in col " + path + " as the dictionary was missing for encoding " + dataEncoding);
-      }
-      this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(path, VALUES, dictionary);
-    } else {
-      this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
-    }
-
-    if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
-      bindToDictionary(dictionary);
-    } else {
-      bind(path.getType());
-    }
-
-    try {
-      dataColumn.initFromPage(pageValueCount, in);
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page in col " + path, e);
-    }
-
-    if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
-        previousReader != null && previousReader instanceof RequiresPreviousReader) {
-      // previous reader can only be set if reading sequentially
-      ((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
-    }
-  }
-
-  private void readPageV1(DataPageV1 page) {
-    ValuesReader rlReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
-    ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
-    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
-    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
-    try {
-      BytesInput bytes = page.getBytes();
-      LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
-      LOG.debug("reading repetition levels at 0");
-      ByteBufferInputStream in = bytes.toInputStream();
-      rlReader.initFromPage(pageValueCount, in);
-      LOG.debug("reading definition levels at {}", in.position());
-      dlReader.initFromPage(pageValueCount, in);
-      LOG.debug("reading data at {}", in.position());
-      initDataReader(page.getValueEncoding(), in, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
-    }
-  }
-
-  private void readPageV2(DataPageV2 page) {
-    this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
-    this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
-    LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
-    try {
-      initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
-    }
-  }
-
-  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
-    try {
-      if (maxLevel == 0) {
-        return new NullIntIterator();
-      }
-      return new RLEIntIterator(
-          new RunLengthBitPackingHybridDecoder(
-              BytesUtils.getWidthFromMaxInt(maxLevel),
-              bytes.toInputStream()));
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read levels in page for col " + path, e);
-    }
-  }
-
-  private boolean isPageFullyConsumed() {
-    return readValues >= endOfPageValueCount;
-  }
-
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#consume()
-   */
   @Override
-  public void consume() {
-    checkRead();
-    valueRead = false;
+  boolean skipLevels(int rl, int dl) {
+    return false;
   }
 
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.ColumnReader#getTotalValueCount()
-   */
   @Override
-  public long getTotalValueCount() {
-    return totalValueCount;
-  }
-
-  static abstract class IntIterator {
-    abstract int nextInt();
-  }
-
-  static class ValuesReaderIntIterator extends IntIterator {
-    ValuesReader delegate;
-
-    public ValuesReaderIntIterator(ValuesReader delegate) {
-      super();
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      return delegate.readInteger();
-    }
-  }
-
-  static class RLEIntIterator extends IntIterator {
-    RunLengthBitPackingHybridDecoder delegate;
-
-    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      try {
-        return delegate.readInt();
-      } catch (IOException e) {
-        throw new ParquetDecodingException(e);
-      }
-    }
-  }
-
-  private static final class NullIntIterator extends IntIterator {
-    @Override
-    int nextInt() {
-      return 0;
-    }
+  void newPageInitialized(DataPage page) {
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/SynchronizingColumnReader.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/SynchronizingColumnReader.java
new file mode 100644
index 0000000..444ef96
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/SynchronizingColumnReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import java.util.PrimitiveIterator;
+
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.PrimitiveConverter;
+
+/**
+ * A {@link ColumnReader} implementation for utilizing column indexes. When filtering with column indexes, some rows
+ * may be loaded only partially: pages are not aligned across columns, so a page containing other fields of a row may
+ * have been filtered out. In that case we cannot assemble the row, but there is no need to either, since a row
+ * filtered out in any column cannot match the filter condition.
+ * <p>
+ * A {@link RecordReader} assembles rows by reading from each {@link ColumnReader}. Without filtering, when
+ * {@link RecordReader} starts reading a row, the {@link ColumnReader}s are always positioned at the same row with
+ * respect to each other. With filtering, however, due to the misalignment described above, some of the pages read by
+ * the {@link ColumnReader}s may contain values that have no corresponding values in the other columns. This
+ * {@link SynchronizingColumnReader} skips such values so that the values returned to the {@link RecordReader} for the
+ * different fields all correspond to a single row.
+ * 
+ * @see PageReadStore#isInPageFilteringMode()
+ */
+class SynchronizingColumnReader extends ColumnReaderBase {
+
+  private final PrimitiveIterator.OfLong rowIndexes;
+  private long currentRow;
+  private long targetRow;
+  private long lastRowInPage;
+  private int valuesReadFromPage;
+
+  SynchronizingColumnReader(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter,
+      ParsedVersion writerVersion, PrimitiveIterator.OfLong rowIndexes) {
+    super(path, pageReader, converter, writerVersion);
+    this.rowIndexes = rowIndexes;
+    targetRow = Long.MIN_VALUE;
+    consume();
+  }
+
+  @Override
+  boolean isPageFullyConsumed() {
+    return getPageValueCount() <= valuesReadFromPage || lastRowInPage < targetRow;
+  }
+
+  @Override
+  boolean isFullyConsumed() {
+    return !rowIndexes.hasNext();
+  }
+
+  @Override
+  boolean skipLevels(int rl, int dl) {
+    ++valuesReadFromPage;
+    if (rl == 0) {
+      ++currentRow;
+      if (currentRow > targetRow) {
+        targetRow = rowIndexes.hasNext() ? rowIndexes.nextLong() : Long.MAX_VALUE;
+      }
+    }
+    return currentRow < targetRow;
+  }
+
+  @Override
+  protected void newPageInitialized(DataPage page) {
+    long firstRowIndex = page.getFirstRowIndex();
+    currentRow = firstRowIndex - 1;
+    lastRowInPage = firstRowIndex + page.getRowCount() - 1;
+    valuesReadFromPage = 0;
+  }
+
+}
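A self-contained simulation of skipLevels() above, with made-up row numbers, showing how currentRow chases targetRow
(each iteration stands for a value with rl == 0, i.e. the first value of a row):

    import java.util.Arrays;
    import java.util.PrimitiveIterator;

    // Simulates SynchronizingColumnReader.skipLevels() for a page whose rows
    // are 0..9 when only rows {2, 3, 7} survive the column-index filter.
    // Values with rl != 0 (continuations of a repeated row) would inherit the
    // decision made for the row they belong to.
    public class SkipLevelsDemo {
      public static void main(String[] args) {
        PrimitiveIterator.OfLong rowIndexes = Arrays.stream(new long[] {2, 3, 7}).iterator();
        long currentRow = -1;            // newPageInitialized: firstRowIndex - 1
        long targetRow = Long.MIN_VALUE;
        for (long row = 0; row < 10; row++) {
          currentRow++;                  // rl == 0: a new row starts
          if (currentRow > targetRow) {
            targetRow = rowIndexes.hasNext() ? rowIndexes.nextLong() : Long.MAX_VALUE;
          }
          boolean skip = currentRow < targetRow;
          System.out.println("row " + row + (skip ? " skipped" : " kept"));
        }
      }
    }

Running this prints "kept" exactly for rows 2, 3 and 7.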
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
index 4d8f381..0751d4c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
@@ -24,10 +24,18 @@ package org.apache.parquet.column.page;
 abstract public class DataPage extends Page {
 
   private final int valueCount;
+  private final long firstRowIndex;
+  private final int rowCount;
 
   DataPage(int compressedSize, int uncompressedSize, int valueCount) {
+    this(compressedSize, uncompressedSize, valueCount, -1, -1);
+  }
+
+  DataPage(int compressedSize, int uncompressedSize, int valueCount, long firstRowIndex, int rowCount) {
     super(compressedSize, uncompressedSize);
     this.valueCount = valueCount;
+    this.firstRowIndex = firstRowIndex;
+    this.rowCount = rowCount;
   }
 
   /**
@@ -37,6 +45,33 @@ abstract public class DataPage extends Page {
     return valueCount;
   }
 
+  /**
+   * @return the index of the first row in this page
+   * @throws NotInPageFilteringModeException
+   *           if page filtering mode is not active
+   * @see PageReadStore#isInPageFilteringMode()
+   */
+  public long getFirstRowIndex() {
+    if (firstRowIndex < 0) {
+      throw new NotInPageFilteringModeException("First row index is not available");
+    }
+    return firstRowIndex;
+  }
+
+  /**
+   * @return the number of rows in this page
+   * @throws NotInPageFilteringModeException
+   *           if page filtering mode is not active; thrown only in case of {@link DataPageV1}
+   * @see PageReadStore#isInPageFilteringMode()
+   */
+  public int getRowCount() {
+    if (rowCount < 0) {
+      throw new NotInPageFilteringModeException(
+          "Row count is not available");
+    }
+    return rowCount;
+  }
+
   public abstract <T> T accept(Visitor<T> visitor);
 
   public static interface Visitor<T> {
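Taken together, the two accessors above define the closed row range a page covers in page filtering mode;
SynchronizingColumnReader.newPageInitialized() computes exactly this. A hypothetical helper for illustration:

    // Last row index covered by a page; valid only in page filtering mode
    // (otherwise getFirstRowIndex()/getRowCount() above throw
    // NotInPageFilteringModeException). E.g. firstRowIndex = 100 and
    // rowCount = 50 cover rows 100..149.
    static long lastRowIndex(org.apache.parquet.column.page.DataPage page) {
      return page.getFirstRowIndex() + page.getRowCount() - 1;
    }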
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
index 56928c3..42fc635 100755
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
@@ -50,6 +50,27 @@ public class DataPageV1 extends DataPage {
   }
 
   /**
+   * @param bytes the bytes for this page
+   * @param valueCount count of values in this page
+   * @param uncompressedSize the uncompressed size of the page
+   * @param firstRowIndex the index of the first row in this page
+   * @param rowCount the number of rows in this page
+   * @param statistics of the page's values (max, min, num_null)
+   * @param rlEncoding the repetition level encoding for this page
+   * @param dlEncoding the definition level encoding for this page
+   * @param valuesEncoding the values encoding for this page
+   */
+  public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex, int rowCount,
+      Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
+    super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount, firstRowIndex, rowCount);
+    this.bytes = bytes;
+    this.statistics = statistics;
+    this.rlEncoding = rlEncoding;
+    this.dlEncoding = dlEncoding;
+    this.valuesEncoding = valuesEncoding;
+  }
+
+  /**
    * @return the bytes for the page
    */
   public BytesInput getBytes() {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
index 62dac83..7ec902d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
@@ -54,6 +54,32 @@ public class DataPageV2 extends DataPage {
    * @param rowCount count of rows
    * @param nullCount count of nulls
    * @param valueCount count of values
+   * @param firstRowIndex the index of the first row in this page
+   * @param repetitionLevels RLE encoded repetition levels
+   * @param definitionLevels RLE encoded definition levels
+   * @param dataEncoding encoding for the data
+   * @param data data encoded with dataEncoding
+   * @param statistics optional statistics for this page
+   * @return an uncompressed page
+   */
+  public static DataPageV2 uncompressed(
+      int rowCount, int nullCount, int valueCount, long firstRowIndex,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      Statistics<?> statistics) {
+    return new DataPageV2(
+        rowCount, nullCount, valueCount, firstRowIndex,
+        repetitionLevels, definitionLevels,
+        dataEncoding, data,
+        Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()),
+        statistics,
+        false);
+  }
+
+  /**
+   * @param rowCount count of rows
+   * @param nullCount count of nulls
+   * @param valueCount count of values
    * @param repetitionLevels RLE encoded repetition levels
    * @param definitionLevels RLE encoded definition levels
    * @param dataEncoding encoding for the data
@@ -77,7 +103,6 @@ public class DataPageV2 extends DataPage {
         true);
   }
 
-  private final int rowCount;
   private final int nullCount;
   private final BytesInput repetitionLevels;
   private final BytesInput definitionLevels;
@@ -93,8 +118,7 @@ public class DataPageV2 extends DataPage {
       int uncompressedSize,
       Statistics<?> statistics,
       boolean isCompressed) {
-    super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount);
-    this.rowCount = rowCount;
+    super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount, -1, rowCount);
     this.nullCount = nullCount;
     this.repetitionLevels = repetitionLevels;
     this.definitionLevels = definitionLevels;
@@ -104,8 +128,22 @@ public class DataPageV2 extends DataPage {
     this.isCompressed = isCompressed;
   }
 
-  public int getRowCount() {
-    return rowCount;
+  private DataPageV2(
+      int rowCount, int nullCount, int valueCount, long firstRowIndex,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      int uncompressedSize,
+      Statistics<?> statistics,
+      boolean isCompressed) {
+    super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize,
+        valueCount, firstRowIndex, rowCount);
+    this.nullCount = nullCount;
+    this.repetitionLevels = repetitionLevels;
+    this.definitionLevels = definitionLevels;
+    this.dataEncoding = dataEncoding;
+    this.data = data;
+    this.statistics = statistics;
+    this.isCompressed = isCompressed;
   }
 
   public int getNullCount() {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/NotInPageFilteringModeException.java
similarity index 65%
copy from parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
copy to parquet-column/src/main/java/org/apache/parquet/column/page/NotInPageFilteringModeException.java
index 24d5825..839c199 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/NotInPageFilteringModeException.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,26 +18,20 @@
  */
 package org.apache.parquet.column.page;
 
-import org.apache.parquet.column.ColumnDescriptor;
-
 /**
- * contains all the readers for all the columns of the corresponding row group
- *
- * TODO: rename to RowGroup?
+ * Exception thrown if an operation has no meaning when page filtering mode is not active.
+ * 
+ * @see PageReadStore#isInPageFilteringMode()
  */
-public interface PageReadStore {
+public class NotInPageFilteringModeException extends IllegalStateException {
 
   /**
-   *
-   * @param descriptor the descriptor of the column
-   * @return the page reader for that column
+   * Constructs an exception with the specified message
+   * 
+   * @param msg
+   *          the message of the exception
    */
-  PageReader getPageReader(ColumnDescriptor descriptor);
-
-  /**
-   *
-   * @return the total number of rows in that row group
-   */
-  long getRowCount();
-
+  public NotInPageFilteringModeException(String msg) {
+    super(msg);
+  }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
index 24d5825..0fc26ed 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column.page;
 
+import java.util.PrimitiveIterator;
 import org.apache.parquet.column.ColumnDescriptor;
 
 /**
@@ -29,7 +30,8 @@ public interface PageReadStore {
 
   /**
    *
-   * @param descriptor the descriptor of the column
+   * @param descriptor
+   *          the descriptor of the column
    * @return the page reader for that column
    */
   PageReader getPageReader(ColumnDescriptor descriptor);
@@ -40,4 +42,25 @@ public interface PageReadStore {
    */
   long getRowCount();
 
+  /**
+   * Returns the indexes of the rows to be read/built. All rows whose indexes are not returned shall be skipped.
+   *
+   * @return the incremental iterator of the row indexes
+   * @throws NotInPageFilteringModeException
+   *           if page filtering mode is not active so the related information is not available
+   * @see #isInPageFilteringMode()
+   */
+  default PrimitiveIterator.OfLong getRowIndexes() {
+    throw new NotInPageFilteringModeException("Row indexes are not available");
+  }
+
+  /**
+   * If page filtering mode is active, some values might have to be skipped to keep the rows in sync across the
+   * pages.
+   *
+   * @return {@code true} if page filtering mode is active; {@code false} otherwise
+   */
+  default boolean isInPageFilteringMode() {
+    return false;
+  }
 }
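A minimal sketch, with hypothetical names, of an index-aware store overriding these two defaults
(ColumnChunkPageReadStore in parquet-hadoop is the real implementation added by this commit):

    import java.util.Arrays;
    import java.util.PrimitiveIterator;
    import org.apache.parquet.column.page.PageReadStore;

    // Illustrative only: the surviving row indexes would be computed from the
    // column indexes by the filtering layer.
    abstract class FilteringPageReadStore implements PageReadStore {
      private final long[] rowIndexes; // surviving row indexes, ascending

      FilteringPageReadStore(long[] rowIndexes) {
        this.rowIndexes = rowIndexes;
      }

      @Override
      public boolean isInPageFilteringMode() {
        return true;
      }

      @Override
      public PrimitiveIterator.OfLong getRowIndexes() {
        return Arrays.stream(rowIndexes).iterator();
      }
    }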
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
index 950b70f..490cc3e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -22,12 +22,13 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveType;
 
 class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
-  private static class BinaryColumnIndex extends ColumnIndexBase {
+  private static class BinaryColumnIndex extends ColumnIndexBase<Binary> {
     private Binary[] minValues;
     private Binary[] maxValues;
 
@@ -54,6 +55,28 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
     String getMaxValueAsString(int pageIndex) {
       return stringifier.stringify(maxValues[pageIndex]);
     }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+      return (Statistics<T>) new Statistics<Binary>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+    }
+
+    @Override
+    ValueComparator createValueComparator(Object value) {
+      final Binary v = (Binary) value;
+      return new ValueComparator() {
+        @Override
+        int compareValueToMin(int arrayIndex) {
+          return comparator.compare(v, minValues[arrayIndex]);
+        }
+
+        @Override
+        int compareValueToMax(int arrayIndex) {
+          return comparator.compare(v, maxValues[arrayIndex]);
+        }
+      };
+    }
   }
 
   private final List<Binary> minValues = new ArrayList<>();
@@ -76,8 +99,8 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
 
   @Override
   void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
-    minValues.add(min == null ? null : convert(min));
-    maxValues.add(max == null ? null : convert(max));
+    minValues.add(convert(min));
+    maxValues.add(convert(max));
   }
 
   @Override
@@ -87,7 +110,7 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
   }
 
   @Override
-  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+  ColumnIndexBase<Binary> createColumnIndex(PrimitiveType type) {
     BinaryColumnIndex columnIndex = new BinaryColumnIndex(type);
     columnIndex.minValues = minValues.toArray(new Binary[minValues.size()]);
     columnIndex.maxValues = maxValues.toArray(new Binary[maxValues.size()]);
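The ValueComparator built above fixes the filter literal and compares it against each page's min and max;
BoundaryOrder (further down in this diff) then iterates or binary-searches over those comparisons. A toy model of the
equality test, using ints for brevity:

    // Mirrors BoundaryOrder.UNORDERED.eq(): page i may contain v only if
    // min[i] <= v <= max[i], i.e. compareValueToMin(i) >= 0 and
    // compareValueToMax(i) <= 0. All names here are illustrative.
    static boolean pageMightContain(int v, int[] mins, int[] maxs, int i) {
      return v >= mins[i] && v <= maxs[i];
    }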
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
index 3053f78..233bd1b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
@@ -19,7 +19,7 @@
 package org.apache.parquet.internal.column.columnindex;
 
 import java.nio.ByteBuffer;
-
+import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveType;
@@ -28,7 +28,7 @@ import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
 import it.unimi.dsi.fastutil.booleans.BooleanList;
 
 class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
-  private static class BooleanColumnIndex extends ColumnIndexBase {
+  private static class BooleanColumnIndex extends ColumnIndexBase<Boolean> {
     private boolean[] minValues;
     private boolean[] maxValues;
 
@@ -55,6 +55,28 @@ class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
     String getMaxValueAsString(int pageIndex) {
       return stringifier.stringify(maxValues[pageIndex]);
     }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+      return (Statistics<T>) new Statistics<Boolean>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+    }
+
+    @Override
+    ValueComparator createValueComparator(Object value) {
+      final boolean v = (boolean) value;
+      return new ValueComparator() {
+        @Override
+        int compareValueToMin(int arrayIndex) {
+          return comparator.compare(v, minValues[arrayIndex]);
+        }
+
+        @Override
+        int compareValueToMax(int arrayIndex) {
+          return comparator.compare(v, maxValues[arrayIndex]);
+        }
+      };
+    }
   }
 
   private final BooleanList minValues = new BooleanArrayList();
@@ -70,18 +92,18 @@ class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
 
   @Override
   void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
-    minValues.add(min == null ? false : convert(min));
-    maxValues.add(max == null ? false : convert(max));
+    minValues.add(convert(min));
+    maxValues.add(convert(max));
   }
 
   @Override
   void addMinMax(Object min, Object max) {
-    minValues.add(min == null ? false : (boolean) min);
-    maxValues.add(max == null ? false : (boolean) max);
+    minValues.add((boolean) min);
+    maxValues.add((boolean) max);
   }
 
   @Override
-  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+  ColumnIndexBase<Boolean> createColumnIndex(PrimitiveType type) {
     BooleanColumnIndex columnIndex = new BooleanColumnIndex(type);
     columnIndex.minValues = minValues.toBooleanArray();
     columnIndex.maxValues = maxValues.toBooleanArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
index 5d82815..e47b5b3 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
@@ -18,9 +18,335 @@
  */
 package org.apache.parquet.internal.column.columnindex;
 
+import java.util.PrimitiveIterator;
+import java.util.PrimitiveIterator.OfInt;
+
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder.ColumnIndexBase;
+
 /**
- * Enum for {@link org.apache.parquet.format.BoundaryOrder}.
+ * Enum for {@link org.apache.parquet.format.BoundaryOrder}. It also contains the implementations of the searches for
+ * the matching page indexes used in column-index-based filtering.
  */
 public enum BoundaryOrder {
-  UNORDERED, ASCENDING, DESCENDING;
+  UNORDERED {
+    @Override
+    PrimitiveIterator.OfInt eq(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMin(arrayIndex) >= 0 && comparator.compareValueToMax(arrayIndex) <= 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt gt(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMax(arrayIndex) < 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt gtEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMax(arrayIndex) <= 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt lt(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMin(arrayIndex) > 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt ltEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMin(arrayIndex) >= 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt notEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMin(arrayIndex) != 0 || comparator.compareValueToMax(arrayIndex) != 0,
+          comparator::translate);
+    }
+  },
+  ASCENDING {
+    @Override
+    OfInt eq(ColumnIndexBase<?>.ValueComparator comparator) {
+      Bounds bounds = findBounds(comparator);
+      if (bounds == null) {
+        return IndexIterator.EMPTY;
+      }
+      return IndexIterator.rangeTranslate(bounds.lower, bounds.upper, comparator::translate);
+    }
+
+    @Override
+    OfInt gt(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int left = 0;
+      int right = length;
+      do {
+        int i = floorMid(left, right);
+        if (comparator.compareValueToMax(i) >= 0) {
+          left = i + 1;
+        } else {
+          right = i;
+        }
+      } while (left < right);
+      return IndexIterator.rangeTranslate(right, length - 1, comparator::translate);
+    }
+
+    @Override
+    OfInt gtEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int left = 0;
+      int right = length;
+      do {
+        int i = floorMid(left, right);
+        if (comparator.compareValueToMax(i) > 0) {
+          left = i + 1;
+        } else {
+          right = i;
+        }
+      } while (left < right);
+      return IndexIterator.rangeTranslate(right, length - 1, comparator::translate);
+    }
+
+    @Override
+    OfInt lt(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int left = -1;
+      int right = length - 1;
+      do {
+        int i = ceilingMid(left, right);
+        if (comparator.compareValueToMin(i) <= 0) {
+          right = i - 1;
+        } else {
+          left = i;
+        }
+      } while (left < right);
+      return IndexIterator.rangeTranslate(0, left, comparator::translate);
+    }
+
+    @Override
+    OfInt ltEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int left = -1;
+      int right = length - 1;
+      do {
+        int i = ceilingMid(left, right);
+        if (comparator.compareValueToMin(i) < 0) {
+          right = i - 1;
+        } else {
+          left = i;
+        }
+      } while (left < right);
+      return IndexIterator.rangeTranslate(0, left, comparator::translate);
+    }
+
+    @Override
+    OfInt notEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      Bounds bounds = findBounds(comparator);
+      int length = comparator.arrayLength();
+      if (bounds == null) {
+        return IndexIterator.all(comparator);
+      }
+      return IndexIterator.filterTranslate(
+          length,
+          i -> i < bounds.lower || i > bounds.upper || comparator.compareValueToMin(i) != 0
+              || comparator.compareValueToMax(i) != 0,
+          comparator::translate);
+    }
+
+    private Bounds findBounds(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int lowerLeft = 0;
+      int upperLeft = 0;
+      int lowerRight = length - 1;
+      int upperRight = length - 1;
+      do {
+        if (lowerLeft > lowerRight) {
+          return null;
+        }
+        int i = floorMid(lowerLeft, lowerRight);
+        if (comparator.compareValueToMin(i) < 0) {
+          lowerRight = upperRight = i - 1;
+        } else if (comparator.compareValueToMax(i) > 0) {
+          lowerLeft = upperLeft = i + 1;
+        } else {
+          lowerRight = upperLeft = i;
+        }
+      } while (lowerLeft != lowerRight);
+      do {
+        if (upperLeft > upperRight) {
+          return null;
+        }
+        int i = ceilingMid(upperLeft, upperRight);
+        if (comparator.compareValueToMin(i) < 0) {
+          upperRight = i - 1;
+        } else if (comparator.compareValueToMax(i) > 0) {
+          upperLeft = i + 1;
+        } else {
+          upperLeft = i;
+        }
+      } while (upperLeft != upperRight);
+      return new Bounds(lowerLeft, upperRight);
+    }
+  },
+  DESCENDING {
+    @Override
+    OfInt eq(ColumnIndexBase<?>.ValueComparator comparator) {
+      Bounds bounds = findBounds(comparator);
+      if (bounds == null) {
+        return IndexIterator.EMPTY;
+      }
+      return IndexIterator.rangeTranslate(bounds.lower, bounds.upper, comparator::translate);
+    }
+
+    @Override
+    OfInt gt(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int left = -1;
+      int right = length - 1;
+      do {
+        int i = ceilingMid(left, right);
+        if (comparator.compareValueToMax(i) >= 0) {
+          right = i - 1;
+        } else {
+          left = i;
+        }
+      } while (left < right);
+      return IndexIterator.rangeTranslate(0, left, comparator::translate);
+    }
+
+    @Override
+    OfInt gtEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int left = -1;
+      int right = length - 1;
+      do {
+        int i = ceilingMid(left, right);
+        if (comparator.compareValueToMax(i) > 0) {
+          right = i - 1;
+        } else {
+          left = i;
+        }
+      } while (left < right);
+      return IndexIterator.rangeTranslate(0, left, comparator::translate);
+    }
+
+    @Override
+    OfInt lt(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int left = 0;
+      int right = length;
+      do {
+        int i = floorMid(left, right);
+        if (comparator.compareValueToMin(i) <= 0) {
+          left = i + 1;
+        } else {
+          right = i;
+        }
+      } while (left < right);
+      return IndexIterator.rangeTranslate(right, length - 1, comparator::translate);
+    }
+
+    @Override
+    OfInt ltEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int left = 0;
+      int right = length;
+      do {
+        int i = floorMid(left, right);
+        if (comparator.compareValueToMin(i) < 0) {
+          left = i + 1;
+        } else {
+          right = i;
+        }
+      } while (left < right);
+      return IndexIterator.rangeTranslate(right, length - 1, comparator::translate);
+    }
+
+    @Override
+    OfInt notEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      Bounds bounds = findBounds(comparator);
+      int length = comparator.arrayLength();
+      if (bounds == null) {
+        return IndexIterator.all(comparator);
+      }
+      return IndexIterator.filterTranslate(
+          length,
+          i -> i < bounds.lower || i > bounds.upper || comparator.compareValueToMin(i) != 0
+              || comparator.compareValueToMax(i) != 0,
+          comparator::translate);
+    }
+
+    private Bounds findBounds(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int lowerLeft = 0;
+      int upperLeft = 0;
+      int lowerRight = length - 1;
+      int upperRight = length - 1;
+      do {
+        if (lowerLeft > lowerRight) {
+          return null;
+        }
+        int i = floorMid(lowerLeft, lowerRight);
+        if (comparator.compareValueToMax(i) > 0) {
+          lowerRight = upperRight = i - 1;
+        } else if (comparator.compareValueToMin(i) < 0) {
+          lowerLeft = upperLeft = i + 1;
+        } else {
+          lowerRight = upperLeft = i;
+        }
+      } while (lowerLeft != lowerRight);
+      do {
+        if (upperLeft > upperRight) {
+          return null;
+        }
+        int i = ceilingMid(upperLeft, upperRight);
+        if (comparator.compareValueToMax(i) > 0) {
+          upperRight = i - 1;
+        } else if (comparator.compareValueToMin(i) < 0) {
+          upperLeft = i + 1;
+        } else {
+          upperLeft = i;
+        }
+      } while (upperLeft != upperRight);
+      return new Bounds(lowerLeft, upperRight);
+    }
+  };
+
+  private static class Bounds {
+    final int lower, upper;
+
+    Bounds(int lower, int upper) {
+      assert lower <= upper;
+      this.lower = lower;
+      this.upper = upper;
+    }
+  }
+
+  private static int floorMid(int left, int right) {
+    // Avoid the possible overflow that might happen in case of (left + right) / 2
+    return left + ((right - left) / 2);
+  }
+
+  private static int ceilingMid(int left, int right) {
+    // Avoid the possible overflow that might happen in case of (left + right + 1) / 2
+    return left + ((right - left + 1) / 2);
+  }
+
+  abstract PrimitiveIterator.OfInt eq(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt gt(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt gtEq(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt lt(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt ltEq(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt notEq(ColumnIndexBase<?>.ValueComparator comparator);
 }
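
For the ASCENDING and DESCENDING boundary orders the operators above binary-search the min/max arrays instead of scanning them. A standalone restatement of the ASCENDING gt() search as a hypothetical helper over a plain (non-decreasing) int array, not part of the patch: it finds the first page whose max exceeds the value; with ascending bounds, every page from that point on may contain matching rows.

    // Returns the index of the first page whose max exceeds v;
    // pages [result, maxValues.length) may contain matches.
    static int firstPageWithMaxGreaterThan(int[] maxValues, int v) {
      int left = 0;
      int right = maxValues.length;
      while (left < right) {
        int mid = left + ((right - left) / 2); // overflow-safe midpoint
        if (maxValues[mid] <= v) {
          left = mid + 1; // v >= max: no value in this page can be greater than v
        } else {
          right = mid;    // this page may match; keep searching to the left
        }
      }
      return right; // equals maxValues.length if no page can match
    }
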
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
index f7bd16b..b91a5c0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
@@ -20,13 +20,18 @@ package org.apache.parquet.internal.column.columnindex;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.PrimitiveIterator;
+
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
 
 /**
- * Column index containing min/max and null count values for the pages in a column chunk.
+ * Column index containing min/max and null count values for the pages in a column chunk. It also implements the
+ * {@link Visitor} methods to return the indexes of the matching pages; these are used by {@link ColumnIndexFilter}.
  *
  * @see org.apache.parquet.format.ColumnIndex
  */
-public interface ColumnIndex {
+public interface ColumnIndex extends Visitor<PrimitiveIterator.OfInt> {
   /**
    * @return the boundary order of the min/max values; used for converting to the related thrift object
    */
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index 6d05558..9633b61 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -26,8 +26,22 @@ import java.util.EnumMap;
 import java.util.Formatter;
 import java.util.List;
 import java.util.Map;
+import java.util.PrimitiveIterator;
+import java.util.function.IntPredicate;
 
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveStringifier;
@@ -37,29 +51,53 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
 import it.unimi.dsi.fastutil.booleans.BooleanList;
 import it.unimi.dsi.fastutil.booleans.BooleanLists;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
 import it.unimi.dsi.fastutil.longs.LongLists;
 
 /**
- * Builder implementation to create {@link ColumnIndex} objects during writing a parquet file.
+ * Builder implementation to create {@link ColumnIndex} objects.
  */
 public abstract class ColumnIndexBuilder {
 
-  static abstract class ColumnIndexBase implements ColumnIndex {
+  static abstract class ColumnIndexBase<C> implements ColumnIndex {
+    /*
+     * A class containing the value to be compared to the min/max values. This way the unboxing has to be done only
+     * once per predicate execution instead of once for every comparison.
+     */
+    abstract class ValueComparator {
+      abstract int compareValueToMin(int arrayIndex);
+
+      abstract int compareValueToMax(int arrayIndex);
+
+      int arrayLength() {
+        return pageIndexes.length;
+      }
+
+      int translate(int arrayIndex) {
+        return pageIndexes[arrayIndex];
+      }
+    }
+
     private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
     private static final int MAX_VALUE_LENGTH_FOR_TOSTRING = 40;
     private static final String TOSTRING_TRUNCATION_MARKER = "(...)";
-    private static final int TOSTRING_TRUNCATION_START_POS =
-        (MAX_VALUE_LENGTH_FOR_TOSTRING - TOSTRING_TRUNCATION_MARKER.length()) / 2;
-    private static final int TOSTRING_TRUNCATION_END_POS =
-        MAX_VALUE_LENGTH_FOR_TOSTRING - TOSTRING_TRUNCATION_MARKER.length() - TOSTRING_TRUNCATION_START_POS;
+    private static final int TOSTRING_TRUNCATION_START_POS = (MAX_VALUE_LENGTH_FOR_TOSTRING
+        - TOSTRING_TRUNCATION_MARKER.length()) / 2;
+    private static final int TOSTRING_TRUNCATION_END_POS = MAX_VALUE_LENGTH_FOR_TOSTRING
+        - TOSTRING_TRUNCATION_MARKER.length() - TOSTRING_TRUNCATION_START_POS;
     private static final String TOSTRING_MISSING_VALUE_MARKER = "<none>";
 
     final PrimitiveStringifier stringifier;
-    final PrimitiveComparator<Binary> comparator;
+    final PrimitiveComparator<C> comparator;
     private boolean[] nullPages;
     private BoundaryOrder boundaryOrder;
+    // Storing the page index for each array index (min/max values are not stored for null-pages)
+    private int[] pageIndexes;
     // might be null
     private long[] nullCounts;
 
@@ -97,11 +135,12 @@ public abstract class ColumnIndexBuilder {
     @Override
     public List<ByteBuffer> getMinValues() {
       List<ByteBuffer> list = new ArrayList<>(getPageCount());
+      int arrayIndex = 0;
       for (int i = 0, n = getPageCount(); i < n; ++i) {
         if (isNullPage(i)) {
           list.add(EMPTY_BYTE_BUFFER);
         } else {
-          list.add(getMinValueAsBytes(i));
+          list.add(getMinValueAsBytes(arrayIndex++));
         }
       }
       return list;
@@ -110,11 +149,12 @@ public abstract class ColumnIndexBuilder {
     @Override
     public List<ByteBuffer> getMaxValues() {
       List<ByteBuffer> list = new ArrayList<>(getPageCount());
+      int arrayIndex = 0;
       for (int i = 0, n = getPageCount(); i < n; ++i) {
         if (isNullPage(i)) {
           list.add(EMPTY_BYTE_BUFFER);
         } else {
-          list.add(getMaxValueAsBytes(i));
+          list.add(getMaxValueAsBytes(arrayIndex++));
         }
       }
       return list;
@@ -127,14 +167,15 @@ public abstract class ColumnIndexBuilder {
         String minMaxPart = "  %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s  %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s\n";
         formatter.format("%-10s  %20s" + minMaxPart, "", "null count", "min", "max");
         String format = "page-%-5d  %20s" + minMaxPart;
+        int arrayIndex = 0;
         for (int i = 0, n = nullPages.length; i < n; ++i) {
           String nullCount = nullCounts == null ? TOSTRING_MISSING_VALUE_MARKER : Long.toString(nullCounts[i]);
           String min, max;
           if (nullPages[i]) {
             min = max = TOSTRING_MISSING_VALUE_MARKER;
           } else {
-            min = truncate(getMinValueAsString(i));
-            max = truncate(getMaxValueAsString(i));
+            min = truncate(getMinValueAsString(arrayIndex));
+            max = truncate(getMaxValueAsString(arrayIndex++));
           }
           formatter.format(format, i, nullCount, min, max);
         }
@@ -150,13 +191,164 @@ public abstract class ColumnIndexBuilder {
       return nullPages[pageIndex];
     }
 
-    abstract ByteBuffer getMinValueAsBytes(int pageIndex);
+    /*
+     * Returns the min value for arrayIndex as a ByteBuffer. (Min values are not stored for null-pages so arrayIndex
+     * might not be equal to pageIndex.)
+     */
+    abstract ByteBuffer getMinValueAsBytes(int arrayIndex);
+
+    /*
+     * Returns the max value for arrayIndex as a ByteBuffer. (Max values are not stored for null-pages so arrayIndex
+     * might not be equal to pageIndex.)
+     */
+    abstract ByteBuffer getMaxValueAsBytes(int arrayIndex);
+
+    /*
+     * Returns the min value for arrayIndex as a String. (Min values are not stored for null-pages so arrayIndex
+     * might not be equal to pageIndex.)
+     */
+    abstract String getMinValueAsString(int arrayIndex);
+
+    /*
+     * Returns the max value for arrayIndex as a String. (Max values are not stored for null-pages so arrayIndex
+     * might not be equal to pageIndex.)
+     */
+    abstract String getMaxValueAsString(int arrayIndex);
+
+    /* Creates a Statistics object for filtering. Used for user-defined predicates. */
+    abstract <T extends Comparable<T>> org.apache.parquet.filter2.predicate.Statistics<T> createStats(int arrayIndex);
+
+    /* Creates a ValueComparator object containing the specified value to be compared to the min/max values. */
+    abstract ValueComparator createValueComparator(Object value);
+
+    @Override
+    public PrimitiveIterator.OfInt visit(And and) {
+      throw new UnsupportedOperationException("AND shall not be used on column index directly");
+    }
+
+    @Override
+    public PrimitiveIterator.OfInt visit(Not not) {
+      throw new UnsupportedOperationException("NOT shall not be used on column index directly");
+    }
+
+    @Override
+    public PrimitiveIterator.OfInt visit(Or or) {
+      throw new UnsupportedOperationException("OR shall not be used on column index directly");
+    }
+
+    @Override
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Eq<T> eq) {
+      T value = eq.getValue();
+      if (value == null) {
+        if (nullCounts == null) {
+          // Searching for nulls, so if we don't have null-related statistics we have to return all pages
+          return IndexIterator.all(getPageCount());
+        } else {
+          return IndexIterator.filter(getPageCount(), pageIndex -> nullCounts[pageIndex] > 0);
+        }
+      }
+      return getBoundaryOrder().eq(createValueComparator(value));
+    }
+
+    @Override
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Gt<T> gt) {
+      return getBoundaryOrder().gt(createValueComparator(gt.getValue()));
+    }
+
+    @Override
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(GtEq<T> gtEq) {
+      return getBoundaryOrder().gtEq(createValueComparator(gtEq.getValue()));
+    }
+
+    @Override
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Lt<T> lt) {
+      return getBoundaryOrder().lt(createValueComparator(lt.getValue()));
+    }
+
+    @Override
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(LtEq<T> ltEq) {
+      return getBoundaryOrder().ltEq(createValueComparator(ltEq.getValue()));
+    }
+
+    @Override
+    public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(NotEq<T> notEq) {
+      T value = notEq.getValue();
+      if (value == null) {
+        return IndexIterator.filter(getPageCount(), pageIndex -> !nullPages[pageIndex]);
+      }
+
+      if (nullCounts == null) {
+        // Nulls match, so if we don't have null-related statistics we have to return all pages
+        return IndexIterator.all(getPageCount());
+      }
+
+      // Merging value filtering with pages containing nulls
+      IntSet matchingIndexes = new IntOpenHashSet();
+      getBoundaryOrder().notEq(createValueComparator(value))
+          .forEachRemaining((int index) -> matchingIndexes.add(index));
+      return IndexIterator.filter(getPageCount(),
+          pageIndex -> nullCounts[pageIndex] > 0 || matchingIndexes.contains(pageIndex));
+    }
+
+    @Override
+    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> PrimitiveIterator.OfInt visit(
+        UserDefined<T, U> udp) {
+      final UserDefinedPredicate<T> predicate = udp.getUserDefinedPredicate();
+      final boolean acceptNulls = predicate.keep(null);
+
+      if (acceptNulls && nullCounts == null) {
+        // Nulls match, so if we don't have null-related statistics we have to return all pages
+        return IndexIterator.all(getPageCount());
+      }
+
+      return IndexIterator.filter(getPageCount(), new IntPredicate() {
+        private int arrayIndex = -1;
 
-    abstract ByteBuffer getMaxValueAsBytes(int pageIndex);
+        @Override
+        public boolean test(int pageIndex) {
+          if (isNullPage(pageIndex)) {
+            return acceptNulls;
+          } else {
+            ++arrayIndex;
+            if (acceptNulls && nullCounts[pageIndex] > 0) {
+              return true;
+            }
+            org.apache.parquet.filter2.predicate.Statistics<T> stats = createStats(arrayIndex);
+            return !predicate.canDrop(stats);
+          }
+        }
+      });
+    }
 
-    abstract String getMinValueAsString(int pageIndex);
+    @Override
+    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> PrimitiveIterator.OfInt visit(
+        LogicalNotUserDefined<T, U> udp) {
+      final UserDefinedPredicate<T> inversePredicate = udp.getUserDefined().getUserDefinedPredicate();
+      final boolean acceptNulls = !inversePredicate.keep(null);
+
+      if (acceptNulls && nullCounts == null) {
+        // Nulls match, so if we don't have null-related statistics we have to return all pages
+        return IndexIterator.all(getPageCount());
+      }
+
+      return IndexIterator.filter(getPageCount(), new IntPredicate() {
+        private int arrayIndex = -1;
 
-    abstract String getMaxValueAsString(int pageIndex);
+        @Override
+        public boolean test(int pageIndex) {
+          if (isNullPage(pageIndex)) {
+            return acceptNulls;
+          } else {
+            ++arrayIndex;
+            if (acceptNulls && nullCounts[pageIndex] > 0) {
+              return true;
+            }
+            org.apache.parquet.filter2.predicate.Statistics<T> stats = createStats(arrayIndex);
+            return !inversePredicate.inverseCanDrop(stats);
+          }
+        }
+      });
+    }
   }
 
   private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() {
@@ -174,7 +366,7 @@ public abstract class ColumnIndexBuilder {
     }
 
     @Override
-    ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    ColumnIndexBase<?> createColumnIndex(PrimitiveType type) {
       return null;
     }
 
@@ -208,6 +400,8 @@ public abstract class ColumnIndexBuilder {
   private final BooleanList nullPages = new BooleanArrayList();
   private final LongList nullCounts = new LongArrayList();
   private long minMaxSize;
+  private final IntList pageIndexes = new IntArrayList();
+  private int nextPageIndex;
 
   /**
    * @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at
@@ -283,7 +477,7 @@ public abstract class ColumnIndexBuilder {
     }
 
     builder.fill(nullPages, nullCounts, minValues, maxValues);
-    ColumnIndexBase columnIndex = builder.build(type);
+    ColumnIndexBase<?> columnIndex = builder.build(type);
     columnIndex.boundaryOrder = requireNonNull(boundaryOrder);
     return columnIndex;
   }
@@ -304,13 +498,14 @@ public abstract class ColumnIndexBuilder {
       Object min = stats.genericGetMin();
       Object max = stats.genericGetMax();
       addMinMax(min, max);
+      pageIndexes.add(nextPageIndex);
       minMaxSize += sizeOf(min);
       minMaxSize += sizeOf(max);
     } else {
       nullPages.add(true);
-      addMinMax(null, null);
     }
     nullCounts.add(stats.getNumNulls());
+    ++nextPageIndex;
   }
 
   abstract void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max);
@@ -320,9 +515,9 @@ public abstract class ColumnIndexBuilder {
   private void fill(List<Boolean> nullPages, List<Long> nullCounts, List<ByteBuffer> minValues,
       List<ByteBuffer> maxValues) {
     clear();
-    int requiredSize = nullPages.size();
-    if ((nullCounts != null && nullCounts.size() != requiredSize) || minValues.size() != requiredSize
-        || maxValues.size() != requiredSize) {
+    int pageCount = nullPages.size();
+    if ((nullCounts != null && nullCounts.size() != pageCount) || minValues.size() != pageCount
+        || maxValues.size() != pageCount) {
       throw new IllegalArgumentException(
           String.format("Not all sizes are equal (nullPages:%d, nullCounts:%s, minValues:%d, maxValues:%d",
               nullPages.size(), nullCounts == null ? "null" : nullCounts.size(), minValues.size(), maxValues.size()));
@@ -333,13 +528,12 @@ public abstract class ColumnIndexBuilder {
       this.nullCounts.addAll(nullCounts);
     }
 
-    for (int i = 0; i < requiredSize; ++i) {
-      if (nullPages.get(i)) {
-        addMinMaxFromBytes(null, null);
-      } else {
+    for (int i = 0; i < pageCount; ++i) {
+      if (!nullPages.get(i)) {
         ByteBuffer min = minValues.get(i);
         ByteBuffer max = maxValues.get(i);
         addMinMaxFromBytes(min, max);
+        pageIndexes.add(i);
         minMaxSize += min.remaining();
         minMaxSize += max.remaining();
       }
@@ -350,7 +544,7 @@ public abstract class ColumnIndexBuilder {
    * @return the newly created column index or {@code null} if the {@link ColumnIndex} would be empty
    */
   public ColumnIndex build() {
-    ColumnIndexBase columnIndex = build(type);
+    ColumnIndexBase<?> columnIndex = build(type);
     if (columnIndex == null) {
       return null;
     }
@@ -358,16 +552,17 @@ public abstract class ColumnIndexBuilder {
     return columnIndex;
   }
 
-  private ColumnIndexBase build(PrimitiveType type) {
+  private ColumnIndexBase<?> build(PrimitiveType type) {
     if (nullPages.isEmpty()) {
       return null;
     }
-    ColumnIndexBase columnIndex = createColumnIndex(type);
+    ColumnIndexBase<?> columnIndex = createColumnIndex(type);
     columnIndex.nullPages = nullPages.toBooleanArray();
     // Null counts is optional so keep it null if the builder has no values
     if (!nullCounts.isEmpty()) {
       columnIndex.nullCounts = nullCounts.toLongArray();
     }
+    columnIndex.pageIndexes = pageIndexes.toIntArray();
 
     return columnIndex;
   }
@@ -384,51 +579,24 @@ public abstract class ColumnIndexBuilder {
 
   // min[i] <= min[i+1] && max[i] <= max[i+1]
   private boolean isAscending(PrimitiveComparator<Binary> comparator) {
-    int prevPage = nextNonNullPage(0);
-    // All pages are null-page
-    if (prevPage < 0) {
-      return false;
-    }
-    int nextPage = nextNonNullPage(prevPage + 1);
-    while (nextPage > 0) {
-      if (compareMinValues(comparator, prevPage, nextPage) > 0
-          || compareMaxValues(comparator, prevPage, nextPage) > 0) {
+    for (int i = 1, n = pageIndexes.size(); i < n; ++i) {
+      if (compareMinValues(comparator, i - 1, i) > 0 || compareMaxValues(comparator, i - 1, i) > 0) {
         return false;
       }
-      prevPage = nextPage;
-      nextPage = nextNonNullPage(nextPage + 1);
     }
     return true;
   }
 
   // min[i] >= min[i+1] && max[i] >= max[i+1]
   private boolean isDescending(PrimitiveComparator<Binary> comparator) {
-    int prevPage = nextNonNullPage(0);
-    // All pages are null-page
-    if (prevPage < 0) {
-      return false;
-    }
-    int nextPage = nextNonNullPage(prevPage + 1);
-    while (nextPage > 0) {
-      if (compareMinValues(comparator, prevPage, nextPage) < 0
-          || compareMaxValues(comparator, prevPage, nextPage) < 0) {
+    for (int i = 1, n = pageIndexes.size(); i < n; ++i) {
+      if (compareMinValues(comparator, i - 1, i) < 0 || compareMaxValues(comparator, i - 1, i) < 0) {
         return false;
       }
-      prevPage = nextPage;
-      nextPage = nextNonNullPage(nextPage + 1);
     }
     return true;
   }
 
-  private int nextNonNullPage(int startIndex) {
-    for (int i = startIndex, n = nullPages.size(); i < n; ++i) {
-      if (!nullPages.get(i)) {
-        return i;
-      }
-    }
-    return -1;
-  }
-
   abstract int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2);
 
   abstract int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2);
@@ -438,11 +606,13 @@ public abstract class ColumnIndexBuilder {
     nullCounts.clear();
     clearMinMax();
     minMaxSize = 0;
+    nextPageIndex = 0;
+    pageIndexes.clear();
   }
 
   abstract void clearMinMax();
 
-  abstract ColumnIndexBase createColumnIndex(PrimitiveType type);
+  abstract ColumnIndexBase<?> createColumnIndex(PrimitiveType type);
 
   abstract int sizeOf(Object value);
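
Since min/max values are no longer stored for null-pages, the builder keeps a pageIndexes array mapping each stored value position (the arrayIndex) back to its page index; ValueComparator.translate() uses it to report page indexes to callers. A sketch of deriving that mapping as a hypothetical helper, assuming nullPages = {false, true, false} should yield pageIndexes = {0, 2}:

    // Derive the arrayIndex -> pageIndex mapping from the null-page flags.
    static int[] buildPageIndexes(boolean[] nullPages) {
      int count = 0;
      for (boolean isNullPage : nullPages) {
        if (!isNullPage) {
          ++count;
        }
      }
      int[] pageIndexes = new int[count];
      int arrayIndex = 0;
      for (int page = 0; page < nullPages.length; ++page) {
        if (!nullPages[page]) {
          pageIndexes[arrayIndex++] = page; // values are stored for this page only
        }
      }
      return pageIndexes;
    }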
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
index f877dfc..24be02c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
@@ -22,6 +22,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
 
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveType;
@@ -30,7 +31,7 @@ import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import it.unimi.dsi.fastutil.doubles.DoubleList;
 
 class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
-  private static class DoubleColumnIndex extends ColumnIndexBase {
+  private static class DoubleColumnIndex extends ColumnIndexBase<Double> {
     private double[] minValues;
     private double[] maxValues;
 
@@ -57,6 +58,28 @@ class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
     String getMaxValueAsString(int pageIndex) {
       return stringifier.stringify(maxValues[pageIndex]);
     }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+      return (Statistics<T>) new Statistics<Double>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+    }
+
+    @Override
+    ValueComparator createValueComparator(Object value) {
+      final double v = (double) value;
+      return new ValueComparator() {
+        @Override
+        int compareValueToMin(int arrayIndex) {
+          return comparator.compare(v, minValues[arrayIndex]);
+        }
+
+        @Override
+        int compareValueToMax(int arrayIndex) {
+          return comparator.compare(v, maxValues[arrayIndex]);
+        }
+      };
+    }
   }
 
   private final DoubleList minValues = new DoubleArrayList();
@@ -72,18 +95,18 @@ class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
 
   @Override
   void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
-    minValues.add(min == null ? 0 : convert(min));
-    maxValues.add(max == null ? 0 : convert(max));
+    minValues.add(convert(min));
+    maxValues.add(convert(max));
   }
 
   @Override
   void addMinMax(Object min, Object max) {
-    minValues.add(min == null ? 0 : (double) min);
-    maxValues.add(max == null ? 0 : (double) max);
+    minValues.add((double) min);
+    maxValues.add((double) max);
   }
 
   @Override
-  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+  ColumnIndexBase<Double> createColumnIndex(PrimitiveType type) {
     DoubleColumnIndex columnIndex = new DoubleColumnIndex(type);
     columnIndex.minValues = minValues.toDoubleArray();
     columnIndex.maxValues = maxValues.toDoubleArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
index f170662..4452746 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
@@ -22,6 +22,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
 
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveType;
@@ -30,7 +31,7 @@ import it.unimi.dsi.fastutil.floats.FloatArrayList;
 import it.unimi.dsi.fastutil.floats.FloatList;
 
 class FloatColumnIndexBuilder extends ColumnIndexBuilder {
-  private static class FloatColumnIndex extends ColumnIndexBase {
+  private static class FloatColumnIndex extends ColumnIndexBase<Float> {
     private float[] minValues;
     private float[] maxValues;
 
@@ -57,6 +58,28 @@ class FloatColumnIndexBuilder extends ColumnIndexBuilder {
     String getMaxValueAsString(int pageIndex) {
       return stringifier.stringify(maxValues[pageIndex]);
     }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+      return (Statistics<T>) new Statistics<Float>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+    }
+
+    @Override
+    ValueComparator createValueComparator(Object value) {
+      final float v = (float) value;
+      return new ValueComparator() {
+        @Override
+        int compareValueToMin(int arrayIndex) {
+          return comparator.compare(v, minValues[arrayIndex]);
+        }
+
+        @Override
+        int compareValueToMax(int arrayIndex) {
+          return comparator.compare(v, maxValues[arrayIndex]);
+        }
+      };
+    }
   }
 
   private final FloatList minValues = new FloatArrayList();
@@ -72,18 +95,18 @@ class FloatColumnIndexBuilder extends ColumnIndexBuilder {
 
   @Override
   void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
-    minValues.add(min == null ? 0 : convert(min));
-    maxValues.add(max == null ? 0 : convert(max));
+    minValues.add(convert(min));
+    maxValues.add(convert(max));
   }
 
   @Override
   void addMinMax(Object min, Object max) {
-    minValues.add(min == null ? 0 : (float) min);
-    maxValues.add(max == null ? 0 : (float) max);
+    minValues.add((float) min);
+    maxValues.add((float) max);
   }
 
   @Override
-  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+  ColumnIndexBase<Float> createColumnIndex(PrimitiveType type) {
     FloatColumnIndex columnIndex = new FloatColumnIndex(type);
     columnIndex.minValues = minValues.toFloatArray();
     columnIndex.maxValues = maxValues.toFloatArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
new file mode 100644
index 0000000..9eab65e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+import java.util.function.IntPredicate;
+import java.util.function.IntUnaryOperator;
+
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder.ColumnIndexBase;
+
+/**
+ * Iterator implementation for page indexes.
+ */
+class IndexIterator implements PrimitiveIterator.OfInt {
+  public static final PrimitiveIterator.OfInt EMPTY = new OfInt() {
+    @Override
+    public boolean hasNext() {
+      return false;
+    }
+
+    @Override
+    public int nextInt() {
+      throw new NoSuchElementException();
+    }
+  };
+  private int index;
+  private final int endIndex;
+  private final IntPredicate filter;
+  private final IntUnaryOperator translator;
+
+  static PrimitiveIterator.OfInt all(int pageCount) {
+    return new IndexIterator(0, pageCount, i -> true, i -> i);
+  }
+
+  static PrimitiveIterator.OfInt all(ColumnIndexBase<?>.ValueComparator comparator) {
+    return new IndexIterator(0, comparator.arrayLength(), i -> true, comparator::translate);
+  }
+
+  static PrimitiveIterator.OfInt filter(int pageCount, IntPredicate filter) {
+    return new IndexIterator(0, pageCount, filter, i -> i);
+  }
+
+  static PrimitiveIterator.OfInt filterTranslate(int arrayLength, IntPredicate filter, IntUnaryOperator translator) {
+    return new IndexIterator(0, arrayLength, filter, translator);
+  }
+
+  static PrimitiveIterator.OfInt rangeTranslate(int from, int to, IntUnaryOperator translator) {
+    return new IndexIterator(from, to + 1, i -> true, translator);
+  }
+
+  private IndexIterator(int startIndex, int endIndex, IntPredicate filter, IntUnaryOperator translator) {
+    this.endIndex = endIndex;
+    this.filter = filter;
+    this.translator = translator;
+    index = nextPageIndex(startIndex);
+  }
+
+  private int nextPageIndex(int startIndex) {
+    for (int i = startIndex; i < endIndex; ++i) {
+      if (filter.test(i)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return index >= 0;
+  }
+
+  @Override
+  public int nextInt() {
+    if (hasNext()) {
+      int ret = index;
+      index = nextPageIndex(index + 1);
+      return translator.applyAsInt(ret);
+    }
+    throw new NoSuchElementException();
+  }
+}
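
A short usage sketch of the new iterator (the factory methods are package-private, so this assumes same-package access; the mapping array is made up for illustration): filterTranslate walks the array indexes, keeps the ones accepted by the predicate and translates each kept index to its page index.

    int[] pageIndexes = {0, 2, 3, 5, 6}; // hypothetical arrayIndex -> pageIndex mapping
    java.util.PrimitiveIterator.OfInt it = IndexIterator.filterTranslate(
        pageIndexes.length,
        arrayIndex -> arrayIndex % 2 == 0,      // keep array indexes 0, 2 and 4
        arrayIndex -> pageIndexes[arrayIndex]); // translate them to page indexes
    while (it.hasNext()) {
      System.out.println(it.nextInt()); // prints 0, 3 and 6
    }
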
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
index f6bd94b..2d19d27 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
@@ -22,6 +22,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
 
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveType;
@@ -30,7 +31,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntList;
 
 class IntColumnIndexBuilder extends ColumnIndexBuilder {
-  private static class IntColumnIndex extends ColumnIndexBase {
+  private static class IntColumnIndex extends ColumnIndexBase<Integer> {
     private int[] minValues;
     private int[] maxValues;
 
@@ -57,6 +58,28 @@ class IntColumnIndexBuilder extends ColumnIndexBuilder {
     String getMaxValueAsString(int pageIndex) {
       return stringifier.stringify(maxValues[pageIndex]);
     }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+      return (Statistics<T>) new Statistics<Integer>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+    }
+
+    @Override
+    ValueComparator createValueComparator(Object value) {
+      final int v = (int) value;
+      return new ValueComparator() {
+        @Override
+        int compareValueToMin(int arrayIndex) {
+          return comparator.compare(v, minValues[arrayIndex]);
+        }
+
+        @Override
+        int compareValueToMax(int arrayIndex) {
+          return comparator.compare(v, maxValues[arrayIndex]);
+        }
+      };
+    }
   }
 
   private final IntList minValues = new IntArrayList();
@@ -72,18 +95,18 @@ class IntColumnIndexBuilder extends ColumnIndexBuilder {
 
   @Override
   void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
-    minValues.add(min == null ? 0 : convert(min));
-    maxValues.add(max == null ? 0 : convert(max));
+    minValues.add(convert(min));
+    maxValues.add(convert(max));
   }
 
   @Override
   void addMinMax(Object min, Object max) {
-    minValues.add(min == null ? 0 : (int) min);
-    maxValues.add(max == null ? 0 : (int) max);
+    minValues.add((int) min);
+    maxValues.add((int) max);
   }
 
   @Override
-  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+  ColumnIndexBase<Integer> createColumnIndex(PrimitiveType type) {
     IntColumnIndex columnIndex = new IntColumnIndex(type);
     columnIndex.minValues = minValues.toIntArray();
     columnIndex.maxValues = maxValues.toIntArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
index 696602d..b0189b7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
@@ -22,6 +22,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
 
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveComparator;
 import org.apache.parquet.schema.PrimitiveType;
@@ -30,7 +31,7 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
 
 class LongColumnIndexBuilder extends ColumnIndexBuilder {
-  private static class LongColumnIndex extends ColumnIndexBase {
+  private static class LongColumnIndex extends ColumnIndexBase<Long> {
     private long[] minValues;
     private long[] maxValues;
 
@@ -57,6 +58,28 @@ class LongColumnIndexBuilder extends ColumnIndexBuilder {
     String getMaxValueAsString(int pageIndex) {
       return stringifier.stringify(maxValues[pageIndex]);
     }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+      return (Statistics<T>) new Statistics<Long>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+    }
+
+    @Override
+    ValueComparator createValueComparator(Object value) {
+      final long v = (long) value;
+      return new ValueComparator() {
+        @Override
+        int compareValueToMin(int arrayIndex) {
+          return comparator.compare(v, minValues[arrayIndex]);
+        }
+
+        @Override
+        int compareValueToMax(int arrayIndex) {
+          return comparator.compare(v, maxValues[arrayIndex]);
+        }
+      };
+    }
   }
 
   private final LongList minValues = new LongArrayList();
@@ -72,18 +95,18 @@ class LongColumnIndexBuilder extends ColumnIndexBuilder {
 
   @Override
   void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
-    minValues.add(min == null ? 0 : convert(min));
-    maxValues.add(max == null ? 0 : convert(max));
+    minValues.add(convert(min));
+    maxValues.add(convert(max));
   }
 
   @Override
   void addMinMax(Object min, Object max) {
-    minValues.add(min == null ? 0 : (long) min);
-    maxValues.add(max == null ? 0 : (long) max);
+    minValues.add((long) min);
+    maxValues.add((long) max);
   }
 
   @Override
-  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+  ColumnIndexBase<Long> createColumnIndex(PrimitiveType type) {
     LongColumnIndex columnIndex = new LongColumnIndex(type);
     columnIndex.minValues = minValues.toLongArray();
     columnIndex.maxValues = maxValues.toLongArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
index fd99ef3..ba984eb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
@@ -49,4 +49,16 @@ public interface OffsetIndex {
    * @return the index of the first row in the page
    */
   public long getFirstRowIndex(int pageIndex);
+
+  /**
+   * @param pageIndex
+   *          the index of the page
+   * @param rowGroupRowCount
+   *          the total number of rows in the row-group
+   * @return the calculated index of the last row of the given page
+   */
+  public default long getLastRowIndex(int pageIndex, long rowGroupRowCount) {
+    int nextPageIndex = pageIndex + 1;
+    return (nextPageIndex >= getPageCount() ? rowGroupRowCount : getFirstRowIndex(nextPageIndex)) - 1;
+  }
 }
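
A worked example of the new default method, assuming a hypothetical offset index with three pages whose first row indexes are 0, 100 and 250 in a 300-row row-group:

    getLastRowIndex(0, 300) == getFirstRowIndex(1) - 1 == 99
    getLastRowIndex(1, 300) == getFirstRowIndex(2) - 1 == 249
    getLastRowIndex(2, 300) == 300 - 1 == 299  // no next page, so the row count is used
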
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
new file mode 100644
index 0000000..007d753
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import java.util.PrimitiveIterator;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
+import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore.MissingOffsetIndexException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Filter implementation based on column indexes.
+ * No filtering will be applied for columns where no column index is available.
+ * Offset indexes are required for all the columns in the projection, therefore a {@link MissingOffsetIndexException}
+ * will be thrown from any {@code visit} method if any of the required offset indexes is missing.
+ */
+public class ColumnIndexFilter implements Visitor<RowRanges> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnIndexFilter.class);
+  private final ColumnIndexStore columnIndexStore;
+  private final Set<ColumnPath> columns;
+  private final long rowCount;
+  private RowRanges allRows;
+
+  /**
+   * Calculates the row ranges containing the indexes of the rows that might match the specified filter.
+   *
+   * @param filter
+   *          to be used for filtering the rows
+   * @param columnIndexStore
+   *          the store for providing column/offset indexes
+   * @param paths
+   *          the paths of the columns used in the actual projection; a column that is not part of the projection will
+   *          be handled as containing {@code null} values only, even if the column has values written in the file
+   * @param rowCount
+   *          the total number of rows in the row-group
+   * @return the ranges of the possible matching row indexes; the returned ranges will contain all the rows if any of
+   *         the required offset indexes is missing
+   */
+  public static RowRanges calculateRowRanges(FilterCompat.Filter filter, ColumnIndexStore columnIndexStore,
+      Set<ColumnPath> paths, long rowCount) {
+    return filter.accept(new FilterCompat.Visitor<RowRanges>() {
+      @Override
+      public RowRanges visit(FilterPredicateCompat filterPredicateCompat) {
+        try {
+          return filterPredicateCompat.getFilterPredicate()
+              .accept(new ColumnIndexFilter(columnIndexStore, paths, rowCount));
+        } catch (MissingOffsetIndexException e) {
+          LOGGER.warn("Unable to do filtering", e);
+          return RowRanges.single(rowCount);
+        }
+      }
+
+      @Override
+      public RowRanges visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
+        return RowRanges.single(rowCount);
+      }
+
+      @Override
+      public RowRanges visit(NoOpFilter noOpFilter) {
+        return RowRanges.single(rowCount);
+      }
+    });
+  }
+
+  private ColumnIndexFilter(ColumnIndexStore columnIndexStore, Set<ColumnPath> paths, long rowCount) {
+    this.columnIndexStore = columnIndexStore;
+    this.columns = paths;
+    this.rowCount = rowCount;
+  }
+
+  private RowRanges allRows() {
+    if (allRows == null) {
+      allRows = RowRanges.single(rowCount);
+    }
+    return allRows;
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(Eq<T> eq) {
+    return applyPredicate(eq.getColumn(), ci -> ci.visit(eq), eq.getValue() == null ? allRows() : RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(NotEq<T> notEq) {
+    return applyPredicate(notEq.getColumn(), ci -> ci.visit(notEq),
+        notEq.getValue() == null ? RowRanges.EMPTY : allRows());
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(Lt<T> lt) {
+    return applyPredicate(lt.getColumn(), ci -> ci.visit(lt), RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(LtEq<T> ltEq) {
+    return applyPredicate(ltEq.getColumn(), ci -> ci.visit(ltEq), RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(Gt<T> gt) {
+    return applyPredicate(gt.getColumn(), ci -> ci.visit(gt), RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(GtEq<T> gtEq) {
+    return applyPredicate(gtEq.getColumn(), ci -> ci.visit(gtEq), RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(UserDefined<T, U> udp) {
+    return applyPredicate(udp.getColumn(), ci -> ci.visit(udp),
+        udp.getUserDefinedPredicate().keep(null) ? allRows() : RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
+      LogicalNotUserDefined<T, U> udp) {
+    return applyPredicate(udp.getUserDefined().getColumn(), ci -> ci.visit(udp),
+        udp.getUserDefined().getUserDefinedPredicate().keep(null) ? RowRanges.EMPTY : allRows());
+  }
+
+  private RowRanges applyPredicate(Column<?> column, Function<ColumnIndex, PrimitiveIterator.OfInt> func,
+      RowRanges rangesForMissingColumns) {
+    ColumnPath columnPath = column.getColumnPath();
+    if (!columns.contains(columnPath)) {
+      return rangesForMissingColumns;
+    }
+
+    OffsetIndex oi = columnIndexStore.getOffsetIndex(columnPath);
+    ColumnIndex ci = columnIndexStore.getColumnIndex(columnPath);
+    if (ci == null) {
+      LOGGER.warn("No column index for column {} is available; Unable to filter on this column", columnPath);
+      return allRows();
+    }
+
+    return RowRanges.build(rowCount, func.apply(ci), oi);
+  }
+
+  @Override
+  public RowRanges visit(And and) {
+    return RowRanges.intersection(and.getLeft().accept(this), and.getRight().accept(this));
+  }
+
+  @Override
+  public RowRanges visit(Or or) {
+    return RowRanges.union(or.getLeft().accept(this), or.getRight().accept(this));
+  }
+
+  @Override
+  public RowRanges visit(Not not) {
+    throw new IllegalArgumentException(
+        "Predicates containing a NOT must be run through LogicalInverseRewriter. " + not);
+  }
+}
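
For reference, a minimal usage sketch of the filter above, assuming the static ColumnIndexFilter.calculateRowRanges entry point referenced from the RowRanges javadoc; the class name RowRangesUsageSketch, the column name "x" and the predicate are illustrative only:

    import java.util.Collections;

    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
    import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
    import org.apache.parquet.internal.filter2.columnindex.RowRanges;

    import static org.apache.parquet.filter2.predicate.FilterApi.*;

    class RowRangesUsageSketch {
      // 'store' is the ColumnIndexStore of the row-group; 'rowCount' is its total row count.
      static RowRanges rangesFor(ColumnIndexStore store, long rowCount) {
        FilterPredicate predicate = and(gtEq(intColumn("x"), 10), lt(intColumn("x"), 20));
        return ColumnIndexFilter.calculateRowRanges(
            FilterCompat.get(predicate),
            store,
            Collections.singleton(ColumnPath.fromDotString("x")), // paths present in the row-group
            rowCount);
      }
    }

And/Or nodes fold into RowRanges.intersection/union (see the visit implementations above). A predicate on a path outside the given set is evaluated as if that column were all null, which is why eq(col, null) keeps all rows for a missing column while eq(col, value) keeps none.
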
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexStore.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexStore.java
new file mode 100644
index 0000000..c82861a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexStore.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+/**
+ * Provides the {@link ColumnIndex} and {@link OffsetIndex} objects for a row-group.
+ */
+public interface ColumnIndexStore {
+
+  /**
+   * Exception thrown when an offset index is missing for any of the columns.
+   */
+  public static class MissingOffsetIndexException extends ParquetRuntimeException {
+    public MissingOffsetIndexException(ColumnPath path) {
+      super("No offset index for column " + path.toDotString() + " is available; Unable to do filtering");
+    }
+  }
+
+  /**
+   * @param column
+   *          the path of the column
+   * @return the column index for the column-chunk in the row-group or {@code null} if no column index is available
+   */
+  ColumnIndex getColumnIndex(ColumnPath column);
+
+  /**
+   * @param column
+   *          the path of the column
+   * @return the offset index for the column-chunk in the row-group
+   * @throws MissingOffsetIndexException
+   *           if the related offset index is missing
+   */
+  OffsetIndex getOffsetIndex(ColumnPath column) throws MissingOffsetIndexException;
+}
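
A map-backed sketch may help picture the contract of this interface; the class and field names below are hypothetical (in this commit the lookups are actually backed by ColumnIndexStoreImpl in parquet-hadoop, which reads the indexes from the file):

    import java.util.Map;

    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.internal.column.columnindex.ColumnIndex;
    import org.apache.parquet.internal.column.columnindex.OffsetIndex;
    import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;

    class InMemoryColumnIndexStore implements ColumnIndexStore {
      private final Map<ColumnPath, ColumnIndex> columnIndexes;
      private final Map<ColumnPath, OffsetIndex> offsetIndexes;

      InMemoryColumnIndexStore(Map<ColumnPath, ColumnIndex> columnIndexes,
          Map<ColumnPath, OffsetIndex> offsetIndexes) {
        this.columnIndexes = columnIndexes;
        this.offsetIndexes = offsetIndexes;
      }

      @Override
      public ColumnIndex getColumnIndex(ColumnPath column) {
        return columnIndexes.get(column); // null is the documented "not available" answer
      }

      @Override
      public OffsetIndex getOffsetIndex(ColumnPath column) {
        OffsetIndex offsetIndex = offsetIndexes.get(column);
        if (offsetIndex == null) {
          throw new MissingOffsetIndexException(column); // filtering cannot proceed without it
        }
        return offsetIndex;
      }
    }
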
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
new file mode 100644
index 0000000..a04e513
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+/**
+ * Class representing row ranges in a row-group. These row ranges are calculated as a result of
+ * column-index-based filtering.
+ *
+ * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Collection, long)
+ */
+public class RowRanges {
+  private static class Range {
+
+    // Returns the union of the two ranges, or null if they do not overlap.
+    private static Range union(Range left, Range right) {
+      if (left.from <= right.from) {
+        if (left.to + 1 >= right.from) {
+          return new Range(left.from, Math.max(left.to, right.to));
+        }
+      } else if (right.to + 1 >= left.from) {
+        return new Range(right.from, Math.max(left.to, right.to));
+      }
+      return null;
+    }
+
+    // Returns the intersection of the two ranges, or null if they do not overlap.
+    private static Range intersection(Range left, Range right) {
+      if (left.from <= right.from) {
+        if (left.to >= right.from) {
+          return new Range(right.from, Math.min(left.to, right.to));
+        }
+      } else if (right.to >= left.from) {
+        return new Range(left.from, Math.min(left.to, right.to));
+      }
+      return null;
+    }
+
+    final long from;
+    final long to;
+
+    // Creates a range of [from, to] (from and to are inclusive; empty ranges are not valid)
+    Range(long from, long to) {
+      assert from <= to;
+      this.from = from;
+      this.to = to;
+    }
+
+    long count() {
+      return to - from + 1;
+    }
+
+    boolean isBefore(Range other) {
+      return to < other.from;
+    }
+
+    boolean isAfter(Range other) {
+      return from > other.to;
+    }
+
+    @Override
+    public String toString() {
+      return "[" + from + ", " + to + ']';
+    }
+  }
+
+  static final RowRanges EMPTY = new RowRanges();
+
+  static RowRanges single(long rowCount) {
+    RowRanges ranges = new RowRanges();
+    ranges.add(new Range(0, rowCount - 1));
+    return ranges;
+  }
+
+  static RowRanges build(long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) {
+    RowRanges ranges = new RowRanges();
+    while (pageIndexes.hasNext()) {
+      int pageIndex = pageIndexes.nextInt();
+      ranges.add(new Range(offsetIndex.getFirstRowIndex(pageIndex), offsetIndex.getLastRowIndex(pageIndex, rowCount)));
+    }
+    return ranges;
+  }
+
+  static RowRanges union(RowRanges left, RowRanges right) {
+    RowRanges result = new RowRanges();
+    Iterator<Range> it1 = left.ranges.iterator();
+    Iterator<Range> it2 = right.ranges.iterator();
+    if (it2.hasNext()) {
+      Range range2 = it2.next();
+      while (it1.hasNext()) {
+        Range range1 = it1.next();
+        if (range1.isAfter(range2)) {
+          result.add(range2);
+          range2 = range1;
+          Iterator<Range> tmp = it1;
+          it1 = it2;
+          it2 = tmp;
+        } else {
+          result.add(range1);
+        }
+      }
+      result.add(range2);
+    } else {
+      it2 = it1;
+    }
+    while (it2.hasNext()) {
+      result.add(it2.next());
+    }
+
+    return result;
+  }
+
+  static RowRanges intersection(RowRanges left, RowRanges right) {
+    RowRanges result = new RowRanges();
+
+    int rightIndex = 0;
+    for (Range l : left.ranges) {
+      for (int i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+        Range r = right.ranges.get(i);
+        if (l.isBefore(r)) {
+          break;
+        } else if (l.isAfter(r)) {
+          rightIndex = i + 1;
+          continue;
+        }
+        result.add(Range.intersection(l, r));
+      }
+    }
+
+    return result;
+  }
+
+  private final List<Range> ranges = new ArrayList<>();
+
+  private RowRanges() {
+  }
+
+  /*
+   * Adds the range to the end of the list of ranges. It maintains the disjoint ascending order of the ranges by
+   * unioning the specified range with the trailing ranges if they overlap. The specified range must not be entirely
+   * before the last range: it either follows the existing ranges or overlaps some of the trailing ones.
+   */
+  private void add(Range range) {
+    Range rangeToAdd = range;
+    for (int i = ranges.size() - 1; i >= 0; --i) {
+      Range last = ranges.get(i);
+      assert !last.isAfter(range);
+      Range u = Range.union(last, rangeToAdd);
+      if (u == null) {
+        break;
+      }
+      rangeToAdd = u;
+      ranges.remove(i);
+    }
+    ranges.add(rangeToAdd);
+  }
+
+  /**
+   * @return the number of rows in the ranges
+   */
+  public long rowCount() {
+    long cnt = 0;
+    for (Range range : ranges) {
+      cnt += range.count();
+    }
+    return cnt;
+  }
+
+  /**
+   * @return the ascending iterator of the row indexes contained in the ranges
+   */
+  public PrimitiveIterator.OfLong allRows() {
+    return new PrimitiveIterator.OfLong() {
+      private int currentRangeIndex = -1;
+      private Range currentRange;
+      private long next = findNext();
+
+      private long findNext() {
+        if (currentRange == null || next + 1 > currentRange.to) {
+          if (currentRangeIndex + 1 < ranges.size()) {
+            currentRange = ranges.get(++currentRangeIndex);
+            next = currentRange.from;
+          } else {
+            return -1;
+          }
+        } else {
+          ++next;
+        }
+        return next;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return next >= 0;
+      }
+
+      @Override
+      public long nextLong() {
+        long ret = next;
+        if (ret < 0) {
+          throw new NoSuchElementException();
+        }
+        next = findNext();
+        return ret;
+      }
+    };
+  }
+
+  /**
+   * @param from
+   *          the first row of the range to be checked for connection
+   * @param to
+   *          the last row of the range to be checked for connection
+   * @return {@code true} if the specified range overlaps (has common elements with) one of the ranges
+   */
+  public boolean isOverlapping(long from, long to) {
+    return Collections.binarySearch(ranges, new Range(from, to),
+        (r1, r2) -> r1.isBefore(r2) ? -1 : r1.isAfter(r2) ? 1 : 0) >= 0;
+  }
+
+  @Override
+  public String toString() {
+    return ranges.toString();
+  }
+}
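
To make the range arithmetic concrete, a small sketch follows; it must live in the same package (e.g. next to the tests), since EMPTY, single, union and intersection are package-private. Note also that Range.union merges adjacent ranges, so [0, 4] and [5, 9] combine into [0, 9], because to + 1 >= from counts as overlapping:

    package org.apache.parquet.internal.filter2.columnindex;

    import java.util.PrimitiveIterator;

    public class RowRangesSketch {
      public static void main(String[] args) {
        RowRanges all = RowRanges.single(10); // one range: [0, 9]
        RowRanges none = RowRanges.EMPTY;

        System.out.println(RowRanges.union(all, none));        // [[0, 9]]
        System.out.println(RowRanges.intersection(all, none)); // []
        System.out.println(all.rowCount());                    // 10

        // Iterate the row indexes that survived filtering:
        for (PrimitiveIterator.OfLong it = all.allRows(); it.hasNext();) {
          System.out.println(it.nextLong()); // 0, 1, ..., 9
        }
      }
    }
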
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBoundaryOrder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBoundaryOrder.java
new file mode 100644
index 0000000..3d2a924
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBoundaryOrder.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder.ColumnIndexBase;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Tests the operator implementations in {@link BoundaryOrder}.
+ */
+public class TestBoundaryOrder {
+  private static class SpyValueComparatorBuilder extends ColumnIndexBase<Integer> {
+    class SpyValueComparator extends ValueComparator {
+      private final ColumnIndexBase<?>.ValueComparator delegate;
+      private int compareCount;
+
+      SpyValueComparator(ColumnIndexBase<?>.ValueComparator delegate) {
+        this.delegate = delegate;
+      }
+
+      int getCompareCount() {
+        return compareCount;
+      }
+
+      @Override
+      int arrayLength() {
+        return delegate.arrayLength();
+      }
+
+      @Override
+      int translate(int arrayIndex) {
+        return delegate.translate(arrayIndex);
+      }
+
+      @Override
+      int compareValueToMin(int arrayIndex) {
+        ++compareCount;
+        return delegate.compareValueToMin(arrayIndex);
+      }
+
+      @Override
+      int compareValueToMax(int arrayIndex) {
+        ++compareCount;
+        return delegate.compareValueToMax(arrayIndex);
+      }
+    }
+
+    private SpyValueComparatorBuilder() {
+      super(TYPE);
+    }
+
+    SpyValueComparator build(ColumnIndexBase<?>.ValueComparator comparator) {
+      return new SpyValueComparator(comparator);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    String getMinValueAsString(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    String getMaxValueAsString(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    <T extends Comparable<T>> org.apache.parquet.filter2.predicate.Statistics<T> createStats(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    ColumnIndexBase<Integer>.ValueComparator createValueComparator(Object value) {
+      throw new Error("Shall never be invoked");
+    }
+  }
+
+  private static class ExecStats {
+    private long linearTime;
+    private long binaryTime;
+    private int linearCompareCount;
+    private int binaryCompareCount;
+    private int execCount;
+
+    IntList measureLinear(Function<ColumnIndexBase<?>.ValueComparator, PrimitiveIterator.OfInt> op,
+        ColumnIndexBase<?>.ValueComparator comparator) {
+      IntList list = new IntArrayList(comparator.arrayLength());
+      SpyValueComparatorBuilder.SpyValueComparator spyComparator = SPY_COMPARATOR_BUILDER.build(comparator);
+      long start = System.nanoTime();
+      op.apply(spyComparator).forEachRemaining((int value) -> list.add(value));
+      linearTime = System.nanoTime() - start;
+      linearCompareCount += spyComparator.getCompareCount();
+      return list;
+    }
+
+    IntList measureBinary(Function<ColumnIndexBase<?>.ValueComparator, PrimitiveIterator.OfInt> op,
+        ColumnIndexBase<?>.ValueComparator comparator) {
+      IntList list = new IntArrayList(comparator.arrayLength());
+      SpyValueComparatorBuilder.SpyValueComparator spyComparator = SPY_COMPARATOR_BUILDER.build(comparator);
+      long start = System.nanoTime();
+      op.apply(spyComparator).forEachRemaining((int value) -> list.add(value));
+      binaryTime = System.nanoTime() - start;
+      binaryCompareCount += spyComparator.getCompareCount();
+      return list;
+    }
+
+    void add(ExecStats stats) {
+      linearTime += stats.linearTime;
+      linearCompareCount += stats.linearCompareCount;
+      binaryTime += stats.binaryTime;
+      binaryCompareCount += stats.binaryCompareCount;
+      ++execCount;
+    }
+
+    @Override
+    public String toString() {
+      double linearMs = linearTime / 1_000_000.0;
+      double binaryMs = binaryTime / 1_000_000.0;
+      return String.format(
+          "Linear search: %.2fms (avg: %.6fms); number of compares: %d (avg: %d) [100.00%%]%n"
+              + "Binary search: %.2fms (avg: %.6fms); number of compares: %d (avg: %d) [%.2f%%]",
+          linearMs, linearMs / execCount, linearCompareCount, linearCompareCount / execCount,
+          binaryMs, binaryMs / execCount, binaryCompareCount, binaryCompareCount / execCount,
+          100.0 * binaryCompareCount / linearCompareCount);
+    }
+  }
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestBoundaryOrder.class);
+  private static final PrimitiveType TYPE = Types.required(PrimitiveTypeName.INT32).named("test_int32");
+  private static final int FROM = -15;
+  private static final int TO = 15;
+  private static final ColumnIndexBase<?> ASCENDING;
+  private static final ColumnIndexBase<?> DESCENDING;
+  private static final int SINGLE_FROM = -1;
+  private static final int SINGLE_TO = 1;
+  private static final ColumnIndexBase<?> SINGLE;
+  private static final Random RANDOM = new Random(42);
+  private static final int RAND_FROM = -2000;
+  private static final int RAND_TO = 2000;
+  private static final int RAND_COUNT = 2000;
+  private static final ColumnIndexBase<?> RAND_ASCENDING;
+  private static final ColumnIndexBase<?> RAND_DESCENDING;
+  private static final SpyValueComparatorBuilder SPY_COMPARATOR_BUILDER = new SpyValueComparatorBuilder();
+  static {
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+    builder.add(stats(FROM, -12));
+    builder.add(stats(-10, -8));
+    builder.add(stats(-8, -4));
+    builder.add(stats(-6, -4));
+    builder.add(stats(-6, -3));
+    builder.add(stats(-6, -3));
+    builder.add(stats(-6, -3));
+    builder.add(stats(0, 3));
+    builder.add(stats(3, 5));
+    builder.add(stats(3, 5));
+    builder.add(stats(5, 8));
+    builder.add(stats(10, TO));
+    ASCENDING = (ColumnIndexBase<?>) builder.build();
+
+    builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+    builder.add(stats(10, TO));
+    builder.add(stats(5, 8));
+    builder.add(stats(3, 5));
+    builder.add(stats(3, 5));
+    builder.add(stats(0, 3));
+    builder.add(stats(-6, -3));
+    builder.add(stats(-6, -3));
+    builder.add(stats(-6, -3));
+    builder.add(stats(-6, -4));
+    builder.add(stats(-8, -4));
+    builder.add(stats(-10, -8));
+    builder.add(stats(FROM, -12));
+    DESCENDING = (ColumnIndexBase<?>) builder.build();
+
+    builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+    builder.add(stats(SINGLE_FROM, SINGLE_TO));
+    SINGLE = (ColumnIndexBase<?>) builder.build();
+
+    builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+    for (PrimitiveIterator.OfInt it = IntStream.generate(() -> RANDOM.nextInt(RAND_TO - RAND_FROM + 1) + RAND_FROM)
+        .limit(RAND_COUNT * 2).sorted().iterator(); it.hasNext();) {
+      builder.add(stats(it.nextInt(), it.nextInt()));
+    }
+    RAND_ASCENDING = (ColumnIndexBase<?>) builder.build();
+
+    builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+    for (Iterator<Integer> it = IntStream.generate(() -> RANDOM.nextInt(RAND_TO - RAND_FROM + 1) + RAND_FROM)
+        .limit(RAND_COUNT * 2).mapToObj(Integer::valueOf).sorted(Collections.reverseOrder()).iterator(); it
+            .hasNext();) {
+      builder.add(stats(it.next(), it.next()));
+    }
+    RAND_DESCENDING = (ColumnIndexBase<?>) builder.build();
+  }
+
+  private static Statistics<?> stats(int min, int max) {
+    Statistics<?> stats = Statistics.createStats(TYPE);
+    stats.updateStats(min);
+    stats.updateStats(max);
+    return stats;
+  }
+
+  private static ExecStats validateOperator(String msg,
+      Function<ColumnIndexBase<?>.ValueComparator, PrimitiveIterator.OfInt> validatorOp,
+      Function<ColumnIndexBase<?>.ValueComparator, PrimitiveIterator.OfInt> actualOp,
+      ColumnIndexBase<?>.ValueComparator comparator) {
+    ExecStats stats = new ExecStats();
+
+    IntList expected = stats.measureLinear(validatorOp, comparator);
+    IntList actual = stats.measureBinary(actualOp, comparator);
+
+    Assert.assertEquals(msg, expected, actual);
+
+    return stats;
+  }
+
+  @Test
+  public void testEq() {
+    for (int i = FROM - 1; i <= TO + 1; ++i) {
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::eq,
+          BoundaryOrder.ASCENDING::eq,
+          ASCENDING.createValueComparator(i));
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::eq,
+          BoundaryOrder.DESCENDING::eq,
+          DESCENDING.createValueComparator(i));
+    }
+    for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+      ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::eq,
+          BoundaryOrder.ASCENDING::eq,
+          singleComparator);
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::eq,
+          BoundaryOrder.DESCENDING::eq,
+          singleComparator);
+    }
+    ExecStats stats = new ExecStats();
+    for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::eq,
+          BoundaryOrder.ASCENDING::eq,
+          RAND_ASCENDING.createValueComparator(i)));
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::eq,
+          BoundaryOrder.DESCENDING::eq,
+          RAND_DESCENDING.createValueComparator(i)));
+    }
+    LOGGER.info("Executed eq on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+        RAND_TO - RAND_FROM + 2, stats);
+  }
+
+  @Test
+  public void testGt() {
+    for (int i = FROM - 1; i <= TO + 1; ++i) {
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::gt,
+          BoundaryOrder.ASCENDING::gt,
+          ASCENDING.createValueComparator(i));
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::gt,
+          BoundaryOrder.DESCENDING::gt,
+          DESCENDING.createValueComparator(i));
+    }
+    for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+      ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::gt,
+          BoundaryOrder.ASCENDING::gt,
+          singleComparator);
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::gt,
+          BoundaryOrder.DESCENDING::gt,
+          singleComparator);
+    }
+    ExecStats stats = new ExecStats();
+    for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::gt,
+          BoundaryOrder.ASCENDING::gt,
+          RAND_ASCENDING.createValueComparator(i)));
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::gt,
+          BoundaryOrder.DESCENDING::gt,
+          RAND_DESCENDING.createValueComparator(i)));
+    }
+    LOGGER.info("Executed gt on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+        RAND_TO - RAND_FROM + 2, stats);
+  }
+
+  @Test
+  public void testGtEq() {
+    for (int i = FROM - 1; i <= TO + 1; ++i) {
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::gtEq,
+          BoundaryOrder.ASCENDING::gtEq,
+          ASCENDING.createValueComparator(i));
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::gtEq,
+          BoundaryOrder.DESCENDING::gtEq,
+          DESCENDING.createValueComparator(i));
+    }
+    for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+      ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::gtEq,
+          BoundaryOrder.ASCENDING::gtEq,
+          singleComparator);
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::gtEq,
+          BoundaryOrder.DESCENDING::gtEq,
+          singleComparator);
+    }
+    ExecStats stats = new ExecStats();
+    for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::gtEq,
+          BoundaryOrder.ASCENDING::gtEq,
+          RAND_ASCENDING.createValueComparator(i)));
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::gtEq,
+          BoundaryOrder.DESCENDING::gtEq,
+          RAND_DESCENDING.createValueComparator(i)));
+    }
+    LOGGER.info("Executed gtEq on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+        RAND_TO - RAND_FROM + 2, stats);
+  }
+
+  @Test
+  public void testLt() {
+    for (int i = FROM - 1; i <= TO + 1; ++i) {
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::lt,
+          BoundaryOrder.ASCENDING::lt,
+          ASCENDING.createValueComparator(i));
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::lt,
+          BoundaryOrder.DESCENDING::lt,
+          DESCENDING.createValueComparator(i));
+    }
+    for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+      ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::lt,
+          BoundaryOrder.ASCENDING::lt,
+          singleComparator);
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::lt,
+          BoundaryOrder.DESCENDING::lt,
+          singleComparator);
+    }
+    ExecStats stats = new ExecStats();
+    for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::lt,
+          BoundaryOrder.ASCENDING::lt,
+          RAND_ASCENDING.createValueComparator(i)));
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::lt,
+          BoundaryOrder.DESCENDING::lt,
+          RAND_DESCENDING.createValueComparator(i)));
+    }
+    LOGGER.info("Executed lt on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+        RAND_TO - RAND_FROM + 2, stats);
+  }
+
+  @Test
+  public void testLtEq() {
+    for (int i = FROM - 1; i <= TO + 1; ++i) {
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::ltEq,
+          BoundaryOrder.ASCENDING::ltEq,
+          ASCENDING.createValueComparator(i));
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::ltEq,
+          BoundaryOrder.DESCENDING::ltEq,
+          DESCENDING.createValueComparator(i));
+    }
+    for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+      ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::ltEq,
+          BoundaryOrder.ASCENDING::ltEq,
+          singleComparator);
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::ltEq,
+          BoundaryOrder.DESCENDING::ltEq,
+          singleComparator);
+    }
+    ExecStats stats = new ExecStats();
+    for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::ltEq,
+          BoundaryOrder.ASCENDING::ltEq,
+          RAND_ASCENDING.createValueComparator(i)));
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::ltEq,
+          BoundaryOrder.DESCENDING::ltEq,
+          RAND_DESCENDING.createValueComparator(i)));
+    }
+    LOGGER.info("Executed ltEq on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+        RAND_TO - RAND_FROM + 2, stats);
+  }
+
+  @Test
+  public void testNotEq() {
+    for (int i = -16; i <= 16; ++i) {
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::notEq,
+          BoundaryOrder.ASCENDING::notEq,
+          ASCENDING.createValueComparator(i));
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::notEq,
+          BoundaryOrder.DESCENDING::notEq,
+          DESCENDING.createValueComparator(i));
+    }
+    for (int i = FROM - 1; i <= TO + 1; ++i) {
+      ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+      validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::notEq,
+          BoundaryOrder.ASCENDING::notEq,
+          singleComparator);
+      validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::notEq,
+          BoundaryOrder.DESCENDING::notEq,
+          singleComparator);
+    }
+    ExecStats stats = new ExecStats();
+    for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+          BoundaryOrder.UNORDERED::notEq,
+          BoundaryOrder.ASCENDING::notEq,
+          RAND_ASCENDING.createValueComparator(i)));
+      stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+          BoundaryOrder.UNORDERED::notEq,
+          BoundaryOrder.DESCENDING::notEq,
+          RAND_DESCENDING.createValueComparator(i)));
+    }
+    LOGGER.info("Executed notEq on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+        RAND_TO - RAND_FROM + 2, stats);
+  }
+
+}
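
The test above validates the ordered (binary search) implementations against UNORDERED, which scans every page linearly. As background, a standalone sketch of why the ascending case needs only two binary searches, independent of the parquet classes (it ignores null pages, which the real implementation's page-index translation handles): with both page minima and maxima non-decreasing, the pages that may contain a value v form one contiguous window.

    import java.util.stream.IntStream;

    class AscendingEqSketch {
      // Page i may contain v iff min[i] <= v <= max[i]; with ascending boundary
      // order those pages form a single contiguous window [from, to).
      static IntStream matchingPages(int[] min, int[] max, int v) {
        int lo = 0, hi = max.length; // find the first page with max[i] >= v
        while (lo < hi) {
          int mid = (lo + hi) >>> 1;
          if (max[mid] < v) lo = mid + 1; else hi = mid;
        }
        int from = lo;
        hi = min.length;             // find the first page with min[i] > v
        while (lo < hi) {
          int mid = (lo + hi) >>> 1;
          if (min[mid] <= v) lo = mid + 1; else hi = mid;
        }
        return IntStream.range(from, lo);
      }

      public static void main(String[] args) {
        int[] min = {0, 3, 5, 9};
        int[] max = {2, 6, 8, 12};
        matchingPages(min, max, 5).forEach(System.out::println); // prints 1 and 2
      }
    }
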
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 7a5745e..e4655b2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -19,6 +19,20 @@
 package org.apache.parquet.internal.column.columnindex;
 
 import static java.util.Arrays.asList;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
 import static org.apache.parquet.schema.OriginalType.DECIMAL;
 import static org.apache.parquet.schema.OriginalType.UINT_8;
 import static org.apache.parquet.schema.OriginalType.UTF8;
@@ -39,19 +53,19 @@ import static org.junit.Assert.fail;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.internal.column.columnindex.BinaryColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.BooleanColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.DoubleColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.FloatColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.IntColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.LongColumnIndexBuilder;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;
@@ -62,12 +76,167 @@ import org.junit.Test;
  */
 public class TestColumnIndexBuilder {
 
+  public static class BinaryDecimalIsNullOrZeroUdp extends UserDefinedPredicate<Binary> {
+    private static final Binary ZERO = decimalBinary("0.0");
+
+    @Override
+    public boolean keep(Binary value) {
+      return value == null || value.equals(ZERO);
+    }
+
+    @Override
+    public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Binary> statistics) {
+      Comparator<Binary> cmp = statistics.getComparator();
+      return cmp.compare(statistics.getMin(), ZERO) > 0 || cmp.compare(statistics.getMax(), ZERO) < 0;
+    }
+
+    @Override
+    public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Binary> statistics) {
+      Comparator<Binary> cmp = statistics.getComparator();
+      return cmp.compare(statistics.getMin(), ZERO) == 0 && cmp.compare(statistics.getMax(), ZERO) == 0;
+    }
+  }
+
+  public static class BinaryUtf8StartsWithB extends UserDefinedPredicate<Binary> {
+    private static final Binary B = stringBinary("B");
+    private static final Binary C = stringBinary("C");
+
+    @Override
+    public boolean keep(Binary value) {
+      return value != null && value.length() > 0 && value.getBytesUnsafe()[0] == 'B';
+    }
+
+    @Override
+    public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Binary> statistics) {
+      Comparator<Binary> cmp = statistics.getComparator();
+      return cmp.compare(statistics.getMin(), C) >= 0 || cmp.compare(statistics.getMax(), B) < 0;
+    }
+
+    @Override
+    public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Binary> statistics) {
+      Comparator<Binary> cmp = statistics.getComparator();
+      return cmp.compare(statistics.getMin(), B) >= 0 && cmp.compare(statistics.getMax(), C) < 0;
+    }
+  }
+
+  public static class BooleanIsTrueOrNull extends UserDefinedPredicate<Boolean> {
+    @Override
+    public boolean keep(Boolean value) {
+      return value == null || value;
+    }
+
+    @Override
+    public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Boolean> statistics) {
+      return statistics.getComparator().compare(statistics.getMax(), true) != 0;
+    }
+
+    @Override
+    public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Boolean> statistics) {
+      return statistics.getComparator().compare(statistics.getMin(), true) == 0;
+    }
+  }
+
+  public static class DoubleIsInteger extends UserDefinedPredicate<Double> {
+    @Override
+    public boolean keep(Double value) {
+      return value != null && Math.floor(value) == value;
+    }
+
+    @Override
+    public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Double> statistics) {
+      double min = statistics.getMin();
+      double max = statistics.getMax();
+      Comparator<Double> cmp = statistics.getComparator();
+      return cmp.compare(Math.floor(min), Math.floor(max)) == 0 && cmp.compare(Math.floor(min), min) != 0
+          && cmp.compare(Math.floor(max), max) != 0;
+    }
+
+    @Override
+    public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Double> statistics) {
+      double min = statistics.getMin();
+      double max = statistics.getMax();
+      Comparator<Double> cmp = statistics.getComparator();
+      return cmp.compare(min, max) == 0 && cmp.compare(Math.floor(min), min) == 0;
+    }
+  }
+
+  public static class FloatIsInteger extends UserDefinedPredicate<Float> {
+    private static float floor(float value) {
+      return (float) Math.floor(value);
+    }
+
+    @Override
+    public boolean keep(Float value) {
+      return value != null && Math.floor(value) == value;
+    }
+
+    @Override
+    public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Float> statistics) {
+      float min = statistics.getMin();
+      float max = statistics.getMax();
+      Comparator<Float> cmp = statistics.getComparator();
+      return cmp.compare(floor(min), floor(max)) == 0 && cmp.compare(floor(min), min) != 0
+          && cmp.compare(floor(max), max) != 0;
+    }
+
+    @Override
+    public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Float> statistics) {
+      float min = statistics.getMin();
+      float max = statistics.getMax();
+      Comparator<Float> cmp = statistics.getComparator();
+      return cmp.compare(min, max) == 0 && cmp.compare(floor(min), min) == 0;
+    }
+  }
+
+  public static class IntegerIsDivisableWith3 extends UserDefinedPredicate<Integer> {
+    @Override
+    public boolean keep(Integer value) {
+      return value != null && value % 3 == 0;
+    }
+
+    @Override
+    public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Integer> statistics) {
+      int min = statistics.getMin();
+      int max = statistics.getMax();
+      return min % 3 != 0 && max % 3 != 0 && max - min < 3;
+    }
+
+    @Override
+    public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Integer> statistics) {
+      int min = statistics.getMin();
+      int max = statistics.getMax();
+      return min == max && min % 3 == 0;
+    }
+  }
+
+  public static class LongIsDivisableWith3 extends UserDefinedPredicate<Long> {
+    @Override
+    public boolean keep(Long value) {
+      return value != null && value % 3 == 0;
+    }
+
+    @Override
+    public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Long> statistics) {
+      long min = statistics.getMin();
+      long max = statistics.getMax();
+      return min % 3 != 0 && max % 3 != 0 && max - min < 3;
+    }
+
+    @Override
+    public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Long> statistics) {
+      long min = statistics.getMin();
+      long max = statistics.getMax();
+      return min == max && min % 3 == 0;
+    }
+  }
+
   @Test
   public void testBuildBinaryDecimal() {
     PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
     ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
     assertNull(builder.build());
+    BinaryColumn col = binaryColumn("test_col");
 
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
@@ -102,6 +271,16 @@ public class TestColumnIndexBuilder {
         null,
         null,
         decimalBinary("87656273"));
+    assertCorrectFiltering(columnIndex, eq(col, decimalBinary("0.0")), 1, 4);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 5, 6);
+    assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("87656273")), 0, 1, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 4, 7);
+    assertCorrectFiltering(columnIndex, gt(col, decimalBinary("2348978.45")), 1);
+    assertCorrectFiltering(columnIndex, gtEq(col, decimalBinary("2348978.45")), 1, 4);
+    assertCorrectFiltering(columnIndex, lt(col, decimalBinary("-234.23")), 4);
+    assertCorrectFiltering(columnIndex, ltEq(col, decimalBinary("-234.23")), 2, 4);
+    assertCorrectFiltering(columnIndex, userDefined(col, BinaryDecimalIsNullOrZeroUdp.class), 0, 1, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryDecimalIsNullOrZeroUdp.class)), 1, 2, 4, 7);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -137,6 +316,16 @@ public class TestColumnIndexBuilder {
         null,
         decimalBinary("1234567890.12"),
         null);
+    assertCorrectFiltering(columnIndex, eq(col, decimalBinary("87656273")), 2, 4);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 3, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("87656273")), 0, 1, 2, 3, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 4, 6);
+    assertCorrectFiltering(columnIndex, gt(col, decimalBinary("87656273")), 6);
+    assertCorrectFiltering(columnIndex, gtEq(col, decimalBinary("87656273")), 2, 4, 6);
+    assertCorrectFiltering(columnIndex, lt(col, decimalBinary("-0.17")), 1);
+    assertCorrectFiltering(columnIndex, ltEq(col, decimalBinary("-0.17")), 1, 2);
+    assertCorrectFiltering(columnIndex, userDefined(col, BinaryDecimalIsNullOrZeroUdp.class), 0, 2, 3, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryDecimalIsNullOrZeroUdp.class)), 1, 2, 4, 6);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -172,6 +361,16 @@ public class TestColumnIndexBuilder {
         decimalBinary("-0.17"),
         null,
         decimalBinary("-9999293.23"));
+    assertCorrectFiltering(columnIndex, eq(col, decimalBinary("1234567890.12")), 2, 4);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 6);
+    assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("0.0")), 0, 1, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 2, 4, 5, 7);
+    assertCorrectFiltering(columnIndex, gt(col, decimalBinary("1234567890.12")));
+    assertCorrectFiltering(columnIndex, gtEq(col, decimalBinary("1234567890.12")), 2, 4);
+    assertCorrectFiltering(columnIndex, lt(col, decimalBinary("-0.17")), 7);
+    assertCorrectFiltering(columnIndex, ltEq(col, decimalBinary("-0.17")), 5, 7);
+    assertCorrectFiltering(columnIndex, userDefined(col, BinaryDecimalIsNullOrZeroUdp.class), 0, 1, 2, 3, 5, 6);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryDecimalIsNullOrZeroUdp.class)), 2, 4, 5, 7);
   }
 
   @Test
@@ -180,6 +379,7 @@ public class TestColumnIndexBuilder {
     ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
     assertNull(builder.build());
+    BinaryColumn col = binaryColumn("test_col");
 
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
@@ -214,6 +414,16 @@ public class TestColumnIndexBuilder {
         stringBinary("Dent"),
         stringBinary("Beeblebrox"),
         null);
+    assertCorrectFiltering(columnIndex, eq(col, stringBinary("Marvin")), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 5, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Beeblebrox")), 0, 1, 2, 3, 4, 5, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, gt(col, stringBinary("Prefect")), 1, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, stringBinary("Prefect")), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, lt(col, stringBinary("Dent")), 4, 6);
+    assertCorrectFiltering(columnIndex, ltEq(col, stringBinary("Dent")), 4, 5, 6);
+    assertCorrectFiltering(columnIndex, userDefined(col, BinaryUtf8StartsWithB.class), 4, 6);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryUtf8StartsWithB.class)), 0, 1, 2, 3, 4, 5, 7);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -249,6 +459,16 @@ public class TestColumnIndexBuilder {
         null,
         stringBinary("Slartibartfast"),
         null);
+    assertCorrectFiltering(columnIndex, eq(col, stringBinary("Jeltz")), 3, 4);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 4, 5, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Slartibartfast")), 0, 1, 2, 3, 4, 5, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 0, 3, 4, 6);
+    assertCorrectFiltering(columnIndex, gt(col, stringBinary("Marvin")), 4, 6);
+    assertCorrectFiltering(columnIndex, gtEq(col, stringBinary("Marvin")), 4, 6);
+    assertCorrectFiltering(columnIndex, lt(col, stringBinary("Dent")), 0);
+    assertCorrectFiltering(columnIndex, ltEq(col, stringBinary("Dent")), 0, 3, 4);
+    assertCorrectFiltering(columnIndex, userDefined(col, BinaryUtf8StartsWithB.class), 0);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryUtf8StartsWithB.class)), 0, 1, 2, 3, 4, 5, 6, 7);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -284,6 +504,16 @@ public class TestColumnIndexBuilder {
         null,
         null,
         stringBinary("Beeblebrox"));
+    assertCorrectFiltering(columnIndex, eq(col, stringBinary("Marvin")), 3);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Dent")), 0, 1, 2, 3, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 4, 7);
+    assertCorrectFiltering(columnIndex, gt(col, stringBinary("Prefect")), 1);
+    assertCorrectFiltering(columnIndex, gtEq(col, stringBinary("Prefect")), 1, 3);
+    assertCorrectFiltering(columnIndex, lt(col, stringBinary("Marvin")), 3, 4, 7);
+    assertCorrectFiltering(columnIndex, ltEq(col, stringBinary("Marvin")), 3, 4, 7);
+    assertCorrectFiltering(columnIndex, userDefined(col, BinaryUtf8StartsWithB.class), 7);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryUtf8StartsWithB.class)), 0, 1, 2, 3, 4, 5, 6, 7);
   }
 
   @Test
@@ -335,11 +565,68 @@ public class TestColumnIndexBuilder {
   }
 
   @Test
+  public void testFilterWithoutNullCounts() {
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(BINARY).as(UTF8).named("test_binary_utf8"),
+        BoundaryOrder.ASCENDING,
+        asList(true, true, false, false, true, false, true, false),
+        null,
+        toBBList(
+            null,
+            null,
+            stringBinary("Beeblebrox"),
+            stringBinary("Dent"),
+            null,
+            stringBinary("Jeltz"),
+            null,
+            stringBinary("Slartibartfast")),
+        toBBList(
+            null,
+            null,
+            stringBinary("Dent"),
+            stringBinary("Dent"),
+            null,
+            stringBinary("Prefect"),
+            null,
+            stringBinary("Slartibartfast")));
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertNull(columnIndex.getNullCounts());
+    assertCorrectNullPages(columnIndex, true, true, false, false, true, false, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        null,
+        stringBinary("Dent"),
+        stringBinary("Dent"),
+        null,
+        stringBinary("Prefect"),
+        null,
+        stringBinary("Slartibartfast"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        null,
+        stringBinary("Beeblebrox"),
+        stringBinary("Dent"),
+        null,
+        stringBinary("Jeltz"),
+        null,
+        stringBinary("Slartibartfast"));
+
+    BinaryColumn col = binaryColumn("test_col");
+    assertCorrectFiltering(columnIndex, eq(col, stringBinary("Dent")), 2, 3);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Dent")), 0, 1, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 2, 3, 5, 7);
+    assertCorrectFiltering(columnIndex, userDefined(col, BinaryDecimalIsNullOrZeroUdp.class), 0, 1, 2, 3, 4, 5, 6, 7);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryDecimalIsNullOrZeroUdp.class)), 2, 3, 5, 7);
+  }
+
+  @Test
   public void testBuildBoolean() {
     PrimitiveType type = Types.required(BOOLEAN).named("test_boolean");
     ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BooleanColumnIndexBuilder.class));
     assertNull(builder.build());
+    BooleanColumn col = booleanColumn("test_col");
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     StatsBuilder sb = new StatsBuilder();
@@ -356,6 +643,12 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, false, false, false, true, false);
     assertCorrectValues(columnIndex.getMaxValues(), true, true, true, null, false);
     assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false);
+    assertCorrectFiltering(columnIndex, eq(col, true), 0, 1, 2);
+    assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    assertCorrectFiltering(columnIndex, notEq(col, true), 0, 1, 2, 3, 4);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4);
+    assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 1, 2, 3);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BooleanIsTrueOrNull.class)), 0, 1, 4);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -374,6 +667,12 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, true, true, false, false, true);
     assertCorrectValues(columnIndex.getMaxValues(), null, false, null, null, true, true, null);
     assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null);
+    assertCorrectFiltering(columnIndex, eq(col, true), 4, 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, notEq(col, true), 0, 1, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BooleanIsTrueOrNull.class)), 1, 4, 5);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -392,6 +691,12 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, true, true, false, false, true);
     assertCorrectValues(columnIndex.getMaxValues(), null, true, null, null, true, false, null);
     assertCorrectValues(columnIndex.getMinValues(), null, true, null, null, false, false, null);
+    assertCorrectFiltering(columnIndex, eq(col, true), 1, 4);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, notEq(col, true), 0, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 1, 2, 3, 4, 5, 6);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, BooleanIsTrueOrNull.class)), 4, 5);
   }
 
   @Test
@@ -416,6 +721,7 @@ public class TestColumnIndexBuilder {
     ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(DoubleColumnIndexBuilder.class));
     assertNull(builder.build());
+    DoubleColumn col = doubleColumn("test_col");
 
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, -4.2, -4.1));
@@ -432,6 +738,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
     assertCorrectValues(columnIndex.getMaxValues(), -4.1, 7.0, 2.2, null, 2.32, 8.1);
     assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0);
+    assertCorrectFiltering(columnIndex, eq(col, 0.0), 1, 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    assertCorrectFiltering(columnIndex, notEq(col, 2.2), 0, 1, 2, 3, 4, 5);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, gt(col, 2.2), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2.2), 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, lt(col, -4.2), 1, 5);
+    assertCorrectFiltering(columnIndex, ltEq(col, -4.2), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, DoubleIsInteger.class), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, DoubleIsInteger.class)), 0, 1, 2, 3, 4, 5);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -452,6 +768,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
     assertCorrectValues(columnIndex.getMaxValues(), null, -345.2, -234.6, null, null, 2.99999, null, 42.83, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null);
+    assertCorrectFiltering(columnIndex, eq(col, 0.0), 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, 0.0), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, gt(col, 2.99999), 7);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2.99999), 5, 7);
+    assertCorrectFiltering(columnIndex, lt(col, -234.6), 1, 2);
+    assertCorrectFiltering(columnIndex, ltEq(col, -234.6), 1, 2, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, DoubleIsInteger.class), 1, 5, 7);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, DoubleIsInteger.class)), 0, 1, 2, 3, 4, 5, 6, 7, 8);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -472,6 +798,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
     assertCorrectValues(columnIndex.getMaxValues(), null, 532.3, null, 234.7, null, 234.69, null, null, -3.0);
     assertCorrectValues(columnIndex.getMinValues(), null, 345.2, null, 234.6, null, -2.99999, null, null, -42.83);
+    assertCorrectFiltering(columnIndex, eq(col, 234.6), 3, 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, 2.2), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+    assertCorrectFiltering(columnIndex, gt(col, 2.2), 1, 3, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 234.69), 1, 3, 5);
+    assertCorrectFiltering(columnIndex, lt(col, -2.99999), 8);
+    assertCorrectFiltering(columnIndex, ltEq(col, -2.99999), 5, 8);
+    assertCorrectFiltering(columnIndex, userDefined(col, DoubleIsInteger.class), 1, 5, 8);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, DoubleIsInteger.class)), 0, 1, 2, 3, 4, 5, 6, 7, 8);
   }
 
   @Test
@@ -496,6 +832,7 @@ public class TestColumnIndexBuilder {
     ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(FloatColumnIndexBuilder.class));
     assertNull(builder.build());
+    FloatColumn col = floatColumn("test_col");
 
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, -4.2f, -4.1f));
@@ -512,6 +849,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
     assertCorrectValues(columnIndex.getMaxValues(), -4.1f, 7.0f, 2.2f, null, 2.32f, 8.1f);
     assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f);
+    assertCorrectFiltering(columnIndex, eq(col, 0.0f), 1, 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, gt(col, 2.2f), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2.2f), 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, lt(col, 0.0f), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, ltEq(col, 1.9f), 0, 1, 4, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, FloatIsInteger.class), 1, 4, 5);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, FloatIsInteger.class)), 0, 1, 2, 3, 4, 5);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -532,6 +879,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
     assertCorrectValues(columnIndex.getMaxValues(), null, -345.2f, -234.7f, null, null, 2.99999f, null, 42.83f, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null);
+    assertCorrectFiltering(columnIndex, eq(col, 0.0f), 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, gt(col, 2.2f), 5, 7);
+    assertCorrectFiltering(columnIndex, gtEq(col, -234.7f), 2, 5, 7);
+    assertCorrectFiltering(columnIndex, lt(col, -234.6f), 1, 2);
+    assertCorrectFiltering(columnIndex, ltEq(col, -234.6f), 1, 2, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, FloatIsInteger.class), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, FloatIsInteger.class)), 0, 1, 2, 3, 4, 5, 6, 7, 8);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -552,6 +909,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
     assertCorrectValues(columnIndex.getMaxValues(), null, 532.3f, null, 234.7f, null, 234.6f, null, null, -3.0f);
     assertCorrectValues(columnIndex.getMinValues(), null, 345.2f, null, 234.6f, null, -2.99999f, null, null, -42.83f);
+    assertCorrectFiltering(columnIndex, eq(col, 234.65f), 3);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+    assertCorrectFiltering(columnIndex, gt(col, 2.2f), 1, 3, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2.2f), 1, 3, 5);
+    assertCorrectFiltering(columnIndex, lt(col, 0.0f), 5, 8);
+    assertCorrectFiltering(columnIndex, ltEq(col, 0.0f), 5, 8);
+    assertCorrectFiltering(columnIndex, userDefined(col, FloatIsInteger.class), 1, 5, 8);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, FloatIsInteger.class)), 0, 1, 2, 3, 4, 5, 6, 7, 8);
   }
 
   @Test
@@ -576,6 +943,7 @@ public class TestColumnIndexBuilder {
     ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
     assertNull(builder.build());
+    IntColumn col = intColumn("test_col");
 
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, -4, 10));
@@ -592,6 +960,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
     assertCorrectValues(columnIndex.getMaxValues(), 10, 7, 2, null, 2, 8);
     assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21);
+    assertCorrectFiltering(columnIndex, eq(col, 2), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, gt(col, 2), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, lt(col, 2), 0, 1, 4, 5);
+    assertCorrectFiltering(columnIndex, ltEq(col, 2), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -612,6 +990,17 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
     assertCorrectValues(columnIndex.getMaxValues(), null, -345, -42, null, null, 2, null, 42, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, null, -42, null, 3, null);
+    assertCorrectFiltering(columnIndex, eq(col, 2), 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, gt(col, 2), 7);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2), 5, 7);
+    assertCorrectFiltering(columnIndex, lt(col, 2), 1, 2, 5);
+    assertCorrectFiltering(columnIndex, ltEq(col, 2), 1, 2, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+        8);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -632,6 +1021,17 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
     assertCorrectValues(columnIndex.getMaxValues(), null, 532, null, 234, null, 42, null, null, -3);
     assertCorrectValues(columnIndex.getMinValues(), null, 345, null, 42, null, -2, null, null, -42);
+    assertCorrectFiltering(columnIndex, eq(col, 2), 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+    assertCorrectFiltering(columnIndex, gt(col, 2), 1, 3, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2), 1, 3, 5);
+    assertCorrectFiltering(columnIndex, lt(col, 2), 5, 8);
+    assertCorrectFiltering(columnIndex, ltEq(col, 2), 5, 8);
+    assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 1, 3, 5, 8);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+        8);
   }
 
   @Test
@@ -656,6 +1056,7 @@ public class TestColumnIndexBuilder {
     ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
     assertNull(builder.build());
+    IntColumn col = intColumn("test_col");
 
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, 4, 10));
@@ -672,6 +1073,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
     assertCorrectValues(columnIndex.getMaxValues(), 10, 17, 2, null, 0xFF, 0xFA);
     assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF);
+    assertCorrectFiltering(columnIndex, eq(col, 2), 2, 4);
+    assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, gt(col, 2), 0, 1, 4, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, lt(col, 0xEF), 0, 1, 2, 4);
+    assertCorrectFiltering(columnIndex, ltEq(col, 0xEF), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 0, 1, 4, 5);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -692,6 +1103,17 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
     assertCorrectValues(columnIndex.getMaxValues(), null, 0, 42, null, null, 0xEE, null, 0xFF, null);
     assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null);
+    assertCorrectFiltering(columnIndex, eq(col, 2), 2);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, gt(col, 0xEE), 7);
+    assertCorrectFiltering(columnIndex, gtEq(col, 0xEE), 5, 7);
+    assertCorrectFiltering(columnIndex, lt(col, 42), 1, 2);
+    assertCorrectFiltering(columnIndex, ltEq(col, 42), 1, 2, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+        8);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -712,6 +1134,17 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
     assertCorrectValues(columnIndex.getMaxValues(), null, 0xFF, null, 0xEF, null, 0xEE, null, null, 41);
     assertCorrectValues(columnIndex.getMinValues(), null, 0xFF, null, 0xEA, null, 42, null, null, 0);
+    assertCorrectFiltering(columnIndex, eq(col, 0xAB), 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, 0xFF), 0, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+    assertCorrectFiltering(columnIndex, gt(col, 0xFF));
+    assertCorrectFiltering(columnIndex, gtEq(col, 0xFF), 1);
+    assertCorrectFiltering(columnIndex, lt(col, 42), 8);
+    assertCorrectFiltering(columnIndex, ltEq(col, 42), 5, 8);
+    assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 1, 3, 5, 8);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 2, 3, 4, 5, 6, 7,
+        8);
   }
 
   @Test
@@ -720,6 +1153,7 @@ public class TestColumnIndexBuilder {
     ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(LongColumnIndexBuilder.class));
     assertNull(builder.build());
+    LongColumn col = longColumn("test_col");
 
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, -4l, 10l));
@@ -736,6 +1170,16 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
     assertCorrectValues(columnIndex.getMaxValues(), 10l, 7l, 2l, null, 2l, 8l);
     assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, -21l);
+    assertCorrectFiltering(columnIndex, eq(col, 0l), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+    assertCorrectFiltering(columnIndex, notEq(col, 0l), 0, 1, 2, 3, 4, 5);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, gt(col, 2l), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2l), 0, 1, 2, 4, 5);
+    assertCorrectFiltering(columnIndex, lt(col, -21l));
+    assertCorrectFiltering(columnIndex, ltEq(col, -21l), 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, LongIsDivisableWith3.class), 0, 1, 5);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, LongIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -756,6 +1200,17 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
     assertCorrectValues(columnIndex.getMaxValues(), null, -345l, -42l, null, null, 2l, null, 42l, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, null, -42l, null, -3l, null);
+    assertCorrectFiltering(columnIndex, eq(col, -42l), 2, 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, -42l), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, gt(col, 2l), 7);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2l), 5, 7);
+    assertCorrectFiltering(columnIndex, lt(col, -42l), 1, 2);
+    assertCorrectFiltering(columnIndex, ltEq(col, -42l), 1, 2, 5);
+    assertCorrectFiltering(columnIndex, userDefined(col, LongIsDivisableWith3.class), 1, 2, 5, 7);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, LongIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+        8);
 
     builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
@@ -776,6 +1231,17 @@ public class TestColumnIndexBuilder {
     assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
     assertCorrectValues(columnIndex.getMaxValues(), null, 532l, null, 234l, null, 42l, null, null, -3l);
     assertCorrectValues(columnIndex.getMinValues(), null, 345l, null, 42l, null, -2l, null, null, -42l);
+    assertCorrectFiltering(columnIndex, eq(col, 0l), 5);
+    assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+    assertCorrectFiltering(columnIndex, notEq(col, 0l), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+    assertCorrectFiltering(columnIndex, gt(col, 2l), 1, 3, 5);
+    assertCorrectFiltering(columnIndex, gtEq(col, 2l), 1, 3, 5);
+    assertCorrectFiltering(columnIndex, lt(col, -42l));
+    assertCorrectFiltering(columnIndex, ltEq(col, -42l), 8);
+    assertCorrectFiltering(columnIndex, userDefined(col, LongIsDivisableWith3.class), 1, 3, 5, 8);
+    assertCorrectFiltering(columnIndex, invert(userDefined(col, LongIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+        8);
   }
 
   @Test
@@ -1034,4 +1500,8 @@ public class TestColumnIndexBuilder {
       return minMaxSize;
     }
   }
+
+  private static void assertCorrectFiltering(ColumnIndex ci, FilterPredicate predicate, int... expectedIndexes) {
+    TestIndexIterator.assertEquals(predicate.accept(ci), expectedIndexes);
+  }
 }
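
A minimal sketch of the pattern assertCorrectFiltering relies on (illustrative
only; the wrapper class and column name are hypothetical): a ColumnIndex acts
as a FilterPredicate visitor and yields the indexes of the pages that may
contain matching values, so every other page of the column chunk can be
skipped.

    import static org.apache.parquet.filter2.predicate.FilterApi.gt;
    import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;

    import java.util.PrimitiveIterator;

    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.internal.column.columnindex.ColumnIndex;

    class PageFilteringSketch {
      // Pages whose [min, max] range may satisfy the predicate; all other
      // pages of the chunk need not be decoded at all.
      static PrimitiveIterator.OfInt matchingPages(ColumnIndex columnIndex) {
        FilterPredicate predicate = gt(intColumn("test_col"), 7);
        return predicate.accept(columnIndex);
      }
    }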
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java
new file mode 100644
index 0000000..d9047f2
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.util.Arrays;
+import java.util.PrimitiveIterator;
+
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Unit test for {@link IndexIterator}.
+ */
+public class TestIndexIterator {
+  @Test
+  public void testAll() {
+    assertEquals(IndexIterator.all(10), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+  }
+
+  @Test
+  public void testFilter() {
+    assertEquals(IndexIterator.filter(30, value -> value % 3 == 0), 0, 3, 6, 9, 12, 15, 18, 21, 24, 27);
+  }
+
+  @Test
+  public void testFilterTranslate() {
+    assertEquals(IndexIterator.filterTranslate(20, value -> value < 5, Math::negateExact), 0, -1, -2, -3, -4);
+  }
+
+  @Test
+  public void testRangeTranslate() {
+    assertEquals(IndexIterator.rangeTranslate(11, 18, i -> i - 10), 1, 2, 3, 4, 5, 6, 7, 8);
+  }
+
+  static void assertEquals(PrimitiveIterator.OfInt actualIt, int... expectedValues) {
+    IntList actualList = new IntArrayList();
+    actualIt.forEachRemaining((int value) -> actualList.add(value));
+    int[] actualValues = actualList.toIntArray();
+    assertArrayEquals(
+        "ExpectedValues: " + Arrays.toString(expectedValues) + " ActualValues: " + Arrays.toString(actualValues),
+        expectedValues, actualValues);
+  }
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
index 6207084..1e1275c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
@@ -106,6 +106,8 @@ public class TestOffsetIndexBuilder {
           offsetIndex.getCompressedPageSize(i));
       assertEquals("Invalid firstRowIndex at page " + i, offset_size_rowIndex_triplets[3 * i + 2],
           offsetIndex.getFirstRowIndex(i));
+      long expectedLastRowIndex = (i < pageCount - 1) ? (offset_size_rowIndex_triplets[3 * i + 5] - 1) : 999;
+      assertEquals("Invalid lastRowIndex at page " + i, expectedLastRowIndex, offsetIndex.getLastRowIndex(i, 1000));
     }
   }
 }
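
The new assertion above checks OffsetIndex.getLastRowIndex. Its contract, as
exercised by the test, can be sketched as follows (illustrative helper only;
assumes OffsetIndex.getPageCount() is available):

    // The last row index of a page is one less than the first row index of
    // the next page; for the final page it is derived from the total row
    // count of the row group.
    static long lastRowIndex(OffsetIndex offsetIndex, int pageIndex, long totalRowCount) {
      return pageIndex == offsetIndex.getPageCount() - 1
          ? totalRowCount - 1
          : offsetIndex.getFirstRowIndex(pageIndex + 1) - 1;
    }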
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
new file mode 100644
index 0000000..ec85c27
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
+import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING;
+import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.DESCENDING;
+import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.UNORDERED;
+import static org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.calculateRowRanges;
+import static org.apache.parquet.io.api.Binary.fromString;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Types.optional;
+import static org.junit.Assert.assertArrayEquals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.LongStream;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.BinaryUtf8StartsWithB;
+import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.DoubleIsInteger;
+import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.IntegerIsDivisableWith3;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+
+/**
+ * Unit tests of {@link ColumnIndexFilter}
+ */
+public class TestColumnIndexFilter {
+  private static class CIBuilder {
+    private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
+    private final PrimitiveType type;
+    private final BoundaryOrder order;
+    private List<Boolean> nullPages = new ArrayList<>();
+    private List<Long> nullCounts = new ArrayList<>();
+    private List<ByteBuffer> minValues = new ArrayList<>();
+    private List<ByteBuffer> maxValues = new ArrayList<>();
+
+    CIBuilder(PrimitiveType type, BoundaryOrder order) {
+      this.type = type;
+      this.order = order;
+    }
+
+    CIBuilder addNullPage(long nullCount) {
+      nullPages.add(true);
+      nullCounts.add(nullCount);
+      minValues.add(EMPTY);
+      maxValues.add(EMPTY);
+      return this;
+    }
+
+    CIBuilder addPage(long nullCount, int min, int max) {
+      nullPages.add(false);
+      nullCounts.add(nullCount);
+      minValues.add(ByteBuffer.wrap(BytesUtils.intToBytes(min)));
+      maxValues.add(ByteBuffer.wrap(BytesUtils.intToBytes(max)));
+      return this;
+    }
+
+    CIBuilder addPage(long nullCount, String min, String max) {
+      nullPages.add(false);
+      nullCounts.add(nullCount);
+      minValues.add(ByteBuffer.wrap(min.getBytes(UTF_8)));
+      maxValues.add(ByteBuffer.wrap(max.getBytes(UTF_8)));
+      return this;
+    }
+
+    CIBuilder addPage(long nullCount, double min, double max) {
+      nullPages.add(false);
+      nullCounts.add(nullCount);
+      minValues.add(ByteBuffer.wrap(BytesUtils.longToBytes(Double.doubleToLongBits(min))));
+      maxValues.add(ByteBuffer.wrap(BytesUtils.longToBytes(Double.doubleToLongBits(max))));
+      return this;
+    }
+
+    ColumnIndex build() {
+      return ColumnIndexBuilder.build(type, order, nullPages, nullCounts, minValues, maxValues);
+    }
+  }
+
+  private static class OIBuilder {
+    private final OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+
+    OIBuilder addPage(long rowCount) {
+      builder.add(1234, rowCount);
+      return this;
+    }
+
+    OffsetIndex build() {
+      return builder.build();
+    }
+  }
+
+  public static class AnyInt extends UserDefinedPredicate<Integer> {
+
+    @Override
+    public boolean keep(Integer value) {
+      return true;
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Integer> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Integer> statistics) {
+      return true;
+    }
+
+  }
+
+  /**
+   * <pre>
+   * row     column1        column2        column3        column4 (no column index)
+   *      ------0------  ------0------  ------0------  ------0------
+   * 0.   1              Zulu           2.03
+   *      ------1------  ------1------  ------1------  ------1------
+   * 1.   2              Yankee         4.67
+   * 2.   3              Xray           3.42
+   * 3.   4              Whiskey        8.71
+   *                     ------2------                 ------2------
+   * 4.   5              Victor         0.56
+   * 5.   6              Uniform        4.30
+   *                                    ------2------  ------3------
+   * 6.   null           null           null
+   *      ------2------                                ------4------
+   * 7.   7              Tango          3.50
+   *                     ------3------
+   * 8.   7              null           3.14
+   *      ------3------
+   * 9.   7              null           null
+   *                                    ------3------
+   * 10.  null           null           9.99
+   *                     ------4------
+   * 11.  8              Sierra         8.78
+   *                                                   ------5------
+   * 12.  9              Romeo          9.56
+   * 13.  10             Quebec         2.71
+   *      ------4------
+   * 14.  11             Papa           5.71
+   * 15.  12             Oscar          4.09
+   *                     ------5------  ------4------  ------6------
+   * 16.  13             November       null
+   * 17.  14             Mike           null
+   * 18.  15             Lima           0.36
+   * 19.  16             Kilo           2.94
+   * 20.  17             Juliett        4.23
+   *      ------5------  ------6------                 ------7------
+   * 21.  18             India          null
+   * 22.  19             Hotel          5.32
+   *                                    ------5------
+   * 23.  20             Golf           4.17
+   * 24.  21             Foxtrot        7.92
+   * 25.  22             Echo           7.95
+   *                                   ------6------
+   * 26.  23             Delta          null
+   *      ------6------
+   * 27.  24             Charlie        null
+   *                                                   ------8------
+   * 28.  25             Bravo          null
+   *                     ------7------
+   * 29.  26             Alfa           null
+   * </pre>
+   */
+  private static final long TOTAL_ROW_COUNT = 30;
+  private static final ColumnIndex COLUMN1_CI = new CIBuilder(optional(INT32).named("column1"), ASCENDING)
+      .addPage(0, 1, 1)
+      .addPage(1, 2, 6)
+      .addPage(0, 7, 7)
+      .addPage(1, 7, 10)
+      .addPage(0, 11, 17)
+      .addPage(0, 18, 23)
+      .addPage(0, 24, 26)
+      .build();
+  private static final OffsetIndex COLUMN1_OI = new OIBuilder()
+      .addPage(1)
+      .addPage(6)
+      .addPage(2)
+      .addPage(5)
+      .addPage(7)
+      .addPage(6)
+      .addPage(3)
+      .build();
+  private static final ColumnIndex COLUMN2_CI = new CIBuilder(optional(BINARY).as(UTF8).named("column2"), DESCENDING)
+      .addPage(0, "Zulu", "Zulu")
+      .addPage(0, "Whiskey", "Yankee")
+      .addPage(1, "Tango", "Victor")
+      .addNullPage(3)
+      .addPage(0, "Oscar", "Sierra")
+      .addPage(0, "Juliett", "November")
+      .addPage(0, "Bravo", "India")
+      .addPage(0, "Alfa", "Alfa")
+      .build();
+  private static final OffsetIndex COLUMN2_OI = new OIBuilder()
+      .addPage(1)
+      .addPage(3)
+      .addPage(4)
+      .addPage(3)
+      .addPage(5)
+      .addPage(5)
+      .addPage(8)
+      .addPage(1)
+      .build();
+  private static final ColumnIndex COLUMN3_CI = new CIBuilder(optional(DOUBLE).named("column3"), UNORDERED)
+      .addPage(0, 2.03, 2.03)
+      .addPage(0, 0.56, 8.71)
+      .addPage(2, 3.14, 3.50)
+      .addPage(0, 2.71, 9.99)
+      .addPage(3, 0.36, 5.32)
+      .addPage(0, 4.17, 7.95)
+      .addNullPage(4)
+      .build();
+  private static final OffsetIndex COLUMN3_OI = new OIBuilder()
+      .addPage(1)
+      .addPage(5)
+      .addPage(4)
+      .addPage(6)
+      .addPage(7)
+      .addPage(3)
+      .addPage(4)
+      .build();
+  private static final ColumnIndex COLUMN4_CI = null;
+  private static final OffsetIndex COLUMN4_OI = new OIBuilder()
+      .addPage(1)
+      .addPage(3)
+      .addPage(2)
+      .addPage(1)
+      .addPage(5)
+      .addPage(4)
+      .addPage(5)
+      .addPage(7)
+      .addPage(2)
+      .build();
+  private static final ColumnIndexStore STORE = new ColumnIndexStore() {
+    @Override
+    public ColumnIndex getColumnIndex(ColumnPath column) {
+      switch (column.toDotString()) {
+        case "column1":
+          return COLUMN1_CI;
+        case "column2":
+          return COLUMN2_CI;
+        case "column3":
+          return COLUMN3_CI;
+        case "column4":
+          return COLUMN4_CI;
+        default:
+          return null;
+      }
+    }
+
+    @Override
+    public OffsetIndex getOffsetIndex(ColumnPath column) {
+      switch (column.toDotString()) {
+        case "column1":
+          return COLUMN1_OI;
+        case "column2":
+          return COLUMN2_OI;
+        case "column3":
+          return COLUMN3_OI;
+        case "column4":
+          return COLUMN4_OI;
+        default:
+          throw new MissingOffsetIndexException(column);
+      }
+    }
+  };
+
+  private static Set<ColumnPath> paths(String... columns) {
+    Set<ColumnPath> paths = new HashSet<>();
+    for (String column : columns) {
+      paths.add(ColumnPath.fromDotString(column));
+    }
+    return paths;
+  }
+
+  private static void assertAllRows(RowRanges ranges, long rowCount) {
+    LongList actualList = new LongArrayList();
+    ranges.allRows().forEachRemaining((long value) -> actualList.add(value));
+    LongList expectedList = new LongArrayList();
+    LongStream.range(0, rowCount).forEach(expectedList::add);
+    assertArrayEquals(expectedList + " != " + actualList, expectedList.toLongArray(), actualList.toLongArray());
+  }
+
+  private static void assertRows(RowRanges ranges, long... expectedRows) {
+    LongList actualList = new LongArrayList();
+    ranges.allRows().forEachRemaining((long value) -> actualList.add(value));
+    assertArrayEquals(Arrays.toString(expectedRows) + " != " + actualList, expectedRows, actualList.toLongArray());
+  }
+
+  @Test
+  public void testFiltering() {
+    Set<ColumnPath> paths = paths("column1", "column2", "column3", "column4");
+
+    assertAllRows(
+        calculateRowRanges(FilterCompat.get(
+            userDefined(intColumn("column1"), AnyInt.class)), STORE, paths, TOTAL_ROW_COUNT),
+        TOTAL_ROW_COUNT);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                eq(intColumn("column1"), null),
+                eq(binaryColumn("column2"), null)),
+            and(
+                eq(doubleColumn("column3"), null),
+                eq(booleanColumn("column4"), null)))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        6, 9);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                notEq(intColumn("column1"), null),
+                notEq(binaryColumn("column2"), null)),
+            and(
+                notEq(doubleColumn("column3"), null),
+                notEq(booleanColumn("column4"), null)))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        0, 1, 2, 3, 4, 5, 6, 7, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        or(
+            and(
+                lt(intColumn("column1"), 20),
+                gtEq(binaryColumn("column2"), fromString("Quebec"))),
+            and(
+                gt(doubleColumn("column3"), 5.32),
+                ltEq(binaryColumn("column4"), fromString("XYZ"))))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        0, 1, 2, 3, 4, 5, 6, 7, 10, 11, 12, 13, 14, 15, 23, 24, 25);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                gtEq(intColumn("column1"), 7),
+                gt(binaryColumn("column2"), fromString("India"))),
+            and(
+                eq(doubleColumn("column3"), null),
+                notEq(binaryColumn("column4"), null)))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        7, 16, 17, 18, 19, 20);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            or(
+                invert(userDefined(intColumn("column1"), AnyInt.class)),
+                eq(binaryColumn("column2"), fromString("Echo"))),
+            eq(doubleColumn("column3"), 6.0))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        23, 24, 25);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            userDefined(intColumn("column1"), IntegerIsDivisableWith3.class),
+            and(
+                userDefined(binaryColumn("column2"), BinaryUtf8StartsWithB.class),
+                userDefined(doubleColumn("column3"), DoubleIsInteger.class)))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        21, 22, 23, 24, 25);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                gtEq(intColumn("column1"), 7),
+                lt(intColumn("column1"), 11)),
+            and(
+                gt(binaryColumn("column2"), fromString("Romeo")),
+                ltEq(binaryColumn("column2"), fromString("Tango"))))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        7, 11, 12, 13);
+  }
+
+  @Test
+  public void testFilteringOnMissingColumns() {
+    Set<ColumnPath> paths = paths("column1", "column2", "column3", "column4");
+
+    // A missing column is read as all nulls, so these filters always evaluate to true
+    assertAllRows(calculateRowRanges(FilterCompat.get(
+        notEq(intColumn("missing_column"), 0)),
+        STORE, paths, TOTAL_ROW_COUNT),
+        TOTAL_ROW_COUNT);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                gtEq(intColumn("column1"), 7),
+                lt(intColumn("column1"), 11)),
+            eq(binaryColumn("missing_column"), null))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        7, 8, 9, 10, 11, 12, 13);
+
+    // A missing column is read as all nulls, so these filters always evaluate to false
+    assertRows(calculateRowRanges(FilterCompat.get(
+        or(
+            and(
+                gtEq(intColumn("column1"), 7),
+                lt(intColumn("column1"), 11)),
+            notEq(binaryColumn("missing_column"), null))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        7, 8, 9, 10, 11, 12, 13);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        gt(intColumn("missing_column"), 0)),
+        STORE, paths, TOTAL_ROW_COUNT));
+  }
+
+  @Test
+  public void testFilteringWithMissingOffsetIndex() {
+    Set<ColumnPath> paths = paths("column1", "column2", "column3", "column4", "column_wo_oi");
+
+    assertAllRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                gtEq(intColumn("column1"), 7),
+                lt(intColumn("column1"), 11)),
+            and(
+                gt(binaryColumn("column2"), fromString("Romeo")),
+                ltEq(binaryColumn("column_wo_oi"), fromString("Tango"))))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        TOTAL_ROW_COUNT);
+  }
+
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
new file mode 100644
index 0000000..071785f
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import static org.apache.parquet.internal.filter2.columnindex.RowRanges.intersection;
+import static org.apache.parquet.internal.filter2.columnindex.RowRanges.union;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.PrimitiveIterator;
+
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+
+/**
+ * Unit test for {@link RowRanges}
+ */
+public class TestRowRanges {
+  private static RowRanges buildRanges(long... rowIndexes) {
+    if (rowIndexes.length == 0) {
+      return RowRanges.EMPTY;
+    }
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    for (int i = 0, n = rowIndexes.length; i < n; i += 2) {
+      long from = rowIndexes[i];
+      long to = rowIndexes[i + 1];
+      builder.add(0, 0, from);
+      builder.add(0, 0, to + 1);
+    }
+    PrimitiveIterator.OfInt pageIndexes = new PrimitiveIterator.OfInt() {
+      private int index = 0;
+
+      @Override
+      public boolean hasNext() {
+        return index < rowIndexes.length;
+      }
+
+      @Override
+      public int nextInt() {
+        int ret = index;
+        index += 2;
+        return ret;
+      }
+    };
+    return RowRanges.build(rowIndexes[rowIndexes.length - 1], pageIndexes, builder.build());
+  }
+
+  private static void assertAllRowsEqual(PrimitiveIterator.OfLong actualIt, long... expectedValues) {
+    LongList actualList = new LongArrayList();
+    actualIt.forEachRemaining((long value) -> actualList.add(value));
+    assertArrayEquals(Arrays.toString(expectedValues) + " != " + actualList, expectedValues, actualList.toLongArray());
+  }
+
+  @Test
+  public void testCreate() {
+    RowRanges ranges = buildRanges(
+        1, 2,
+        3, 4,
+        6, 7,
+        7, 10,
+        15, 17);
+    assertAllRowsEqual(ranges.allRows(), 1, 2, 3, 4, 6, 7, 8, 9, 10, 15, 16, 17);
+    assertEquals(12, ranges.rowCount());
+    assertTrue(ranges.isOverlapping(4, 5));
+    assertFalse(ranges.isOverlapping(5, 5));
+    assertTrue(ranges.isOverlapping(10, 14));
+    assertFalse(ranges.isOverlapping(11, 14));
+    assertFalse(ranges.isOverlapping(18, Long.MAX_VALUE));
+
+    ranges = RowRanges.single(5);
+    assertAllRowsEqual(ranges.allRows(), 0, 1, 2, 3, 4);
+    assertEquals(5, ranges.rowCount());
+    assertTrue(ranges.isOverlapping(0, 100));
+    assertFalse(ranges.isOverlapping(5, Long.MAX_VALUE));
+
+    ranges = RowRanges.EMPTY;
+    assertAllRowsEqual(ranges.allRows());
+    assertEquals(0, ranges.rowCount());
+    assertFalse(ranges.isOverlapping(0, Long.MAX_VALUE));
+  }
+
+  @Test
+  public void testUnion() {
+    RowRanges ranges1 = buildRanges(
+        2, 5,
+        7, 9,
+        14, 14,
+        20, 24);
+    RowRanges ranges2 = buildRanges(
+        1, 2,
+        4, 5,
+        11, 12,
+        14, 15,
+        21, 22);
+    RowRanges empty = buildRanges();
+    assertAllRowsEqual(union(ranges1, ranges2).allRows(), 1, 2, 3, 4, 5, 7, 8, 9, 11, 12, 14, 15, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(ranges2, ranges1).allRows(), 1, 2, 3, 4, 5, 7, 8, 9, 11, 12, 14, 15, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(ranges1, ranges1).allRows(), 2, 3, 4, 5, 7, 8, 9, 14, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(ranges1, empty).allRows(), 2, 3, 4, 5, 7, 8, 9, 14, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(empty, ranges1).allRows(), 2, 3, 4, 5, 7, 8, 9, 14, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(ranges2, ranges2).allRows(), 1, 2, 4, 5, 11, 12, 14, 15, 21, 22);
+    assertAllRowsEqual(union(ranges2, empty).allRows(), 1, 2, 4, 5, 11, 12, 14, 15, 21, 22);
+    assertAllRowsEqual(union(empty, ranges2).allRows(), 1, 2, 4, 5, 11, 12, 14, 15, 21, 22);
+    assertAllRowsEqual(union(empty, empty).allRows());
+  }
+
+  @Test
+  public void testIntersection() {
+    RowRanges ranges1 = buildRanges(
+        2, 5,
+        7, 9,
+        14, 14,
+        20, 24);
+    RowRanges ranges2 = buildRanges(
+        1, 2,
+        6, 7,
+        9, 9,
+        11, 12,
+        14, 15,
+        21, 22);
+    RowRanges empty = buildRanges();
+    assertAllRowsEqual(intersection(ranges1, ranges2).allRows(), 2, 7, 9, 14, 21, 22);
+    assertAllRowsEqual(intersection(ranges2, ranges1).allRows(), 2, 7, 9, 14, 21, 22);
+    assertAllRowsEqual(intersection(ranges1, ranges1).allRows(), 2, 3, 4, 5, 7, 8, 9, 14, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(intersection(ranges1, empty).allRows());
+    assertAllRowsEqual(intersection(empty, ranges1).allRows());
+    assertAllRowsEqual(intersection(ranges2, ranges2).allRows(), 1, 2, 6, 7, 9, 11, 12, 14, 15, 21, 22);
+    assertAllRowsEqual(intersection(ranges2, empty).allRows());
+    assertAllRowsEqual(intersection(empty, ranges2).allRows());
+    assertAllRowsEqual(intersection(empty, empty).allRows());
+  }
+
+}
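
A sketch of how a RowRanges result is consumed (illustrative only, not the
implementation in this commit): once calculateRowRanges has produced the
ranges, a page is worth reading only if its row interval overlaps them.

    // Derive the page's row interval from the offset index and test it
    // against the filtered row ranges.
    static boolean pageNeeded(RowRanges ranges, OffsetIndex offsetIndex, int pageIndex, long rowCount) {
      long from = offsetIndex.getFirstRowIndex(pageIndex);
      long to = offsetIndex.getLastRowIndex(pageIndex, rowCount);
      return ranges.isOverlapping(from, to);
    }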
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 8d3e48d..5f75460 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -28,6 +28,7 @@ import org.apache.parquet.hadoop.util.HadoopCodecs;
 
 import java.util.Map;
 
+import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
@@ -43,6 +44,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
                             boolean useStatsFilter,
                             boolean useDictionaryFilter,
                             boolean useRecordFilter,
+                            boolean useColumnIndexFilter,
                             FilterCompat.Filter recordFilter,
                             MetadataFilter metadataFilter,
                             CompressionCodecFactory codecFactory,
@@ -51,8 +53,8 @@ public class HadoopReadOptions extends ParquetReadOptions {
                             Map<String, String> properties,
                             Configuration conf) {
     super(
-        useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, recordFilter,
-        metadataFilter, codecFactory, allocator, maxAllocationSize, properties
+        useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
+        recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties
     );
     this.conf = conf;
   }
@@ -83,6 +85,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
       useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
       useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
       useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
+      useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
       withCodecFactory(HadoopCodecs.newFactory(conf, 0));
       withRecordFilter(getFilter(conf));
       withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
@@ -95,7 +98,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
     @Override
     public ParquetReadOptions build() {
       return new HadoopReadOptions(
-          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
           recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties,
           conf);
     }
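
For illustration, a job could opt out of the new filtering through the Hadoop
configuration read above (a sketch; assumes the HadoopReadOptions.builder(conf)
factory):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetInputFormat;

    class DisableColumnIndexFilteringSketch {
      static ParquetReadOptions options() {
        Configuration conf = new Configuration();
        // the property read by HadoopReadOptions.Builder above; defaults to true
        conf.setBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false);
        return HadoopReadOptions.builder(conf).build();
      }
    }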
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 4ef2460..846d3bd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -38,12 +38,14 @@ public class ParquetReadOptions {
   private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
+  private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true;
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
 
   private final boolean useSignedStringMinMax;
   private final boolean useStatsFilter;
   private final boolean useDictionaryFilter;
   private final boolean useRecordFilter;
+  private final boolean useColumnIndexFilter;
   private final FilterCompat.Filter recordFilter;
   private final ParquetMetadataConverter.MetadataFilter metadataFilter;
   private final CompressionCodecFactory codecFactory;
@@ -55,6 +57,7 @@ public class ParquetReadOptions {
                      boolean useStatsFilter,
                      boolean useDictionaryFilter,
                      boolean useRecordFilter,
+                     boolean useColumnIndexFilter,
                      FilterCompat.Filter recordFilter,
                      ParquetMetadataConverter.MetadataFilter metadataFilter,
                      CompressionCodecFactory codecFactory,
@@ -65,6 +68,7 @@ public class ParquetReadOptions {
     this.useStatsFilter = useStatsFilter;
     this.useDictionaryFilter = useDictionaryFilter;
     this.useRecordFilter = useRecordFilter;
+    this.useColumnIndexFilter = useColumnIndexFilter;
     this.recordFilter = recordFilter;
     this.metadataFilter = metadataFilter;
     this.codecFactory = codecFactory;
@@ -89,6 +93,10 @@ public class ParquetReadOptions {
     return useRecordFilter;
   }
 
+  public boolean useColumnIndexFilter() {
+    return useColumnIndexFilter;
+  }
+
   public FilterCompat.Filter getRecordFilter() {
     return recordFilter;
   }
@@ -134,6 +142,7 @@ public class ParquetReadOptions {
     protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
     protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
     protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
+    protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT;
     protected FilterCompat.Filter recordFilter = null;
     protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
     // the page size parameter isn't used when only using the codec factory to get decompressors
@@ -182,6 +191,15 @@ public class ParquetReadOptions {
       return this;
     }
 
+    public Builder useColumnIndexFilter(boolean useColumnIndexFilter) {
+      this.useColumnIndexFilter = useColumnIndexFilter;
+      return this;
+    }
+
+    public Builder useColumnIndexFilter() {
+      return useColumnIndexFilter(true);
+    }
+
     public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) {
       this.recordFilter = rowGroupFilter;
       return this;
@@ -239,7 +257,7 @@ public class ParquetReadOptions {
 
     public ParquetReadOptions build() {
       return new ParquetReadOptions(
-          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
           recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties);
     }
   }
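
The same switch is available on the plain builder; a hypothetical usage sketch
(assumes the static ParquetReadOptions.builder() factory):

    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.filter2.compat.FilterCompat;

    class ReadOptionsSketch {
      // Column index filtering is on by default; disable it while still
      // applying a record-level filter.
      static ParquetReadOptions withoutColumnIndexFiltering(FilterCompat.Filter recordFilter) {
        return ParquetReadOptions.builder()
            .useColumnIndexFilter(false)
            .withRecordFilter(recordFilter)
            .build();
      }
    }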
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 37dfd6d..5caca07 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -18,24 +18,28 @@
  */
 package org.apache.parquet.hadoop;
 
+import static org.apache.parquet.Ints.checkedCast;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
+import java.util.PrimitiveIterator;
 import org.apache.parquet.Ints;
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.NotInPageFilteringModeException;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
-import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,8 +66,13 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
     private final long valueCount;
     private final List<DataPage> compressedPages;
     private final DictionaryPage compressedDictionaryPage;
+    // null means no page synchronization is required; in this case the pages do not report a firstRowIndex
+    private final OffsetIndex offsetIndex;
+    private final long rowCount;
+    private int pageIndex = 0;
 
-    ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
+    ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages,
+        DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, long rowCount) {
       this.decompressor = decompressor;
       this.compressedPages = new LinkedList<DataPage>(compressedPages);
       this.compressedDictionaryPage = compressedDictionaryPage;
@@ -72,6 +81,8 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
         count += p.getValueCount();
       }
       this.valueCount = count;
+      this.offsetIndex = offsetIndex;
+      this.rowCount = rowCount;
     }
 
     @Override
@@ -85,18 +96,34 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
         return null;
       }
       DataPage compressedPage = compressedPages.remove(0);
+      final int currentPageIndex = pageIndex++;
       return compressedPage.accept(new DataPage.Visitor<DataPage>() {
         @Override
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
-            return new DataPageV1(
-                decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()),
-                dataPageV1.getValueCount(),
-                dataPageV1.getUncompressedSize(),
-                dataPageV1.getStatistics(),
-                dataPageV1.getRlEncoding(),
-                dataPageV1.getDlEncoding(),
-                dataPageV1.getValueEncoding());
+            BytesInput decompressed = decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize());
+            if (offsetIndex == null) {
+              return new DataPageV1(
+                  decompressed,
+                  dataPageV1.getValueCount(),
+                  dataPageV1.getUncompressedSize(),
+                  dataPageV1.getStatistics(),
+                  dataPageV1.getRlEncoding(),
+                  dataPageV1.getDlEncoding(),
+                  dataPageV1.getValueEncoding());
+            } else {
+              long firstRowIndex = offsetIndex.getFirstRowIndex(currentPageIndex);
+              return new DataPageV1(
+                  decompressed,
+                  dataPageV1.getValueCount(),
+                  dataPageV1.getUncompressedSize(),
+                  firstRowIndex,
+                  checkedCast(offsetIndex.getLastRowIndex(currentPageIndex, rowCount) - firstRowIndex + 1),
+                  dataPageV1.getStatistics(),
+                  dataPageV1.getRlEncoding(),
+                  dataPageV1.getDlEncoding(),
+                  dataPageV1.getValueEncoding());
+            }
           } catch (IOException e) {
             throw new ParquetDecodingException("could not decompress page", e);
           }
@@ -105,23 +132,49 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
         @Override
         public DataPage visit(DataPageV2 dataPageV2) {
           if (!dataPageV2.isCompressed()) {
-            return dataPageV2;
+            if (offsetIndex == null) {
+              return dataPageV2;
+            } else {
+              return DataPageV2.uncompressed(
+                  dataPageV2.getRowCount(),
+                  dataPageV2.getNullCount(),
+                  dataPageV2.getValueCount(),
+                  offsetIndex.getFirstRowIndex(currentPageIndex),
+                  dataPageV2.getRepetitionLevels(),
+                  dataPageV2.getDefinitionLevels(),
+                  dataPageV2.getDataEncoding(),
+                  dataPageV2.getData(),
+                  dataPageV2.getStatistics());
+            }
           }
           try {
             int uncompressedSize = Ints.checkedCast(
                 dataPageV2.getUncompressedSize()
-                - dataPageV2.getDefinitionLevels().size()
-                - dataPageV2.getRepetitionLevels().size());
-            return DataPageV2.uncompressed(
-                dataPageV2.getRowCount(),
-                dataPageV2.getNullCount(),
-                dataPageV2.getValueCount(),
-                dataPageV2.getRepetitionLevels(),
-                dataPageV2.getDefinitionLevels(),
-                dataPageV2.getDataEncoding(),
-                decompressor.decompress(dataPageV2.getData(), uncompressedSize),
-                dataPageV2.getStatistics()
-                );
+                    - dataPageV2.getDefinitionLevels().size()
+                    - dataPageV2.getRepetitionLevels().size());
+            BytesInput decompressed = decompressor.decompress(dataPageV2.getData(), uncompressedSize);
+            if (offsetIndex == null) {
+              return DataPageV2.uncompressed(
+                  dataPageV2.getRowCount(),
+                  dataPageV2.getNullCount(),
+                  dataPageV2.getValueCount(),
+                  dataPageV2.getRepetitionLevels(),
+                  dataPageV2.getDefinitionLevels(),
+                  dataPageV2.getDataEncoding(),
+                  decompressed,
+                  dataPageV2.getStatistics());
+            } else {
+              return DataPageV2.uncompressed(
+                  dataPageV2.getRowCount(),
+                  dataPageV2.getNullCount(),
+                  dataPageV2.getValueCount(),
+                  offsetIndex.getFirstRowIndex(currentPageIndex),
+                  dataPageV2.getRepetitionLevels(),
+                  dataPageV2.getDefinitionLevels(),
+                  dataPageV2.getDataEncoding(),
+                  decompressed,
+                  dataPageV2.getStatistics());
+            }
           } catch (IOException e) {
             throw new ParquetDecodingException("could not decompress page", e);
           }
@@ -147,9 +200,16 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
 
   private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
   private final long rowCount;
+  private final RowRanges rowRanges;
 
   public ColumnChunkPageReadStore(long rowCount) {
     this.rowCount = rowCount;
+    rowRanges = null;
+  }
+
+  ColumnChunkPageReadStore(RowRanges rowRanges) {
+    this.rowRanges = rowRanges;
+    rowCount = rowRanges.rowCount();
   }
 
   @Override
@@ -170,6 +230,19 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
     return readers.get(descriptor).readDictionaryPage();
   }
 
+  @Override
+  public PrimitiveIterator.OfLong getRowIndexes() {
+    if (rowRanges == null) {
+      throw new NotInPageFilteringModeException("Row indexes are not available");
+    }
+    return rowRanges.allRows();
+  }
+
+  @Override
+  public boolean isInPageFilteringMode() {
+    return rowRanges != null;
+  }
+
   void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) {
     if (readers.put(path, reader) != null) {
       throw new RuntimeException(path+ " was added twice");
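
The two new methods above expose the row synchronization information to record assembly. A minimal sketch of a consumer, assuming a PageReadStore obtained from a filtered read (the class and variable names are illustrative only):

    import java.util.PrimitiveIterator;

    import org.apache.parquet.column.page.PageReadStore;

    class RowIndexConsumer {
      // Iterates the row indexes that survived column-index based filtering.
      // getRowIndexes() throws NotInPageFilteringModeException when no RowRanges
      // was supplied, so the mode is checked first.
      void consume(PageReadStore pages) {
        if (pages.isInPageFilteringMode()) {
          PrimitiveIterator.OfLong it = pages.getRowIndexes();
          while (it.hasNext()) {
            long rowIndex = it.nextLong(); // position of the row within the row group
          }
        }
      }
    }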
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
new file mode 100644
index 0000000..448515e
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Formatter;
+import java.util.List;
+
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Internal utility class to help at column index based filtering.
+ */
+class ColumnIndexFilterUtils {
+  static class OffsetRange {
+    private final long offset;
+    private long length;
+
+    private OffsetRange(long offset, int length) {
+      this.offset = offset;
+      this.length = length;
+    }
+
+    long getOffset() {
+      return offset;
+    }
+
+    long getLength() {
+      return length;
+    }
+
+    private boolean extend(long offset, int length) {
+      if (this.offset + this.length == offset) {
+        this.length += length;
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  private static class FilteredOffsetIndex implements OffsetIndex {
+    private final OffsetIndex offsetIndex;
+    private final int[] indexMap;
+
+    private FilteredOffsetIndex(OffsetIndex offsetIndex, int[] indexMap) {
+      this.offsetIndex = offsetIndex;
+      this.indexMap = indexMap;
+    }
+
+    @Override
+    public int getPageCount() {
+      return indexMap.length;
+    }
+
+    @Override
+    public long getOffset(int pageIndex) {
+      return offsetIndex.getOffset(indexMap[pageIndex]);
+    }
+
+    @Override
+    public int getCompressedPageSize(int pageIndex) {
+      return offsetIndex.getCompressedPageSize(indexMap[pageIndex]);
+    }
+
+    @Override
+    public long getFirstRowIndex(int pageIndex) {
+      return offsetIndex.getFirstRowIndex(indexMap[pageIndex]);
+    }
+
+    @Override
+    public long getLastRowIndex(int pageIndex, long totalRowCount) {
+      int nextIndex = indexMap[pageIndex] + 1;
+      return (nextIndex >= offsetIndex.getPageCount() ? totalRowCount : offsetIndex.getFirstRowIndex(nextIndex)) - 1;
+    }
+
+    @Override
+    public String toString() {
+      try (Formatter formatter = new Formatter()) {
+        formatter.format("%-12s  %20s  %16s  %20s\n", "", "offset", "compressed size", "first row index");
+        for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
+          int index = Arrays.binarySearch(indexMap, i);
+          boolean isHidden = index < 0;
+          formatter.format("%spage-%-5d  %20d  %16d  %20d\n",
+              isHidden ? "- " : "  ",
+              isHidden ? i : index,
+              offsetIndex.getOffset(i),
+              offsetIndex.getCompressedPageSize(i),
+              offsetIndex.getFirstRowIndex(i));
+        }
+        return formatter.toString();
+      }
+    }
+  }
+
+  /*
+   * Returns the filtered offset index containing only the pages that overlap with rowRanges.
+   */
+  static OffsetIndex filterOffsetIndex(OffsetIndex offsetIndex, RowRanges rowRanges, long totalRowCount) {
+    IntList indexMap = new IntArrayList();
+    for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
+      long from = offsetIndex.getFirstRowIndex(i);
+      if (rowRanges.isOverlapping(from, offsetIndex.getLastRowIndex(i, totalRowCount))) {
+        indexMap.add(i);
+      }
+    }
+    return new FilteredOffsetIndex(offsetIndex, indexMap.toIntArray());
+  }
+
+  static List<OffsetRange> calculateOffsetRanges(OffsetIndex offsetIndex, ColumnChunkMetaData cm,
+      long firstPageOffset) {
+    List<OffsetRange> ranges = new ArrayList<>();
+    int n = offsetIndex.getPageCount();
+    if (n > 0) {
+      OffsetRange currentRange = null;
+
+      // Add a range for the dictionary page if required
+      long rowGroupOffset = cm.getStartingPos();
+      if (rowGroupOffset < firstPageOffset) {
+        currentRange = new OffsetRange(rowGroupOffset, (int) (firstPageOffset - rowGroupOffset));
+        ranges.add(currentRange);
+      }
+
+      for (int i = 0; i < n; ++i) {
+        long offset = offsetIndex.getOffset(i);
+        int length = offsetIndex.getCompressedPageSize(i);
+        if (currentRange == null || !currentRange.extend(offset, length)) {
+          currentRange = new OffsetRange(offset, length);
+          ranges.add(currentRange);
+        }
+      }
+    }
+    return ranges;
+  }
+}
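
calculateOffsetRanges coalesces the byte ranges of physically consecutive pages so that they can be fetched in a single scan. A self-contained sketch of the coalescing rule with made-up offsets (not part of the commit):

    import java.util.ArrayList;
    import java.util.List;

    public class OffsetRangeCoalescingDemo {
      public static void main(String[] args) {
        // {offset, compressedSize} pairs of the pages that survived filtering
        long[][] pages = { { 100, 50 }, { 150, 30 }, { 300, 20 } };
        List<long[]> ranges = new ArrayList<>();
        for (long[] page : pages) {
          long[] last = ranges.isEmpty() ? null : ranges.get(ranges.size() - 1);
          if (last != null && last[0] + last[1] == page[0]) {
            last[1] += page[1]; // pages are consecutive: extend the current range
          } else {
            ranges.add(new long[] { page[0], page[1] }); // gap: start a new range
          }
        }
        // Prints offset=100 length=80 and offset=300 length=20: two seeks instead of three
        for (long[] r : ranges) {
          System.out.println("offset=" + r[0] + " length=" + r[1]);
        }
      }
    }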
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexStoreImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexStoreImpl.java
new file mode 100644
index 0000000..684c5f2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexStoreImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Collections.emptySet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal implementation of {@link ColumnIndexStore}.
+ */
+class ColumnIndexStoreImpl implements ColumnIndexStore {
+
+  private interface IndexStore {
+    ColumnIndex getColumnIndex();
+
+    OffsetIndex getOffsetIndex();
+  }
+
+  private class IndexStoreImpl implements IndexStore {
+    private final ColumnChunkMetaData meta;
+    private ColumnIndex columnIndex;
+    private boolean columnIndexRead;
+    private final OffsetIndex offsetIndex;
+
+    IndexStoreImpl(ColumnChunkMetaData meta) {
+      this.meta = meta;
+      OffsetIndex oi;
+      try {
+        oi = reader.readOffsetIndex(meta);
+      } catch (IOException e) {
+        // If the I/O issue persists, it will fail the reading later;
+        // otherwise only the filtering fails, due to the missing offset index.
+        LOGGER.warn("Unable to read offset index for column {}", meta.getPath(), e);
+        oi = null;
+      }
+      if (oi == null) {
+        throw new MissingOffsetIndexException(meta.getPath());
+      }
+      offsetIndex = oi;
+    }
+
+    @Override
+    public ColumnIndex getColumnIndex() {
+      if (!columnIndexRead) {
+        try {
+          columnIndex = reader.readColumnIndex(meta);
+        } catch (IOException e) {
+          // If the I/O issue persists, it will fail the reading later;
+          // otherwise only the filtering fails, due to the missing column index.
+          LOGGER.warn("Unable to read column index for column {}", meta.getPath(), e);
+        }
+        columnIndexRead = true;
+      }
+      return columnIndex;
+    }
+
+    @Override
+    public OffsetIndex getOffsetIndex() {
+      return offsetIndex;
+    }
+  }
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnIndexStoreImpl.class);
+  // Used for columns that are not present in this parquet file
+  private static final IndexStore MISSING_INDEX_STORE = new IndexStore() {
+    @Override
+    public ColumnIndex getColumnIndex() {
+      return null;
+    }
+
+    @Override
+    public OffsetIndex getOffsetIndex() {
+      return null;
+    }
+  };
+  private static final ColumnIndexStoreImpl EMPTY = new ColumnIndexStoreImpl(null, new BlockMetaData(), emptySet()) {
+    @Override
+    public ColumnIndex getColumnIndex(ColumnPath column) {
+      return null;
+    }
+
+    @Override
+    public OffsetIndex getOffsetIndex(ColumnPath column) {
+      throw new MissingOffsetIndexException(column);
+    }
+  };
+
+  private final ParquetFileReader reader;
+  private final Map<ColumnPath, IndexStore> store;
+
+  /*
+   * Creates a column index store that lazily reads the column/offset indexes for the columns in paths (the set of
+   * columns used for the projection).
+   */
+  static ColumnIndexStore create(ParquetFileReader reader, BlockMetaData block, Set<ColumnPath> paths) {
+    try {
+      return new ColumnIndexStoreImpl(reader, block, paths);
+    } catch (MissingOffsetIndexException e) {
+      return EMPTY;
+    }
+  }
+
+  private ColumnIndexStoreImpl(ParquetFileReader reader, BlockMetaData block, Set<ColumnPath> paths) {
+    // TODO[GS]: Offset index for every path will be required; pre-read the consecutive ones at once?
+    // TODO[GS]: Pre-read column index based on filter?
+    this.reader = reader;
+    Map<ColumnPath, IndexStore> store = new HashMap<>();
+    for (ColumnChunkMetaData column : block.getColumns()) {
+      ColumnPath path = column.getPath();
+      if (paths.contains(path)) {
+        store.put(path, new IndexStoreImpl(column));
+      }
+    }
+    this.store = store;
+  }
+
+  @Override
+  public ColumnIndex getColumnIndex(ColumnPath column) {
+    return store.getOrDefault(column, MISSING_INDEX_STORE).getColumnIndex();
+  }
+
+  @Override
+  public OffsetIndex getOffsetIndex(ColumnPath column) {
+    return store.getOrDefault(column, MISSING_INDEX_STORE).getOffsetIndex();
+  }
+}
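
Because ColumnIndexStoreImpl is package-private, the following sketch is only valid inside org.apache.parquet.hadoop; reader, block and paths are assumed to come from an open ParquetFileReader:

    // Lazily reads the column/offset indexes of the projected columns
    ColumnIndexStore ciStore = ColumnIndexStoreImpl.create(reader, block, paths);
    for (ColumnPath path : paths) {
      ColumnIndex columnIndex = ciStore.getColumnIndex(path); // null if missing or unreadable
      OffsetIndex offsetIndex = ciStore.getOffsetIndex(path); // null for columns not in this file
    }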
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index a048878..e57f3cb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -124,7 +124,7 @@ class InternalParquetRecordReader<T> {
 
       LOG.info("at row " + current + ". reading next block");
       long t0 = System.currentTimeMillis();
-      PageReadStore pages = reader.readNextRowGroup();
+      PageReadStore pages = reader.readNextFilteredRowGroup();
       if (pages == null) {
         throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
       }
@@ -182,7 +182,7 @@ class InternalParquetRecordReader<T> {
     this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext);
     this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true);
-    this.total = reader.getRecordCount();
+    this.total = reader.getFilteredRecordCount();
     this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(options, total);
     this.filterRecords = options.useRecordFilter();
     reader.setRequestedSchema(requestedSchema);
@@ -204,7 +204,7 @@ class InternalParquetRecordReader<T> {
     this.recordConverter = readSupport.prepareForRead(
         configuration, fileMetadata, fileSchema, readContext);
     this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
-    this.total = reader.getRecordCount();
+    this.total = reader.getFilteredRecordCount();
     this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
     this.filterRecords = configuration.getBoolean(RECORD_FILTERING_ENABLED, true);
     reader.setRequestedSchema(requestedSchema);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index eda4745..add0e09 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -23,6 +23,8 @@ import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTI
 import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.calculateOffsetRanges;
+import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.filterOffsetIndex;
 import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
@@ -46,28 +48,28 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
+import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DictionaryPageReadStore;
-import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.RowGroupFilter;
-
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
 import org.apache.parquet.format.DataPageHeader;
 import org.apache.parquet.format.DataPageHeaderV2;
 import org.apache.parquet.format.DictionaryPageHeader;
@@ -76,20 +78,24 @@ import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
+import org.apache.parquet.hadoop.ColumnIndexFilterUtils.OffsetRange;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
-import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
 import org.apache.parquet.internal.hadoop.metadata.IndexReference;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.yetus.audience.InterfaceAudience.Private;
@@ -605,6 +611,8 @@ public class ParquetFileReader implements Closeable {
   private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
   private final FileMetaData fileMetaData; // may be null
   private final List<BlockMetaData> blocks;
+  private final List<ColumnIndexStore> blockIndexStores;
+  private final List<RowRanges> blockRowRanges;
 
   // not final. in some cases, this may be lazily loaded for backward-compat.
   private ParquetMetadata footer;
@@ -646,6 +654,8 @@ public class ParquetFileReader implements Closeable {
     this.f = file.newStream();
     this.options = HadoopReadOptions.builder(configuration).build();
     this.blocks = filterRowGroups(blocks);
+    this.blockIndexStores = listWithNulls(this.blocks.size());
+    this.blockRowRanges = listWithNulls(this.blocks.size());
     for (ColumnDescriptor col : columns) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
@@ -680,6 +690,8 @@ public class ParquetFileReader implements Closeable {
     this.footer = footer;
     this.fileMetaData = footer.getFileMetaData();
     this.blocks = filterRowGroups(footer.getBlocks());
+    this.blockIndexStores = listWithNulls(this.blocks.size());
+    this.blockRowRanges = listWithNulls(this.blocks.size());
     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
@@ -693,11 +705,17 @@ public class ParquetFileReader implements Closeable {
     this.footer = readFooter(file, options, f, converter);
     this.fileMetaData = footer.getFileMetaData();
     this.blocks = filterRowGroups(footer.getBlocks());
+    this.blockIndexStores = listWithNulls(this.blocks.size());
+    this.blockRowRanges = listWithNulls(this.blocks.size());
     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
   }
 
+  private static <T> List<T> listWithNulls(int size) {
+    return Stream.generate(() -> (T) null).limit(size).collect(Collectors.toCollection(ArrayList<T>::new));
+  }
+
   public ParquetMetadata getFooter() {
     if (footer == null) {
       try {
@@ -725,6 +743,17 @@ public class ParquetFileReader implements Closeable {
     return total;
   }
 
+  long getFilteredRecordCount() {
+    if (!options.useColumnIndexFilter()) {
+      return getRecordCount();
+    }
+    long total = 0;
+    for (int i = 0, n = blocks.size(); i < n; ++i) {
+      total += getRowRanges(i).rowCount();
+    }
+    return total;
+  }
+
   /**
    * @return the path for this file
    * @deprecated will be removed in 2.0.0; use {@link #getFile()} instead
@@ -787,30 +816,111 @@ public class ParquetFileReader implements Closeable {
       throw new RuntimeException("Illegal row group of 0 rows");
     }
     this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount());
-    // prepare the list of consecutive chunks to read them in one scan
-    List<ConsecutiveChunkList> allChunks = new ArrayList<ConsecutiveChunkList>();
-    ConsecutiveChunkList currentChunks = null;
+    // prepare the list of consecutive parts to read them in one scan
+    List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+    ConsecutivePartList currentParts = null;
     for (ColumnChunkMetaData mc : block.getColumns()) {
       ColumnPath pathKey = mc.getPath();
       BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
       ColumnDescriptor columnDescriptor = paths.get(pathKey);
       if (columnDescriptor != null) {
         long startingPos = mc.getStartingPos();
-        // first chunk or not consecutive => new list
-        if (currentChunks == null || currentChunks.endPos() != startingPos) {
-          currentChunks = new ConsecutiveChunkList(startingPos);
-          allChunks.add(currentChunks);
+        // first part or not consecutive => new list
+        if (currentParts == null || currentParts.endPos() != startingPos) {
+          currentParts = new ConsecutivePartList(startingPos);
+          allParts.add(currentParts);
         }
-        currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
+        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
       }
     }
     // actually read all the chunks
-    for (ConsecutiveChunkList consecutiveChunks : allChunks) {
-      final List<Chunk> chunks = consecutiveChunks.readAll(f);
-      for (Chunk chunk : chunks) {
-        currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+    ChunkListBuilder builder = new ChunkListBuilder();
+    for (ConsecutivePartList consecutiveChunks : allParts) {
+      consecutiveChunks.readAll(f, builder);
+    }
+    for (Chunk chunk : builder.build()) {
+      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+    }
+
+    // avoid re-reading bytes if the dictionary reader is used after this call
+    if (nextDictionaryReader != null) {
+      nextDictionaryReader.setRowGroup(currentRowGroup);
+    }
+
+    advanceToNextBlock();
+
+    return currentRowGroup;
+  }
+
+  /**
+   * Reads all the columns requested from the row group at the current file position. It may skip specific pages based
+   * on the column indexes according to the actual filter. As the rows are not aligned among the pages of the different
+   * columns, row synchronization might be required.
+   *
+   * @return the PageReadStore which can provide PageReaders for each column
+   * @throws IOException
+   *           if any I/O error occurs while reading
+   * @see PageReadStore#isInPageFilteringMode()
+   */
+  public PageReadStore readNextFilteredRowGroup() throws IOException {
+    if (currentBlock == blocks.size()) {
+      return null;
+    }
+    if (!options.useColumnIndexFilter()) {
+      return readNextRowGroup();
+    }
+    BlockMetaData block = blocks.get(currentBlock);
+    if (block.getRowCount() == 0) {
+      throw new RuntimeException("Illegal row group of 0 rows");
+    }
+    ColumnIndexStore ciStore = getColumnIndexStore(currentBlock);
+    RowRanges rowRanges = getRowRanges(currentBlock);
+    long rowCount = rowRanges.rowCount();
+    if (rowCount == 0) {
+      // There are no matching rows -> skip this row group
+      advanceToNextBlock();
+      return readNextFilteredRowGroup();
+    }
+    if (rowCount == block.getRowCount()) {
+      // All rows are matching -> fall back to the non-filtering path
+      return readNextRowGroup();
+    }
+
+    this.currentRowGroup = new ColumnChunkPageReadStore(rowRanges);
+    // prepare the list of consecutive parts to read them in one scan
+    ChunkListBuilder builder = new ChunkListBuilder();
+    List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+    ConsecutivePartList currentParts = null;
+    for (ColumnChunkMetaData mc : block.getColumns()) {
+      ColumnPath pathKey = mc.getPath();
+      ColumnDescriptor columnDescriptor = paths.get(pathKey);
+      if (columnDescriptor != null) {
+        OffsetIndex offsetIndex = ciStore.getOffsetIndex(mc.getPath());
+
+        OffsetIndex filteredOffsetIndex = filterOffsetIndex(offsetIndex, rowRanges,
+            block.getRowCount());
+        for (OffsetRange range : calculateOffsetRanges(filteredOffsetIndex, mc, offsetIndex.getOffset(0))) {
+          BenchmarkCounter.incrementTotalBytes(range.getLength());
+          long startingPos = range.getOffset();
+          // first part or not consecutive => new list
+          if (currentParts == null || currentParts.endPos() != startingPos) {
+            currentParts = new ConsecutivePartList(startingPos);
+            allParts.add(currentParts);
+          }
+          ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
+              (int) range.getLength());
+          currentParts.addChunk(chunkDescriptor);
+          builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
+        }
       }
     }
+    // actually read all the chunks
+    for (ConsecutivePartList consecutiveChunks : allParts) {
+      consecutiveChunks.readAll(f, builder);
+    }
+    for (Chunk chunk : builder.build()) {
+      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+    }
 
     // avoid re-reading bytes if the dictionary reader is used after this call
     if (nextDictionaryReader != null) {
@@ -822,6 +932,25 @@ public class ParquetFileReader implements Closeable {
     return currentRowGroup;
   }
 
+  private ColumnIndexStore getColumnIndexStore(int blockIndex) {
+    ColumnIndexStore ciStore = blockIndexStores.get(blockIndex);
+    if (ciStore == null) {
+      ciStore = ColumnIndexStoreImpl.create(this, blocks.get(blockIndex), paths.keySet());
+      blockIndexStores.set(blockIndex, ciStore);
+    }
+    return ciStore;
+  }
+
+  private RowRanges getRowRanges(int blockIndex) {
+    RowRanges rowRanges = blockRowRanges.get(blockIndex);
+    if (rowRanges == null) {
+      rowRanges = ColumnIndexFilter.calculateRowRanges(options.getRecordFilter(), getColumnIndexStore(blockIndex),
+          paths.keySet(), blocks.get(blockIndex).getRowCount());
+      blockRowRanges.set(blockIndex, rowRanges);
+    }
+    return rowRanges;
+  }
+
   public boolean skipNextRowGroup() {
     return advanceToNextBlock();
   }
@@ -952,6 +1081,57 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
+  /*
+   * Builder to concatenate the buffers of the discontinuous parts of the same column. These parts are generated as a
+   * result of the column-index based filtering, when some pages may be skipped during reading.
+   */
+  private class ChunkListBuilder {
+    private class ChunkData {
+      final List<ByteBuffer> buffers = new ArrayList<>();
+      OffsetIndex offsetIndex;
+    }
+
+    private final Map<ChunkDescriptor, ChunkData> map = new HashMap<>();
+    private ChunkDescriptor lastDescriptor;
+    private SeekableInputStream f;
+
+    void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
+      ChunkData data = map.get(descriptor);
+      if (data == null) {
+        data = new ChunkData();
+        map.put(descriptor, data);
+      }
+      data.buffers.addAll(buffers);
+
+      lastDescriptor = descriptor;
+      this.f = f;
+    }
+
+    void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) {
+      ChunkData data = map.get(descriptor);
+      if (data == null) {
+        data = new ChunkData();
+        map.put(descriptor, data);
+      }
+      data.offsetIndex = offsetIndex;
+    }
+
+    List<Chunk> build() {
+      List<Chunk> chunks = new ArrayList<>();
+      for (Entry<ChunkDescriptor, ChunkData> entry : map.entrySet()) {
+        ChunkDescriptor descriptor = entry.getKey();
+        ChunkData data = entry.getValue();
+        if (descriptor.equals(lastDescriptor)) {
+          // because of a bug, the last chunk might be larger than descriptor.size
+          chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex));
+        } else {
+          chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex));
+        }
+      }
+      return chunks;
+    }
+  }
+
   /**
    * The data for a column chunk
    */
@@ -959,15 +1139,17 @@ public class ParquetFileReader implements Closeable {
 
     protected final ChunkDescriptor descriptor;
     protected final ByteBufferInputStream stream;
+    final OffsetIndex offsetIndex;
 
     /**
-     *
      * @param descriptor descriptor for the chunk
      * @param buffers ByteBuffers that contain the chunk
+     * @param offsetIndex the offset index for this column; might be null
      */
-    public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers) {
+    public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, OffsetIndex offsetIndex) {
       this.descriptor = descriptor;
       this.stream = ByteBufferInputStream.wrap(buffers);
+      this.offsetIndex = offsetIndex;
     }
 
     protected PageHeader readPageHeader() throws IOException {
@@ -984,7 +1166,8 @@ public class ParquetFileReader implements Closeable {
       PrimitiveType type = getFileMetaData().getSchema()
           .getType(descriptor.col.getPath()).asPrimitiveType();
       long valuesCountReadSoFar = 0;
-      while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
+      int dataPageCountReadSoFar = 0;
+      while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
         PageHeader pageHeader = readPageHeader();
         int uncompressedPageSize = pageHeader.getUncompressed_page_size();
         int compressedPageSize = pageHeader.getCompressed_page_size();
@@ -994,8 +1177,8 @@ public class ParquetFileReader implements Closeable {
             if (dictionaryPage != null) {
               throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
             }
-          DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
-          dictionaryPage =
+            DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+            dictionaryPage =
                 new DictionaryPage(
                     this.readAsBytesInput(compressedPageSize),
                     uncompressedPageSize,
@@ -1019,6 +1202,7 @@ public class ParquetFileReader implements Closeable {
                     converter.getEncoding(dataHeaderV1.getEncoding())
                     ));
             valuesCountReadSoFar += dataHeaderV1.getNum_values();
+            ++dataPageCountReadSoFar;
             break;
           case DATA_PAGE_V2:
             DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
@@ -1040,6 +1224,7 @@ public class ParquetFileReader implements Closeable {
                     dataHeaderV2.isIs_compressed()
                     ));
             valuesCountReadSoFar += dataHeaderV2.getNum_values();
+            ++dataPageCountReadSoFar;
             break;
           default:
             LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
@@ -1047,7 +1232,7 @@ public class ParquetFileReader implements Closeable {
             break;
         }
       }
-      if (valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
+      if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
         // Would be nice to have a CorruptParquetFileException or something as a subclass?
         throw new IOException(
             "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
@@ -1056,7 +1241,13 @@ public class ParquetFileReader implements Closeable {
             + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
       }
       BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
-      return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
+      return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
+          blocks.get(currentBlock).getRowCount());
+    }
+
+    private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
+      return offsetIndex == null ? valuesCountReadSoFar < descriptor.metadata.getValueCount()
+          : dataPageCountReadSoFar < offsetIndex.getPageCount();
     }
 
     /**
@@ -1081,8 +1272,8 @@ public class ParquetFileReader implements Closeable {
      * @param descriptor the descriptor of the chunk
      * @param f the file stream positioned at the end of this chunk
      */
-    private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
-      super(descriptor, buffers);
+    private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f, OffsetIndex offsetIndex) {
+      super(descriptor, buffers, offsetIndex);
       this.f = f;
     }
 
@@ -1131,7 +1322,7 @@ public class ParquetFileReader implements Closeable {
 
 
   /**
-   * information needed to read a column chunk
+   * Information needed to read a column chunk or a part of it.
    */
   private static class ChunkDescriptor {
 
@@ -1157,12 +1348,29 @@ public class ParquetFileReader implements Closeable {
       this.fileOffset = fileOffset;
       this.size = size;
     }
+
+    @Override
+    public int hashCode() {
+      return col.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      } else if (obj instanceof ChunkDescriptor) {
+        return col.equals(((ChunkDescriptor) obj).col);
+      } else {
+        return false;
+      }
+    }
   }
 
   /**
-   * describes a list of consecutive column chunks to be read at once.
+   * Describes a list of consecutive parts to be read at once. A consecutive part may contain whole column chunks or
+   * only parts of them (some pages).
    */
-  private class ConsecutiveChunkList {
+  private class ConsecutivePartList {
 
     private final long offset;
     private int length;
@@ -1171,7 +1379,7 @@ public class ParquetFileReader implements Closeable {
     /**
      * @param offset where the first chunk starts
      */
-    ConsecutiveChunkList(long offset) {
+    ConsecutivePartList(long offset) {
       this.offset = offset;
     }
 
@@ -1187,11 +1395,10 @@ public class ParquetFileReader implements Closeable {
 
     /**
      * @param f file to read the chunks from
-     * @return the chunks
+     * @param builder used to build the chunk list for reading the pages of the different columns
      * @throws IOException if there is an error while reading from the stream
      */
-    public List<Chunk> readAll(SeekableInputStream f) throws IOException {
-      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
+    public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
       f.seek(offset);
 
       int fullAllocations = length / options.getMaxAllocationSize();
@@ -1218,14 +1425,8 @@ public class ParquetFileReader implements Closeable {
       ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
       for (int i = 0; i < chunks.size(); i++) {
         ChunkDescriptor descriptor = chunks.get(i);
-        if (i < chunks.size() - 1) {
-          result.add(new Chunk(descriptor, stream.sliceBuffers(descriptor.size)));
-        } else {
-          // because of a bug, the last chunk might be larger than descriptor.size
-          result.add(new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f));
-        }
+        builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
       }
-      return result ;
     }
 
     /**
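
The filtered entry points above are driven by InternalParquetRecordReader as shown earlier. A condensed read loop, valid from within org.apache.parquet.hadoop since getFilteredRecordCount() is package-private (the reader construction is assumed):

    long total = reader.getFilteredRecordCount(); // equals getRecordCount() when
                                                  // column-index filtering is disabled
    PageReadStore rowGroup;
    while ((rowGroup = reader.readNextFilteredRowGroup()) != null) {
      // rowGroup.getRowCount() may be smaller than the row count of the row group
      // when pages were dropped; isInPageFilteringMode() is then true and the
      // column readers must synchronize the rows across columns.
    }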
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 2c21e52..b8fce2f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -130,6 +130,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled";
 
   /**
+   * key to configure whether column index filtering of pages is enabled
+   */
+  public static final String COLUMN_INDEX_FILTERING_ENABLED = "parquet.filter.columnindex.enabled";
+
+  /**
    * key to turn on or off task side metadata loading (default true)
    * if true then metadata is read on the task side and some tasks may finish immediately.
   * if false metadata is read on the client, which is slower if there is a lot of metadata, but tasks will only be spawned if there is work to do.
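
For MapReduce-style reads the new feature is toggled through the job configuration; the default is defined in HadoopReadOptions (not shown in this hunk), so the explicit set below is illustrative:

    Configuration conf = new Configuration();
    // Follows the pattern of the existing parquet.filter.*.enabled keys
    conf.setBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, true);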
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index d9b273b..de20808 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -270,6 +270,16 @@ public class ParquetReader<T> implements Closeable {
       return this;
     }
 
+    public Builder<T> useColumnIndexFilter(boolean useColumnIndexFilter) {
+      optionsBuilder.useColumnIndexFilter(useColumnIndexFilter);
+      return this;
+    }
+
+    public Builder<T> useColumnIndexFilter() {
+      optionsBuilder.useColumnIndexFilter();
+      return this;
+    }
+
     public Builder<T> withFileRange(long start, long end) {
       optionsBuilder.withRange(start, end);
       return this;
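
For direct reads the same switch is exposed on the builder. A sketch combining it with a filter predicate, assuming the usual static imports from FilterApi (the path and predicate are examples only):

    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
        .withFilter(FilterCompat.get(eq(longColumn("id"), 1234L)))
        .useColumnIndexFilter() // enables page skipping based on column/offset indexes
        .build();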
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 5f474fd..e6aa104 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -189,9 +189,7 @@ abstract public class ColumnChunkMetaData {
   /**
    *
    * @return column identifier
-   * @deprecated will be removed in 2.0.0. Use {@link #getPrimitiveType()} instead.
    */
-  @Deprecated
   public ColumnPath getPath() {
     return properties.getPath();
   }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 7acda93..18ddca0 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -31,6 +31,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.schema.MessageType;
@@ -91,6 +92,11 @@ public class PhoneBookWriter {
       result = 31 * result + (lat != null ? lat.hashCode() : 0);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "Location [lon=" + lon + ", lat=" + lat + "]";
+    }
   }
 
   public static class PhoneNumber {
@@ -129,6 +135,11 @@ public class PhoneBookWriter {
       result = 31 * result + (kind != null ? kind.hashCode() : 0);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "PhoneNumber [number=" + number + ", kind=" + kind + "]";
+    }
   }
 
   public static class User {
@@ -183,6 +194,11 @@ public class PhoneBookWriter {
       result = 31 * result + (location != null ? location.hashCode() : 0);
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "User [id=" + id + ", name=" + name + ", phoneNumbers=" + phoneNumbers + ", location=" + location + "]";
+    }
   }
 
   public static SimpleGroup groupFromUser(User user) {
@@ -216,6 +232,56 @@ public class PhoneBookWriter {
     return root;
   }
 
+  private static User userFromGroup(Group root) {
+    return new User(getLong(root, "id"), getString(root, "name"), getPhoneNumbers(getGroup(root, "phoneNumbers")),
+        getLocation(getGroup(root, "location")));
+  }
+
+  private static List<PhoneNumber> getPhoneNumbers(Group phoneNumbers) {
+    if (phoneNumbers == null) {
+      return null;
+    }
+    List<PhoneNumber> list = new ArrayList<>();
+    for (int i = 0, n = phoneNumbers.getFieldRepetitionCount("phone"); i < n; ++i) {
+      Group phone = phoneNumbers.getGroup("phone", i);
+      list.add(new PhoneNumber(getLong(phone, "number"), getString(phone, "kind")));
+    }
+    return list;
+  }
+
+  private static Location getLocation(Group location) {
+    if (location == null) {
+      return null;
+    }
+    return new Location(getDouble(location, "lon"), getDouble(location, "lat"));
+  }
+
+  private static boolean isNull(Group group, String field) {
+    int repetition = group.getFieldRepetitionCount(field);
+    if (repetition == 0) {
+      return true;
+    } else if (repetition == 1) {
+      return false;
+    }
+    throw new AssertionError("Invalid repetitionCount " + repetition + " for field " + field + " in group " + group);
+  }
+
+  private static Long getLong(Group group, String field) {
+    return isNull(group, field) ? null : group.getLong(field, 0);
+  }
+
+  private static String getString(Group group, String field) {
+    return isNull(group, field) ? null : group.getString(field, 0);
+  }
+
+  private static Double getDouble(Group group, String field) {
+    return isNull(group, field) ? null : group.getDouble(field, 0);
+  }
+
+  private static Group getGroup(Group group, String field) {
+    return isNull(group, field) ? null : group.getGroup(field, 0);
+  }
+
   public static File writeToFile(List<User> users) throws IOException {
     File f = File.createTempFile("phonebook", ".parquet");
     f.deleteOnExit();
@@ -229,25 +295,30 @@ public class PhoneBookWriter {
   }
 
   public static void writeToFile(File f, List<User> users) throws IOException {
-    Configuration conf = new Configuration();
-    GroupWriteSupport.setSchema(schema, conf);
+    write(ExampleParquetWriter.builder(new Path(f.getAbsolutePath())), users);
+  }
 
-    ParquetWriter<Group> writer = new ParquetWriter<Group>(new Path(f.getAbsolutePath()), conf, new GroupWriteSupport());
-    for (User u : users) {
-      writer.write(groupFromUser(u));
+  public static void write(ParquetWriter.Builder<Group, ?> builder, List<User> users) throws IOException {
+    builder.config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+    try (ParquetWriter<Group> writer = builder.build()) {
+      for (User u : users) {
+        writer.write(groupFromUser(u));
+      }
     }
-    writer.close();
   }
 
-  public static List<Group> readFile(File f, Filter filter) throws IOException {
+  private static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
     Configuration conf = new Configuration();
     GroupWriteSupport.setSchema(schema, conf);
 
-    ParquetReader<Group> reader =
-        ParquetReader.builder(new GroupReadSupport(), new Path(f.getAbsolutePath()))
-                     .withConf(conf)
-                     .withFilter(filter)
-                     .build();
+    return ParquetReader.builder(new GroupReadSupport(), file)
+        .withConf(conf)
+        .withFilter(filter)
+        .build();
+  }
+
+  public static List<Group> readFile(File f, Filter filter) throws IOException {
+    ParquetReader<Group> reader = createReader(new Path(f.getAbsolutePath()), filter);
 
     Group current;
     List<Group> users = new ArrayList<Group>();
@@ -261,6 +332,16 @@ public class PhoneBookWriter {
     return users;
   }
 
+  public static List<User> readUsers(ParquetReader.Builder<Group> builder) throws IOException {
+    ParquetReader<Group> reader = builder.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()).build();
+
+    List<User> users = new ArrayList<>();
+    for (Group group = reader.read(); group != null; group = reader.read()) {
+      users.add(userFromGroup(group));
+    }
+    return users;
+  }
+
   public static void main(String[] args) throws IOException {
     File f = new File(args[0]);
     writeToFile(f, TestRecordLevelFilters.makeUsers());
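
With the new helpers the write/read round trip used by the tests below becomes two calls; the path and user list are examples only:

    Path file = new Path("/tmp/phonebook.parquet");
    PhoneBookWriter.write(ExampleParquetWriter.builder(file), users);
    List<User> readBack = PhoneBookWriter.readUsers(
        ParquetReader.builder(new GroupReadSupport(), file));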
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
new file mode 100644
index 0000000..71155ce
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Collections.emptyList;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.io.api.Binary;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for high level column index based filtering.
+ */
+@RunWith(Parameterized.class)
+public class TestColumnIndexFiltering {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexFiltering.class);
+  private static final Random RANDOM = new Random(42);
+  private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
+  private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
+  private static final Path FILE_V1 = createTempFile();
+  private static final Path FILE_V2 = createTempFile();
+
+  @Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+  }
+
+  private final Path file;
+
+  public TestColumnIndexFiltering(Path file) {
+    this.file = file;
+  }
+
+  private static List<User> generateData(int rowCount) {
+    List<User> users = new ArrayList<>();
+    List<String> names = generateNames(rowCount);
+    for (int i = 0; i < rowCount; ++i) {
+      users.add(new User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
+    }
+    return users;
+  }
+
+  private static List<String> generateNames(int rowCount) {
+    List<String> list = new ArrayList<>();
+
+    // Adding fixed values for filtering
+    list.add("anderson");
+    list.add("anderson");
+    list.add("miller");
+    list.add("miller");
+    list.add("miller");
+    list.add("thomas");
+    list.add("thomas");
+    list.add("williams");
+
+    int nullCount = rowCount / 100;
+
+    String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
+    int maxLength = 8;
+    for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
+      int l = RANDOM.nextInt(maxLength);
+      StringBuilder builder = new StringBuilder(l);
+      for (int j = 0; j < l; ++j) {
+        builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
+      }
+      list.add(builder.toString());
+    }
+    Collections.sort(list, (str1, str2) -> -str1.compareTo(str2));
+
+    // Adding nulls at random places
+    for (int i = 0; i < nullCount; ++i) {
+      list.add(RANDOM.nextInt(list.size()), null);
+    }
+
+    return list;
+  }
+
+  private static List<PhoneNumber> generatePhoneNumbers() {
+    int length = RANDOM.nextInt(5) - 1;
+    if (length < 0) {
+      return null;
+    }
+    List<PhoneNumber> phoneNumbers = new ArrayList<>(length);
+    for (int i = 0; i < length; ++i) {
+      // 6-digit numbers in the range [100000, 999999]
+      long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
+      phoneNumbers.add(new PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
+    }
+    return phoneNumbers;
+  }
+
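+  // Latitude and longitude are correlated with the row position (id): the first half of the rows gets negative
+  // latitudes and the first and last quarters get negative longitudes. Clustering the values this way keeps the
+  // per-page min/max ranges narrow, so the range predicates in testComplexFiltering can drop whole pages.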
+  private static Location generateLocation(int id, int rowCount) {
+    if (RANDOM.nextDouble() < 0.01) {
+      return null;
+    }
+
+    double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
+    double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
+
+    return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
+  }
+
+  private static Path createTempFile() {
+    try {
+      return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
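+  // Reads the users back from the given file; the dictionary, statistics and record-level filters are toggled
+  // together (useOtherFiltering) and independently of the column index based filter (useColumnIndexFilter), so
+  // the tests can measure the effect of the column index filtering on its own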
+  private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering) throws IOException {
+    return readUsers(FilterCompat.get(filter), useOtherFiltering, true);
+  }
+
+  private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+      throws IOException {
+    return readUsers(FilterCompat.get(filter), useOtherFiltering, useColumnIndexFilter);
+  }
+
+  private List<User> readUsers(Filter filter, boolean useOtherFiltering) throws IOException {
+    return readUsers(filter, useOtherFiltering, true);
+  }
+
+  private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+      throws IOException {
+    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(filter)
+        .useDictionaryFilter(useOtherFiltering)
+        .useStatsFilter(useOtherFiltering)
+        .useRecordFilter(useOtherFiltering)
+        .useColumnIndexFilter(useColumnIndexFilter));
+  }
+
+  // Asserts that expected is an ordered sub-sequence of actual (both lists must be in the same order)
+  private static void assertContains(Stream<User> expected, List<User> actual) {
+    Iterator<User> expIt = expected.iterator();
+    if (!expIt.hasNext()) {
+      return;
+    }
+    User exp = expIt.next();
+    boolean found = false;
+    for (User act : actual) {
+      if (act.equals(exp)) {
+        if (!expIt.hasNext()) {
+          found = true;
+          break;
+        }
+        exp = expIt.next();
+      }
+    }
+    assertTrue("Not all expected elements are in the actual list. E.g.: " + exp, found);
+  }
+
+  private void assertCorrectFiltering(Predicate<User> expectedFilter, FilterPredicate actualFilter)
+      throws IOException {
+    // Check with only column index based filtering
+    List<User> result = readUsers(actualFilter, false);
+
+    assertTrue("Column-index filtering should drop some pages", result.size() < DATA.size());
+    LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
+        100 * result.size() / DATA.size());
+    // Asserts that all the required records are in the result
+    assertContains(DATA.stream().filter(expectedFilter), result);
+    // Asserts that every retrieved record is indeed in the file (page granularity may let non-matching records through)
+    assertContains(result.stream(), DATA);
+
+    // Check with all filtering mechanisms enabled to ensure the result contains exactly the required records
+    result = readUsers(actualFilter, true);
+    assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
+  }
+
+  @BeforeClass
+  public static void createFile() throws IOException {
+    int pageSize = DATA.size() / 10;     // Page size in bytes; small enough that several pages are created
+    int rowGroupSize = pageSize * 6 * 5; // Row group size in bytes; ensures that multiple row groups are created
+    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withWriterVersion(WriterVersion.PARQUET_1_0),
+        DATA);
+    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withWriterVersion(WriterVersion.PARQUET_2_0),
+        DATA);
+  }
+
+  @AfterClass
+  public static void deleteFile() throws IOException {
+    FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
+    FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+  }
+
+  @Test
+  public void testSimpleFiltering() throws IOException {
+    assertCorrectFiltering(
+        record -> record.getId() == 1234,
+        eq(longColumn("id"), 1234l));
+    assertCorrectFiltering(
+        record -> "miller".equals(record.getName()),
+        eq(binaryColumn("name"), Binary.fromString("miller")));
+    assertCorrectFiltering(
+        record -> record.getName() == null,
+        eq(binaryColumn("name"), null));
+  }
+
+  @Test
+  public void testNoFiltering() throws IOException {
+    // Column index filtering with no-op filter
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
+
+    // Column index filtering turned off
+    assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()),
+        readUsers(eq(longColumn("id"), 1234L), true, false));
+    assertEquals(DATA.stream().filter(user -> "miller".equals(user.getName())).collect(Collectors.toList()),
+        readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), true, false));
+    assertEquals(DATA.stream().filter(user -> user.getName() == null).collect(Collectors.toList()),
+        readUsers(eq(binaryColumn("name"), null), true, false));
+
+    // Every filtering mechanism turned off
+    assertEquals(DATA, readUsers(eq(longColumn("id"), 1234L), false, false));
+    assertEquals(DATA, readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), false, false));
+    assertEquals(DATA, readUsers(eq(binaryColumn("name"), null), false, false));
+  }
+
+  @Test
+  public void testComplexFiltering() throws IOException {
+    assertCorrectFiltering(
+        record -> {
+          Location loc = record.getLocation();
+          Double lat = loc == null ? null : loc.getLat();
+          Double lon = loc == null ? null : loc.getLon();
+          return lat != null && lon != null && 37 <= lat && lat <= 70 && -21 <= lon && lon <= 35;
+        },
+        and(and(gtEq(doubleColumn("location.lat"), 37.0), ltEq(doubleColumn("location.lat"), 70.0)),
+            and(gtEq(doubleColumn("location.lon"), -21.0), ltEq(doubleColumn("location.lon"), 35.0))));
+    assertCorrectFiltering(
+        record -> {
+          Location loc = record.getLocation();
+          return loc == null || (loc.getLat() == null && loc.getLon() == null);
+        },
+        and(eq(doubleColumn("location.lat"), null), eq(doubleColumn("location.lon"), null)));
+    assertCorrectFiltering(
+        record -> {
+          String name = record.getName();
+          return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4;
+        },
+        and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3L * DATA.size() / 4)));
+  }
+
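+  // User defined predicate matching names that start with a vowel; the constants below delimit the half-open
+  // string intervals [a,b), [e,f), [i,j), [o,p) and [u,v) that contain every string starting with a vowel
+  // (for the lowercase names used in this test)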
+  public static class NameStartsWithVowel extends UserDefinedPredicate<Binary> {
+    private static final Binary A = Binary.fromString("a");
+    private static final Binary B = Binary.fromString("b");
+    private static final Binary E = Binary.fromString("e");
+    private static final Binary F = Binary.fromString("f");
+    private static final Binary I = Binary.fromString("i");
+    private static final Binary J = Binary.fromString("j");
+    private static final Binary O = Binary.fromString("o");
+    private static final Binary P = Binary.fromString("p");
+    private static final Binary U = Binary.fromString("u");
+    private static final Binary V = Binary.fromString("v");
+
+    private static boolean isStartingWithVowel(String str) {
+      if (str == null || str.isEmpty()) {
+        return false;
+      }
+      switch (str.charAt(0)) {
+        case 'a':
+        case 'e':
+        case 'i':
+        case 'o':
+        case 'u':
+          return true;
+        default:
+          return false;
+      }
+    }
+
+    @Override
+    public boolean keep(Binary value) {
+      return value != null && isStartingWithVowel(value.toStringUsingUTF8());
+    }
+
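+    // The page can be dropped if its [min, max] range avoids all of the vowel-initial intervals; this relies
+    // on the lexicographic (unsigned binary) ordering of the column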
+    @Override
+    public boolean canDrop(Statistics<Binary> statistics) {
+      Comparator<Binary> cmp = statistics.getComparator();
+      Binary min = statistics.getMin();
+      Binary max = statistics.getMax();
+      return cmp.compare(max, A) < 0
+          || (cmp.compare(min, B) >= 0 && cmp.compare(max, E) < 0)
+          || (cmp.compare(min, F) >= 0 && cmp.compare(max, I) < 0)
+          || (cmp.compare(min, J) >= 0 && cmp.compare(max, O) < 0)
+          || (cmp.compare(min, P) >= 0 && cmp.compare(max, U) < 0)
+          || cmp.compare(min, V) >= 0;
+    }
+
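+    // For the inverted predicate the page can be dropped only if every value matches the original predicate,
+    // i.e. the whole [min, max] range fits into one of the vowel-initial intervals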
+    @Override
+    public boolean inverseCanDrop(Statistics<Binary> statistics) {
+      Comparator<Binary> cmp = statistics.getComparator();
+      Binary min = statistics.getMin();
+      Binary max = statistics.getMax();
+      return (cmp.compare(min, A) >= 0 && cmp.compare(max, B) < 0)
+          || (cmp.compare(min, E) >= 0 && cmp.compare(max, F) < 0)
+          || (cmp.compare(min, I) >= 0 && cmp.compare(max, J) < 0)
+          || (cmp.compare(min, O) >= 0 && cmp.compare(max, P) < 0)
+          || (cmp.compare(min, U) >= 0 && cmp.compare(max, V) < 0);
+    }
+  }
+
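+  // This UDP is passed to FilterApi.userDefined as an instance (see testUDF), therefore it must be Serializable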
+  public static class IsDivisibleBy extends UserDefinedPredicate<Long> implements Serializable {
+    private long divisor;
+
+    IsDivisibleBy(long divisor) {
+      this.divisor = divisor;
+    }
+
+    @Override
+    public boolean keep(Long value) {
+      return value != null && value % divisor == 0;
+    }
+
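+    // No multiple of divisor can fall between min and max if neither bound is divisible and both have the same
+    // quotient; note that this reasoning assumes non-negative values, which holds for the ids in this test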
+    @Override
+    public boolean canDrop(Statistics<Long> statistics) {
+      long min = statistics.getMin();
+      long max = statistics.getMax();
+      return min % divisor != 0 && max % divisor != 0 && min / divisor == max / divisor;
+    }
+
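+    // For the inverted predicate the page can be dropped only if every value is divisible by the divisor;
+    // min == max with a divisible value guarantees that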
+    @Override
+    public boolean inverseCanDrop(Statistics<Long> statistics) {
+      long min = statistics.getMin();
+      long max = statistics.getMax();
+      return min == max && min % divisor == 0;
+    }
+  }
+
+  @Test
+  public void testUDF() throws IOException {
+    assertCorrectFiltering(
+        record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0,
+        or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
+            userDefined(longColumn("id"), new IsDivisibleBy(234))));
+    assertCorrectFiltering(
+        record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0),
+            not(or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
+                userDefined(longColumn("id"), new IsDivisibleBy(234)))));
+  }
+
+  @Test
+  public void testFilteringWithMissingColumns() throws IOException {
+    // A predicate on a missing column is evaluated against null values only; the following filters are always true
+    assertEquals(DATA, readUsers(notEq(binaryColumn("not-existing-binary"), Binary.EMPTY), true));
+    assertCorrectFiltering(
+        record -> record.getId() == 1234,
+        and(eq(longColumn("id"), 1234L),
+            eq(longColumn("not-existing-long"), null)));
+    assertCorrectFiltering(
+        record -> "miller".equals(record.getName()),
+        and(eq(binaryColumn("name"), Binary.fromString("miller")),
+            invert(userDefined(binaryColumn("not-existing-binary"), NameStartsWithVowel.class))));
+
+    // The following predicates on missing columns are always false
+    assertEquals(emptyList(), readUsers(lt(longColumn("not-existing-long"), 0L), true));
+    assertCorrectFiltering(
+        record -> "miller".equals(record.getName()),
+        or(eq(binaryColumn("name"), Binary.fromString("miller")),
+            gtEq(binaryColumn("not-existing-binary"), Binary.EMPTY)));
+    assertCorrectFiltering(
+        record -> record.getId() == 1234,
+        or(eq(longColumn("id"), 1234L),
+            userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
+  }
+}