Posted to commits@orc.apache.org by jc...@apache.org on 2020/05/26 14:47:53 UTC

[orc] branch master updated: ORC-577: Allow row-level filtering

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fac967  ORC-577: Allow row-level filtering
6fac967 is described below

commit 6fac967b62feea5bf45bca48b30c307142fa1c64
Author: Panagiotis Garefalakis <pg...@cloudera.com>
AuthorDate: Tue May 26 15:47:42 2020 +0100

    ORC-577: Allow row-level filtering
    
    Fixes #475
---
 java/core/src/java/org/apache/orc/Reader.java      |   40 +
 .../java/org/apache/orc/impl/BitFieldReader.java   |   42 +-
 .../apache/orc/impl/ConvertTreeReaderFactory.java  |  148 ++-
 .../java/org/apache/orc/impl/RecordReaderImpl.java |   23 +-
 .../org/apache/orc/impl/RunLengthByteReader.java   |   30 +-
 .../org/apache/orc/impl/SerializationUtils.java    |    9 +
 .../org/apache/orc/impl/TreeReaderFactory.java     |  647 ++++++++--
 .../orc/impl/reader/tree/PrimitiveBatchReader.java |    2 +-
 .../orc/impl/reader/tree/StructBatchReader.java    |   46 +-
 .../apache/orc/impl/reader/tree/TypeReader.java    |    4 +-
 .../apache/orc/TestRowFilteringComplexTypes.java   |  326 +++++
 .../org/apache/orc/TestRowFilteringNoSkip.java     |  414 +++++++
 .../test/org/apache/orc/TestRowFilteringSkip.java  | 1281 ++++++++++++++++++++
 .../src/test/org/apache/orc/TestVectorOrcFile.java |    6 +-
 java/core/src/test/resources/orc_split_elim.orc    |  Bin 0 -> 2298 bytes
 java/pom.xml                                       |    2 +-
 16 files changed, 2826 insertions(+), 194 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 007c515..0aae622 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -22,9 +22,11 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /**
  * The interface for reading ORC files.
@@ -187,6 +189,8 @@ public interface Reader extends Closeable {
     private Boolean useZeroCopy = null;
     private Boolean skipCorruptRecords = null;
     private TypeDescription schema = null;
+    private String[] preFilterColumns = null;
+    Consumer<VectorizedRowBatch> skipRowCallback = null;
     private DataReader dataReader = null;
     private Boolean tolerateMissingSchema = null;
     private boolean forcePositionalEvolution;
@@ -238,6 +242,34 @@ public interface Reader extends Closeable {
     }
 
     /**
+     * Set a row-level filter.
+     * This is an advanced feature that allows the caller to specify
+     * a list of columns that are read first and then a filter that
+     * is called to determine which rows, if any, should be read.
+     *
+     * Users should expect the batches that come from the reader
+     * to use the selected array set by their filter.
+     *
+     * Use cases for this are predicates that SearchArguments cannot represent,
+     * such as relationships between columns (e.g. columnA == columnB).
+     * @param filterColumnNames a list of the column names that are read
+     *                      before the filter is applied. Only top-level
+     *                      columns in the reader's schema can be used here
+     *                      and they must not be duplicated.
+     * @param filterCallback a function callback to perform filtering during the call to
+     *                      RecordReader.nextBatch. This function should not reference
+     *                      any static fields nor modify the passed-in ColumnVectors, but
+     *                      should set the filter output using the selected array.
+     *
+     * @return this
+     */
+    public Options setRowFilter(String[] filterColumnNames, Consumer<VectorizedRowBatch> filterCallback) {
+      this.preFilterColumns = filterColumnNames;
+      this.skipRowCallback = filterCallback;
+      return this;
+    }
+
+    /**
      * Set search argument for predicate push down.
      * @param sarg the search argument
      * @param columnNames the column names for
@@ -336,6 +368,14 @@ public interface Reader extends Closeable {
       return sarg;
     }
 
+    public Consumer<VectorizedRowBatch> getFilterCallback() {
+      return skipRowCallback;
+    }
+
+    public String[] getPreFilterColumnNames() {
+      return preFilterColumns;
+    }
+
     public String[] getColumnNames() {
       return columnNames;
     }
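
For context, a minimal sketch of how the new Reader.Options.setRowFilter API above might be used by a caller. The file path, two-column schema, and column names "x" and "y" are hypothetical (not part of this commit), and the filter ignores nulls and repeating vectors for brevity:

    import java.util.function.Consumer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;

    public class RowFilterExample {
      public static void main(String[] args) throws Exception {
        // Keep only rows where column x equals column y -- a predicate that a
        // SearchArgument cannot express. Assumes x and y are the first two
        // top-level columns of the read schema.
        Consumer<VectorizedRowBatch> keepEqualRows = batch -> {
          LongColumnVector x = (LongColumnVector) batch.cols[0];
          LongColumnVector y = (LongColumnVector) batch.cols[1];
          int newSize = 0;
          for (int row = 0; row < batch.size; ++row) {
            if (x.vector[row] == y.vector[row]) {
              batch.selected[newSize++] = row;
            }
          }
          batch.selectedInUse = true;
          batch.size = newSize;
        };

        Reader reader = OrcFile.createReader(new Path(args[0]),
            OrcFile.readerOptions(new Configuration()));
        Reader.Options options = reader.options()
            .setRowFilter(new String[]{"x", "y"}, keepEqualRows);
        try (RecordReader rows = reader.rows(options)) {
          VectorizedRowBatch batch = reader.getSchema().createRowBatch();
          while (rows.nextBatch(batch)) {
            // The batch uses the selected array set by the filter; see the
            // consuming loop sketched after the RecordReaderImpl diff below.
          }
        }
      }
    }
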
diff --git a/java/core/src/java/org/apache/orc/impl/BitFieldReader.java b/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
index 5daa204..f1d386c 100644
--- a/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
+++ b/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,6 +21,7 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 
 public final class BitFieldReader {
   private final RunLengthByteReader input;
@@ -51,6 +52,38 @@ public final class BitFieldReader {
   }
 
   public void nextVector(LongColumnVector previous,
+                         FilterContext filterContext,
+                         long previousLen) throws IOException {
+    previous.isRepeating = false;
+    int previousIdx = 0;
+    if (previous.noNulls) {
+      for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+        int idx = filterContext.getSelected()[i];
+        if (idx - previousIdx > 0) {
+          skip(idx - previousIdx);
+        }
+        previous.vector[idx] = next();
+        previousIdx = idx + 1;
+      }
+      skip(previousLen - previousIdx);
+    } else {
+      for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+        int idx = filterContext.getSelected()[i];
+        if (idx - previousIdx > 0) {
+          skip(TreeReaderFactory.TreeReader.countNonNullRowsInRange(previous.isNull, previousIdx, idx));
+        }
+        if (!previous.isNull[idx]) {
+          previous.vector[idx] = next();
+        } else {
+          previous.vector[idx] = 1;
+        }
+        previousIdx = idx + 1;
+      }
+      skip(TreeReaderFactory.TreeReader.countNonNullRowsInRange(previous.isNull, previousIdx, (int)previousLen));
+    }
+  }
+
+  public void nextVector(LongColumnVector previous,
                          long previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
@@ -95,8 +128,11 @@ public final class BitFieldReader {
     } else {
       final long bitsToSkip = (totalBits - availableBits);
       input.skip(bitsToSkip / 8);
-      current = input.next();
-      currentIdx = (byte) (bitsToSkip % 8);
+      // Edge case: when skipping the last bits of a bit field, there is nothing more to read.
+      if (input.hasNext()) {
+        current = input.next();
+        currentIdx = (byte) (bitsToSkip % 8);
+      }
     }
   }
 
diff --git a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
index e6d3863..6b6b640 100644
--- a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.orc.OrcProto;
 import org.apache.orc.TypeDescription;
@@ -400,8 +401,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
-      fromReader.nextVector(previousVector, isNull, batchSize);
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      fromReader.nextVector(previousVector, isNull, batchSize, filterContext);
       LongColumnVector resultColVector = (LongColumnVector) previousVector;
       if (downCastNeeded) {
         if (resultColVector.isRepeating) {
@@ -451,14 +453,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
         longColVector = (LongColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(doubleColVector, isNull, batchSize);
+      fromReader.nextVector(doubleColVector, isNull, batchSize, filterContext);
 
       convertVector(doubleColVector, longColVector, batchSize);
     }
@@ -535,14 +538,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (decimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         decimalColVector = new DecimalColumnVector(batchSize, precision, scale);
         longColVector = (LongColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(decimalColVector, isNull, batchSize);
+      fromReader.nextVector(decimalColVector, isNull, batchSize, filterContext);
 
       convertVector(decimalColVector, longColVector, batchSize);
     }
@@ -574,14 +578,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
         longColVector = (LongColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, longColVector, batchSize);
     }
@@ -609,14 +614,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
         longColVector = (LongColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, longColVector, batchSize);
     }
@@ -647,14 +653,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
         doubleColVector = (DoubleColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, doubleColVector, batchSize);
     }
@@ -682,14 +689,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (decimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         decimalColVector = new DecimalColumnVector(batchSize, precision, scale);
         doubleColVector = (DoubleColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(decimalColVector, isNull, batchSize);
+      fromReader.nextVector(decimalColVector, isNull, batchSize, filterContext);
 
       convertVector(decimalColVector, doubleColVector, batchSize);
     }
@@ -719,14 +727,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
         doubleColVector = (DoubleColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, doubleColVector, batchSize);
     }
@@ -755,14 +764,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
         doubleColVector = (DoubleColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, doubleColVector, batchSize);
     }
@@ -776,9 +786,10 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Read present/isNull stream
-      fromReader.nextVector(previousVector, isNull, batchSize);
+      fromReader.nextVector(previousVector, isNull, batchSize, filterContext);
       DoubleColumnVector vector = (DoubleColumnVector) previousVector;
       if (previousVector.isRepeating) {
         vector.vector[0] = (float) vector.vector[0];
@@ -813,15 +824,16 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
 
     @Override
     public void nextVector(ColumnVector previousVector,
-        boolean[] isNull,
-        final int batchSize) throws IOException {
+                           boolean[] isNull,
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, decimalColVector, batchSize);
     }
@@ -858,14 +870,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(doubleColVector, isNull, batchSize);
+      fromReader.nextVector(doubleColVector, isNull, batchSize, filterContext);
 
       convertVector(doubleColVector, decimalColVector, batchSize);
     }
@@ -900,14 +913,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, decimalColVector, batchSize);
     }
@@ -944,14 +958,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, decimalColVector, batchSize);
     }
@@ -985,14 +1000,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (fileDecimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         fileDecimalColVector = new DecimalColumnVector(batchSize, filePrecision, fileScale);
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(fileDecimalColVector, isNull, batchSize);
+      fromReader.nextVector(fileDecimalColVector, isNull, batchSize, filterContext);
 
       convertVector(fileDecimalColVector, decimalColVector, batchSize);
     }
@@ -1019,14 +1035,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, bytesColVector, batchSize);
     }
@@ -1078,14 +1095,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(doubleColVector, isNull, batchSize);
+      fromReader.nextVector(doubleColVector, isNull, batchSize, filterContext);
 
       convertVector(doubleColVector, bytesColVector, batchSize);
     }
@@ -1126,14 +1144,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (decimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         decimalColVector = new DecimalColumnVector(batchSize, precision, scale);
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(decimalColVector, isNull, batchSize);
+      fromReader.nextVector(decimalColVector, isNull, batchSize, filterContext);
 
       convertVector(decimalColVector, bytesColVector, batchSize);
     }
@@ -1247,14 +1266,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, bytesColVector, batchSize);
     }
@@ -1284,14 +1304,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new DateColumnVector();
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, bytesColVector, batchSize);
     }
@@ -1309,8 +1330,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
-      fromReader.nextVector(previousVector, isNull, batchSize);
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      fromReader.nextVector(previousVector, isNull, batchSize, filterContext);
 
       BytesColumnVector resultColVector = (BytesColumnVector) previousVector;
 
@@ -1370,14 +1392,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (inBytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         inBytesColVector = new BytesColumnVector();
         outBytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(inBytesColVector, isNull, batchSize);
+      fromReader.nextVector(inBytesColVector, isNull, batchSize, filterContext);
 
       convertVector(inBytesColVector, outBytesColVector, batchSize);
     }
@@ -1413,7 +1436,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
@@ -1421,7 +1445,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       }
       timestampColVector.changeCalendar(fileUsedProlepticGregorian, false);
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, true);
@@ -1473,7 +1497,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
@@ -1481,7 +1506,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       }
       timestampColVector.changeCalendar(fileUsedProlepticGregorian, false);
       // Read present/isNull stream
-      fromReader.nextVector(doubleColVector, isNull, batchSize);
+      fromReader.nextVector(doubleColVector, isNull, batchSize, filterContext);
 
       convertVector(doubleColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, true);
@@ -1531,7 +1556,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (decimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         decimalColVector = new DecimalColumnVector(batchSize, precision, scale);
@@ -1539,7 +1565,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       }
       timestampColVector.changeCalendar(fileUsedProlepticGregorian, false);
       // Read present/isNull stream
-      fromReader.nextVector(decimalColVector, isNull, batchSize);
+      fromReader.nextVector(decimalColVector, isNull, batchSize, filterContext);
 
       convertVector(decimalColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, true);
@@ -1588,14 +1614,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
         timestampColVector = (TimestampColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, false);
@@ -1630,14 +1657,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new DateColumnVector();
         timestampColVector = (TimestampColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, false);
@@ -1672,7 +1700,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
@@ -1688,7 +1717,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
         }
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, longColVector, batchSize);
       if (dateColumnVector != null) {
@@ -1723,7 +1752,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
@@ -1734,7 +1764,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
         }
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, longColVector, batchSize);
       if (longColVector instanceof DateColumnVector) {
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 3c924de..cea1094 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -61,7 +61,9 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.TimeZone;
+import java.util.TreeSet;
 
 public class RecordReaderImpl implements RecordReader {
   static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
@@ -219,9 +221,25 @@ public class RecordReaderImpl implements RecordReader {
       skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
     }
 
+    // Map columnNames to ColumnIds
+    SortedSet<Integer> filterColIds = new TreeSet<>();
+    if (options.getPreFilterColumnNames() != null) {
+      for (String colName : options.getPreFilterColumnNames()) {
+        int expandColId = findColumns(evolution, colName);
+        if (expandColId != -1) {
+          filterColIds.add(expandColId);
+        } else {
+          throw new IllegalArgumentException("Filter could not find column with name: " +
+              colName + " on " + evolution.getReaderBaseSchema());
+        }
+      }
+      LOG.info("Filter Columns: " + filterColIds);
+    }
+
     TreeReaderFactory.ReaderContext readerContext =
         new TreeReaderFactory.ReaderContext()
           .setSchemaEvolution(evolution)
+          .setFilterCallback(filterColIds, options.getFilterCallback())
           .skipCorrupt(skipCorrupt)
           .fileFormat(fileReader.getFileVersion())
           .useUTCTimestamp(fileReader.useUTCTimestamp)
@@ -1166,16 +1184,17 @@ public class RecordReaderImpl implements RecordReader {
           batch.size = 0;
           return false;
         }
+        // Read the stripe into memory
         readStripe();
       }
 
       int batchSize = computeBatchSize(batch.getMaxSize());
-
       rowInStripe += batchSize;
       reader.setVectorColumnCount(batch.getDataColumnCount());
       reader.nextBatch(batch, batchSize);
       advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      return batch.size  != 0;
+      // batch.size can be modified by the filter, so only batchSize tells whether rows were actually read
+      return batchSize != 0;
     } catch (IOException e) {
       // Rethrow exception with file name in log message
       throw new IOException("Error reading file: " + path, e);
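
As the comment above notes, the filter callback can shrink batch.size, so callers walk the surviving rows through the batch's selected array. A minimal consuming sketch, reusing the hypothetical two-column schema from the earlier example:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.RecordReader;

    public final class FilteredBatchConsumer {
      // Prints the (x, y) pairs that survived a row-level filter; the column
      // positions are an assumption from the earlier hypothetical schema.
      static void dump(RecordReader rows, VectorizedRowBatch batch) throws IOException {
        while (rows.nextBatch(batch)) {
          LongColumnVector x = (LongColumnVector) batch.cols[0];
          LongColumnVector y = (LongColumnVector) batch.cols[1];
          for (int i = 0; i < batch.size; ++i) {
            // With selectedInUse set, batch.size counts the selected rows and
            // batch.selected maps them back to positions in the column vectors.
            int row = batch.selectedInUse ? batch.selected[i] : i;
            System.out.println(x.vector[row] + ", " + y.vector[row]);
          }
        }
      }
    }
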
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
index 0a75ca9..9f4c236 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,6 +20,7 @@ package org.apache.orc.impl;
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /**
@@ -43,7 +44,7 @@ public class RunLengthByteReader {
     this.input = input;
   }
 
-  private void readValues(boolean ignoreEof) throws IOException {
+  private void readValues(boolean ignoreEof, int numSkipRows) throws IOException {
     int control = input.read();
     used = 0;
     if (control == -1) {
@@ -51,19 +52,26 @@ public class RunLengthByteReader {
         throw new EOFException("Read past end of buffer RLE byte from " + input);
       }
       used = numLiterals = 0;
-      return;
     } else if (control < 0x80) {
       repeat = true;
       numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
-      int val = input.read();
-      if (val == -1) {
-        throw new EOFException("Reading RLE byte got EOF");
+      if (numSkipRows >= numLiterals) {
+        IOUtils.skipFully(input, 1);
+      } else {
+        int val = input.read();
+        if (val == -1) {
+          throw new EOFException("Reading RLE byte got EOF");
+        }
+        literals[0] = (byte) val;
       }
-      literals[0] = (byte) val;
     } else {
       repeat = false;
       numLiterals = 0x100 - control;
-      int bytes = 0;
+      numSkipRows = Math.min(numSkipRows, numLiterals);
+      if (numSkipRows > 0) {
+        IOUtils.skipFully(input, numSkipRows);
+      }
+      int bytes = numSkipRows;
       while (bytes < numLiterals) {
         int result = input.read(literals, bytes, numLiterals - bytes);
         if (result == -1) {
@@ -81,7 +89,7 @@ public class RunLengthByteReader {
   public byte next() throws IOException {
     byte result;
     if (used == numLiterals) {
-      readValues(false);
+      readValues(false, 0);
     }
     if (repeat) {
       result = literals[0];
@@ -145,7 +153,7 @@ public class RunLengthByteReader {
     if (consumed != 0) {
       // a loop is required for cases where we break the run into two parts
       while (consumed > 0) {
-        readValues(false);
+        readValues(false, 0);
         used = consumed;
         consumed -= numLiterals;
       }
@@ -158,7 +166,7 @@ public class RunLengthByteReader {
   public void skip(long items) throws IOException {
     while (items > 0) {
       if (used == numLiterals) {
-        readValues(false);
+        readValues(false, (int) items);
       }
       long consume = Math.min(items, numLiterals - used);
       used += consume;
diff --git a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
index 06ba711..dd40b31 100644
--- a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.orc.impl;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.OrcFile;
@@ -97,6 +98,10 @@ public final class SerializationUtils {
     return Float.intBitsToFloat(val);
   }
 
+  public void skipFloat(InputStream in, int numOfFloats) throws IOException {
+    IOUtils.skipFully(in, numOfFloats * 4L);
+  }
+
   public void writeFloat(OutputStream output,
                          float value) throws IOException {
     int ser = Float.floatToIntBits(value);
@@ -135,6 +140,10 @@ public final class SerializationUtils {
     }
   }
 
+  public void skipDouble(InputStream in, int numOfDoubles) throws IOException {
+    IOUtils.skipFully(in, numOfDoubles * 8L);
+  }
+
   public void writeDouble(OutputStream output,
                           double value) throws IOException {
     writeLongLE(output, Double.doubleToLongBits(value));
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index d17de68..cfe1603 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TimeZone;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
@@ -61,6 +65,10 @@ public class TreeReaderFactory {
   public interface Context {
     SchemaEvolution getSchemaEvolution();
 
+    Set<Integer> getColumnFilterIds();
+
+    Consumer<VectorizedRowBatch> getColumnFilterCallback();
+
     boolean isSkipCorrupt();
 
     boolean getUseUTCTimestamp();
@@ -85,6 +93,8 @@ public class TreeReaderFactory {
     private ReaderEncryption encryption;
     private boolean useProlepticGregorian;
     private boolean fileUsedProlepticGregorian;
+    private Set<Integer> filterColumnIds = Collections.emptySet();
+    Consumer<VectorizedRowBatch> filterCallback;
 
     public ReaderContext setSchemaEvolution(SchemaEvolution evolution) {
       this.evolution = evolution;
@@ -96,6 +106,12 @@ public class TreeReaderFactory {
       return this;
     }
 
+    public ReaderContext setFilterCallback(Set<Integer> filterColumnsList, Consumer<VectorizedRowBatch> filterCallback) {
+      this.filterColumnIds = filterColumnsList;
+      this.filterCallback = filterCallback;
+      return this;
+    }
+
     public ReaderContext skipCorrupt(boolean skipCorrupt) {
       this.skipCorrupt = skipCorrupt;
       return this;
@@ -129,6 +145,16 @@ public class TreeReaderFactory {
     }
 
     @Override
+    public Set<Integer> getColumnFilterIds() {
+      return filterColumnIds;
+    }
+
+    @Override
+    public Consumer<VectorizedRowBatch> getColumnFilterCallback() {
+      return filterCallback;
+    }
+
+    @Override
     public boolean isSkipCorrupt() {
       return skipCorrupt;
     }
@@ -255,6 +281,16 @@ public class TreeReaderFactory {
       }
     }
 
+    protected static int countNonNullRowsInRange(boolean[] isNull, int start, int end) {
+      int result = 0;
+      while (start < end) {
+        if (!isNull[start++]) {
+          result++;
+        }
+      }
+      return result;
+    }
+
     protected long countNonNulls(long rows) throws IOException {
       if (present != null) {
         long result = 0;
@@ -278,11 +314,14 @@ public class TreeReaderFactory {
      * @param isNull Whether the each value was null at a higher level. If
      *               isNull is null, all values are non-null.
      * @param batchSize      Size of the column vector
+     * @param filterContext the information about the rows that were selected
+     *                      by the filter.
      * @throws IOException
      */
     public void nextVector(ColumnVector previous,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (present != null || isNull != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
         // present stream
@@ -350,7 +389,8 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public void nextVector(ColumnVector vector, boolean[] isNull, int size) {
+    public void nextVector(ColumnVector vector, boolean[] isNull, int size,
+        FilterContext filterContext) {
       vector.noNulls = false;
       vector.isNull[0] = true;
       vector.isRepeating = true;
@@ -397,14 +437,19 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
-      // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
+      if (filterContext.isSelectedInUse()) {
+        reader.nextVector(result, filterContext, batchSize);
+      } else {
+        // Read value entries based on isNull entries
+        reader.nextVector(result, batchSize);
+      }
     }
   }
 
@@ -441,11 +486,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
@@ -506,11 +552,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
@@ -571,11 +618,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
@@ -637,11 +685,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
@@ -686,15 +735,9 @@ public class TreeReaderFactory {
       stream.seek(index);
     }
 
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
+    private void nextVector(DoubleColumnVector result,
+                            boolean[] isNull,
+                            final int batchSize) throws IOException {
       final boolean hasNulls = !result.noNulls;
       boolean allNulls = hasNulls;
 
@@ -737,6 +780,75 @@ public class TreeReaderFactory {
       }
     }
 
+    private void nextVector(DoubleColumnVector result,
+                            boolean[] isNull,
+                            FilterContext filterContext,
+                            final int batchSize) throws IOException {
+      final boolean hasNulls = !result.noNulls;
+      boolean allNulls = hasNulls;
+      result.isRepeating = false;
+      int previousIdx = 0;
+
+      if (batchSize > 0) {
+        if (hasNulls) {
+          // extra loop condition lets the JIT skip array bounds checks
+          for (int i = 0; batchSize <= result.isNull.length && i < batchSize; i++) {
+            allNulls = allNulls & result.isNull[i];
+          }
+          if (allNulls) {
+            result.vector[0] = Double.NaN;
+            result.isRepeating = true;
+          } else {
+            // some nulls
+            // read only the selected rows, skipping unselected non-null values
+            for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+              int idx = filterContext.getSelected()[i];
+              if (idx - previousIdx > 0) {
+                utils.skipFloat(stream, countNonNullRowsInRange(result.isNull, previousIdx, idx));
+              }
+              if (!result.isNull[idx]) {
+                result.vector[idx] = utils.readFloat(stream);
+              } else {
+                // If the value is not present then set NaN
+                result.vector[idx] = Double.NaN;
+              }
+              previousIdx = idx + 1;
+            }
+            utils.skipFloat(stream, countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+          }
+        } else {
+          // Read only the selected row indexes and skip the rest
+          for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+            int idx = filterContext.getSelected()[i];
+            if (idx - previousIdx > 0) {
+              utils.skipFloat(stream, idx - previousIdx);
+            }
+            result.vector[idx] = utils.readFloat(stream);
+            previousIdx = idx + 1;
+          }
+          utils.skipFloat(stream, batchSize - previousIdx);
+        }
+      }
+
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize, filterContext);
+
+      if (filterContext.isSelectedInUse()) {
+        nextVector(result, isNull, filterContext, batchSize);
+      } else {
+        nextVector(result, isNull, batchSize);
+      }
+    }
+
     @Override
     public void skipRows(long items) throws IOException {
       items = countNonNulls(items);
@@ -780,14 +892,62 @@ public class TreeReaderFactory {
       stream.seek(index);
     }
 
-    @Override
-    public void nextVector(ColumnVector previousVector,
+    private void nextVector(DoubleColumnVector result,
+                            boolean[] isNull,
+                            FilterContext filterContext,
+                            final int batchSize) throws IOException {
+
+      final boolean hasNulls = !result.noNulls;
+      boolean allNulls = hasNulls;
+      result.isRepeating = false;
+      if (batchSize != 0) {
+        if (hasNulls) {
+          // extra loop condition lets the JIT skip array bounds checks
+          for (int i = 0; i < batchSize && batchSize <= result.isNull.length; i++) {
+            allNulls = allNulls & result.isNull[i];
+          }
+          if (allNulls) {
+            result.vector[0] = Double.NaN;
+            result.isRepeating = true;
+          } else {
+            // some nulls
+            int previousIdx = 0;
+            // extra loop condition lets the JIT skip array bounds checks
+            for (int i = 0; batchSize <= result.isNull.length && i != filterContext.getSelectedSize(); i++) {
+              int idx = filterContext.getSelected()[i];
+              if (idx - previousIdx > 0) {
+                utils.skipDouble(stream, countNonNullRowsInRange(result.isNull, previousIdx, idx));
+              }
+              if (!result.isNull[idx]) {
+                result.vector[idx] = utils.readDouble(stream);
+              } else {
+                // If the value is not present then set NaN
+                result.vector[idx] = Double.NaN;
+              }
+              previousIdx = idx + 1;
+            }
+            utils.skipDouble(stream, countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+          }
+        } else {
+          // no nulls
+          int previousIdx = 0;
+          // Read only the selected row indexes and skip the rest
+          for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+            int idx = filterContext.getSelected()[i];
+            if (idx - previousIdx > 0) {
+              utils.skipDouble(stream, idx - previousIdx);
+            }
+            result.vector[idx] = utils.readDouble(stream);
+            previousIdx = idx + 1;
+          }
+          utils.skipDouble(stream, batchSize - previousIdx);
+        }
+      }
+    }
+
+    private void nextVector(DoubleColumnVector result,
                            boolean[] isNull,
                            final int batchSize) throws IOException {
-      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
 
       final boolean hasNulls = !result.noNulls;
       boolean allNulls = hasNulls;
@@ -831,6 +991,23 @@ public class TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize, filterContext);
+
+      if (filterContext.isSelectedInUse()) {
+        nextVector(result, isNull, filterContext, batchSize);
+      } else {
+        nextVector(result, isNull, batchSize);
+      }
+    }
+
+    @Override
     public void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       long len = items * 8;
@@ -894,11 +1071,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final BytesColumnVector result = (BytesColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       scratchlcv.ensureSize(batchSize, false);
       BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
@@ -1040,33 +1218,31 @@ public class TreeReaderFactory {
       nanos.seek(index);
     }
 
-    @Override
-    public void nextVector(ColumnVector previousVector,
+    public void readTimestamp(TimestampColumnVector result, int idx) throws IOException {
+      final int newNanos = parseNanos(nanos.next());
+      long millis = (data.next() + base_timestamp)
+          * TimestampTreeWriter.MILLIS_PER_SECOND + newNanos / 1_000_000;
+      if (millis < 0 && newNanos > 999_999) {
+        millis -= TimestampTreeWriter.MILLIS_PER_SECOND;
+      }
+      long offset = 0;
+      // If reader and writer time zones have different rules, adjust the timezone difference
+      // between reader and writer, taking daylight saving time into account.
+      if (!hasSameTZRules) {
+        offset = SerializationUtils.convertBetweenTimezones(writerTimeZone,
+            readerTimeZone, millis);
+      }
+      result.time[idx] = millis + offset;
+      result.nanos[idx] = newNanos;
+    }
+
+    public void nextVector(TimestampColumnVector result,
                            boolean[] isNull,
                            final int batchSize) throws IOException {
-      TimestampColumnVector result = (TimestampColumnVector) previousVector;
-      result.changeCalendar(fileUsesProleptic, false);
-      super.nextVector(previousVector, isNull, batchSize);
-
-      result.setIsUTC(context.getUseUTCTimestamp());
 
       for (int i = 0; i < batchSize; i++) {
         if (result.noNulls || !result.isNull[i]) {
-          final int newNanos = parseNanos(nanos.next());
-          long millis = (data.next() + base_timestamp)
-              * TimestampTreeWriter.MILLIS_PER_SECOND + newNanos / 1_000_000;
-          if (millis < 0 && newNanos > 999_999) {
-            millis -= TimestampTreeWriter.MILLIS_PER_SECOND;
-          }
-          long offset = 0;
-          // If reader and writer time zones have different rules, adjust the timezone difference
-          // between reader and writer taking day light savings into account.
-          if (!hasSameTZRules) {
-            offset = SerializationUtils.convertBetweenTimezones(writerTimeZone,
-                readerTimeZone, millis);
-          }
-          result.time[i] = millis + offset;
-          result.nanos[i] = newNanos;
+          readTimestamp(result, i);
           if (result.isRepeating && i != 0 &&
               (result.time[0] != result.time[i] ||
                   result.nanos[0] != result.nanos[i])) {
@@ -1077,6 +1253,57 @@ public class TreeReaderFactory {
       result.changeCalendar(useProleptic, true);
     }
 
+    public void nextVector(TimestampColumnVector result,
+                           boolean[] isNull,
+                           FilterContext filterContext,
+                           final int batchSize) throws IOException {
+      result.isRepeating = false;
+      int previousIdx = 0;
+      if (result.noNulls) {
+        for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+          int idx = filterContext.getSelected()[i];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(idx - previousIdx);
+          }
+          readTimestamp(result, idx);
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(batchSize - previousIdx);
+      } else {
+        for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+          int idx = filterContext.getSelected()[i];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, idx));
+          }
+          if (!result.isNull[idx]) {
+            readTimestamp(result, idx);
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+      }
+      result.changeCalendar(useProleptic, true);
+
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      TimestampColumnVector result = (TimestampColumnVector) previousVector;
+      result.changeCalendar(fileUsesProleptic, false);
+      super.nextVector(previousVector, isNull, batchSize, filterContext);
+
+      result.setIsUTC(context.getUseUTCTimestamp());
+
+      if (filterContext.isSelectedInUse()) {
+        nextVector(result, isNull, filterContext, batchSize);
+      } else {
+        nextVector(result, isNull, batchSize);
+      }
+    }
+
     private static int parseNanos(long serialized) {
       int zeros = 7 & (int) serialized;
       int result = (int) (serialized >>> 3);
@@ -1086,6 +1313,11 @@ public class TreeReaderFactory {
       return result;
     }
 
+    void skipStreamRows(long items) throws IOException {
+      data.skip(items);
+      nanos.skip(items);
+    }
+
     @Override
     public void skipRows(long items) throws IOException {
       items = countNonNulls(items);
@@ -1149,7 +1381,8 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
       if (needsDateColumnVector) {
         if (result instanceof DateColumnVector) {
@@ -1161,10 +1394,11 @@ public class TreeReaderFactory {
       }
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
+
       if (needsDateColumnVector) {
         ((DateColumnVector) result).changeCalendar(useProleptic, true);
       }
@@ -1279,6 +1513,60 @@ public class TreeReaderFactory {
       }
     }
 
+    private void nextVector(DecimalColumnVector result,
+                            boolean[] isNull,
+                            FilterContext filterContext,
+                            final int batchSize) throws IOException {
+      // Allocate space for the whole array
+      if (batchSize > scratchScaleVector.length) {
+        scratchScaleVector = new int[(int) batchSize];
+      }
+      // Read the scales for the whole batch; values are decoded below only for the selected rows
+      scaleReader.nextVector(result, scratchScaleVector, batchSize);
+      // Read value entries based on isNull entries
+      // Use the fast ORC deserialization method that emulates SerializationUtils.readBigInteger
+      // provided by HiveDecimalWritable.
+      HiveDecimalWritable[] vector = result.vector;
+      HiveDecimalWritable decWritable;
+      if (result.noNulls) {
+        int previousIdx = 0;
+        for (int r=0; r != filterContext.getSelectedSize(); ++r) {
+          int idx = filterContext.getSelected()[r];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(idx - previousIdx);
+          }
+          decWritable = vector[idx];
+          if (!decWritable.serializationUtilsRead(
+              valueStream, scratchScaleVector[idx],
+              scratchBytes)) {
+            result.isNull[idx] = true;
+            result.noNulls = false;
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(batchSize - previousIdx);
+      } else if (!result.isRepeating || !result.isNull[0]) {
+        int previousIdx = 0;
+        for (int r=0; r != filterContext.getSelectedSize(); ++r) {
+          int idx = filterContext.getSelected()[r];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, idx));
+          }
+          if (!result.isNull[idx]) {
+            decWritable = vector[idx];
+            if (!decWritable.serializationUtilsRead(
+                valueStream, scratchScaleVector[idx],
+                scratchBytes)) {
+              result.isNull[idx] = true;
+              result.noNulls = false;
+            }
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+      }
+    }
+
     private void nextVector(Decimal64ColumnVector result,
                             boolean[] isNull,
                             final int batchSize) throws IOException {
@@ -1309,16 +1597,86 @@ public class TreeReaderFactory {
       result.scale = (short) scale;
     }
 
+    private void nextVector(Decimal64ColumnVector result,
+        boolean[] isNull,
+        FilterContext filterContext,
+        final int batchSize) throws IOException {
+      if (precision > TypeDescription.MAX_DECIMAL64_PRECISION) {
+        throw new IllegalArgumentException("Reading large precision type into" +
+            " Decimal64ColumnVector.");
+      }
+      // Allocate space for the whole array
+      if (batchSize > scratchScaleVector.length) {
+        scratchScaleVector = new int[(int) batchSize];
+      }
+      // Read all the scales
+      scaleReader.nextVector(result, scratchScaleVector, batchSize);
+      if (result.noNulls) {
+        int previousIdx = 0;
+        for (int r=0; r != filterContext.getSelectedSize(); r++) {
+          int idx = filterContext.getSelected()[r];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(idx - previousIdx);
+          }
+          result.vector[idx] = SerializationUtils.readVslong(valueStream);
+          for (int s=scratchScaleVector[idx]; s < scale; ++s) {
+            result.vector[idx] *= 10;
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(batchSize - previousIdx);
+      } else if (!result.isRepeating || !result.isNull[0]) {
+        int previousIdx = 0;
+        for (int r=0; r != filterContext.getSelectedSize(); r++) {
+          int idx = filterContext.getSelected()[r];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, idx));
+          }
+          if (!result.isNull[idx]) {
+            result.vector[idx] = SerializationUtils.readVslong(valueStream);
+            for (int s=scratchScaleVector[idx]; s < scale; ++s) {
+              result.vector[idx] *= 10;
+            }
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+      }
+      result.precision = (short) precision;
+      result.scale = (short) scale;
+    }
+
     @Override
     public void nextVector(ColumnVector result,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       if (result instanceof Decimal64ColumnVector) {
-        nextVector((Decimal64ColumnVector) result, isNull, batchSize);
+        if (filterContext.isSelectedInUse()) {
+          nextVector((Decimal64ColumnVector) result, isNull, filterContext, batchSize);
+        } else {
+          nextVector((Decimal64ColumnVector) result, isNull, batchSize);
+        }
       } else {
-        nextVector((DecimalColumnVector) result, isNull, batchSize);
+        if (filterContext.isSelectedInUse()) {
+          nextVector((DecimalColumnVector) result, isNull, filterContext, batchSize);
+        } else {
+          nextVector((DecimalColumnVector) result, isNull, batchSize);
+        }
+      }
+    }
+
+    void skipStreamRows(long items) throws IOException {
+      for (int i = 0; i < items; i++) {
+        int input;
+        do {
+          input = valueStream.read();
+          if (input == -1) {
+            throw new EOFException("Reading BigInteger past EOF from " + valueStream);
+          }
+        } while(input >= 128);
       }
     }
 
@@ -1389,23 +1747,52 @@ public class TreeReaderFactory {
     }
 
     private void nextVector(DecimalColumnVector result,
+                            FilterContext filterContext,
                             final int batchSize) throws IOException {
       if (result.noNulls) {
-        for (int r=0; r < batchSize; ++r) {
-          result.vector[r].setFromLongAndScale(valueReader.next(), scale);
-        }
-      } else if (!result.isRepeating || !result.isNull[0]) {
-        for (int r=0; r < batchSize; ++r) {
-          if (result.noNulls || !result.isNull[r]) {
+        if (filterContext.isSelectedInUse()) {
+          int previousIdx = 0;
+          for (int r = 0; r != filterContext.getSelectedSize(); ++r) {
+            int idx = filterContext.getSelected()[r];
+            if (idx - previousIdx > 0) {
+              valueReader.skip(idx - previousIdx);
+            }
+            result.vector[idx].setFromLongAndScale(valueReader.next(), scale);
+            previousIdx = idx + 1;
+          }
+          valueReader.skip(batchSize - previousIdx);
+        } else {
+          for (int r = 0; r < batchSize; ++r) {
             result.vector[r].setFromLongAndScale(valueReader.next(), scale);
           }
         }
+      } else if (!result.isRepeating || !result.isNull[0]) {
+        if (filterContext.isSelectedInUse()) {
+          int previousIdx = 0;
+          for (int r = 0; r != filterContext.getSelectedSize(); ++r) {
+            int idx = filterContext.getSelected()[r];
+            if (idx - previousIdx > 0) {
+              valueReader.skip(countNonNullRowsInRange(result.isNull, previousIdx, idx));
+            }
+            if (!result.isNull[idx])
+              result.vector[idx].setFromLongAndScale(valueReader.next(), scale);
+            previousIdx = idx + 1;
+          }
+          valueReader.skip(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+        } else {
+          for (int r = 0; r < batchSize; ++r) {
+            if (!result.isNull[r]) {
+              result.vector[r].setFromLongAndScale(valueReader.next(), scale);
+            }
+          }
+        }
       }
       result.precision = (short) precision;
       result.scale = (short) scale;
     }
 
     private void nextVector(Decimal64ColumnVector result,
+                            FilterContext filterContext,
                             final int batchSize) throws IOException {
       valueReader.nextVector(result, result.vector, batchSize);
       result.precision = (short) precision;
@@ -1415,13 +1802,14 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector result,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       if (result instanceof Decimal64ColumnVector) {
-        nextVector((Decimal64ColumnVector) result, batchSize);
+        nextVector((Decimal64ColumnVector) result, filterContext, batchSize);
       } else {
-        nextVector((DecimalColumnVector) result, batchSize);
+        nextVector((DecimalColumnVector) result, filterContext, batchSize);
       }
     }
 
@@ -1504,8 +1892,9 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
-      reader.nextVector(previousVector, isNull, batchSize);
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      reader.nextVector(previousVector, isNull, batchSize, filterContext);
     }
 
     @Override
@@ -1563,8 +1952,8 @@ public class TreeReaderFactory {
                                          BytesColumnVector result,
                                          final int batchSize) throws IOException {
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
-        byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv,
-            result, (int) batchSize);
+        byte[] allBytes =
+            commonReadByteArrays(stream, lengths, scratchlcv, result, (int) batchSize);
 
         // Too expensive to figure out 'repeating' by comparisons.
         result.isRepeating = false;
@@ -1652,11 +2041,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final BytesColumnVector result = (BytesColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       scratchlcv.ensureSize(batchSize, false);
       BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv,
@@ -1799,13 +2189,18 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final BytesColumnVector result = (BytesColumnVector) previousVector;
-      int offset;
-      int length;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
+      readDictionaryByteArray(result, filterContext, batchSize);
+    }
+
+    private void readDictionaryByteArray(BytesColumnVector result, FilterContext filterContext, int batchSize) throws IOException {
+      int offset;
+      int length;
 
       if (dictionaryBuffer != null) {
 
@@ -1821,17 +2216,31 @@ public class TreeReaderFactory {
         scratchlcv.ensureSize((int) batchSize, false);
         reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
         if (!scratchlcv.isRepeating) {
-
           // The vector has non-repeating strings. Iterate thru the batch
           // and set strings one by one
-          for (int i = 0; i < batchSize; i++) {
-            if (!scratchlcv.isNull[i]) {
-              offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
-              length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
-              result.setRef(i, dictionaryBufferInBytesCache, offset, length);
-            } else {
-              // If the value is null then set offset and length to zero (null string)
+          if (filterContext.isSelectedInUse()) {
+            // Set all string values to null references (offset and length are zero)
+            for (int i = 0; i < batchSize; i++)
               result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+            // Read selected rows from stream
+            for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+              int idx = filterContext.getSelected()[i];
+              if (!scratchlcv.isNull[idx]) {
+                offset = dictionaryOffsets[(int) scratchlcv.vector[idx]];
+                length = getDictionaryEntryLength((int) scratchlcv.vector[idx], offset);
+                result.setRef(idx, dictionaryBufferInBytesCache, offset, length);
+              }
+            }
+          } else {
+            for (int i = 0; i < batchSize; i++) {
+              if (!scratchlcv.isNull[i]) {
+                offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+                length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+                result.setRef(i, dictionaryBufferInBytesCache, offset, length);
+              } else {
+                // If the value is null then set offset and length to zero (null string)
+                result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+              }
             }
           }
         } else {
@@ -1899,10 +2308,11 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (right trim and truncate) if necessary.
-      super.nextVector(previousVector, isNull, batchSize);
+      super.nextVector(previousVector, isNull, batchSize, filterContext);
       BytesColumnVector result = (BytesColumnVector) previousVector;
       int adjustedDownLen;
       if (result.isRepeating) {
@@ -1955,10 +2365,11 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (truncate) if necessary.
-      super.nextVector(previousVector, isNull, batchSize);
+      super.nextVector(previousVector, isNull, batchSize, filterContext);
       BytesColumnVector result = (BytesColumnVector) previousVector;
 
       int adjustedDownLen;
@@ -2038,8 +2449,9 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
-      super.nextVector(previousVector, isNull, batchSize);
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      super.nextVector(previousVector, isNull, batchSize, filterContext);
       StructColumnVector result = (StructColumnVector) previousVector;
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
         result.isRepeating = false;
@@ -2048,7 +2460,7 @@ public class TreeReaderFactory {
         boolean[] mask = result.noNulls ? null : result.isNull;
         for (int f = 0; f < fields.length; f++) {
           if (fields[f] != null) {
-            fields[f].nextVector(result.fields[f], mask, batchSize);
+            fields[f].nextVector(result.fields[f], mask, batchSize, filterContext);
           }
         }
       }
@@ -2115,9 +2527,10 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       UnionColumnVector result = (UnionColumnVector) previousVector;
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
         result.isRepeating = false;
         tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
@@ -2129,7 +2542,7 @@ public class TreeReaderFactory {
             ignore[r] = (!result.noNulls && result.isNull[r]) ||
                 result.tags[r] != f;
           }
-          fields[f].nextVector(result.fields[f], ignore, batchSize);
+          fields[f].nextVector(result.fields[f], ignore, batchSize, filterContext);
         }
       }
     }
@@ -2159,6 +2572,27 @@ public class TreeReaderFactory {
     }
   }
 
+  private static final FilterContext NULL_FILTER = new FilterContext() {
+    @Override
+    public void reset() {
+    }
+
+    @Override
+    public boolean isSelectedInUse() {
+      return false;
+    }
+
+    @Override
+    public int[] getSelected() {
+      return new int[0];
+    }
+
+    @Override
+    public int getSelectedSize() {
+      return 0;
+    }
+  };
+
   public static class ListTreeReader extends TreeReader {
     protected final TypeReader elementReader;
     protected IntegerReader lengths = null;
@@ -2195,9 +2629,10 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previous,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       ListColumnVector result = (ListColumnVector) previous;
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       // if we have some none-null values, then read them
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
         lengths.nextVector(result, result.lengths, batchSize);
@@ -2212,7 +2647,8 @@ public class TreeReaderFactory {
           }
         }
         result.child.ensureSize(result.childCount, false);
-        elementReader.nextVector(result.child, null, result.childCount);
+        // Always read all of the children: the parent's row selection does not map onto the child rows.
+        elementReader.nextVector(result.child, null, result.childCount, NULL_FILTER);
       }
     }
 
@@ -2289,9 +2725,10 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previous,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       MapColumnVector result = (MapColumnVector) previous;
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
         lengths.nextVector(result, result.lengths, batchSize);
         // even with repeating lengths, the map doesn't repeat
@@ -2306,8 +2743,8 @@ public class TreeReaderFactory {
         }
         result.keys.ensureSize(result.childCount, false);
         result.values.ensureSize(result.childCount, false);
-        keyReader.nextVector(result.keys, null, result.childCount);
-        valueReader.nextVector(result.values, null, result.childCount);
+        keyReader.nextVector(result.keys, null, result.childCount, NULL_FILTER);
+        valueReader.nextVector(result.values, null, result.childCount, NULL_FILTER);
       }
     }
 
@@ -2421,7 +2858,7 @@ public class TreeReaderFactory {
           throws IOException {
     TypeReader reader = createTreeReader(readerType, context);
     if (reader instanceof StructTreeReader) {
-      return new StructBatchReader((StructTreeReader) reader);
+      return new StructBatchReader((StructTreeReader) reader, context);
     } else {
       return new PrimitiveBatchReader(reader);
     }
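
Each of the filtered nextVector overloads above repeats the same skip/read pattern: skip forward to the next selected row, decode only that value, then skip the trailing rows so the value stream stays aligned with the end of the batch. A minimal standalone sketch of that pattern, with ValueSource as a hypothetical stand-in for the per-type stream helpers (e.g. utils.readDouble / utils.skipDouble):

    import java.io.IOException;

    /** Hypothetical stand-in for a per-type value stream (e.g. utils.readDouble / utils.skipDouble). */
    interface ValueSource {
      double read() throws IOException;
      void skip(long items) throws IOException;
    }

    final class SelectedReadSketch {
      /** Decode only the rows listed in 'selected'; skip the rest so the stream stays aligned. */
      static void readSelected(ValueSource source, double[] out,
                               int[] selected, int selectedSize,
                               int batchSize) throws IOException {
        int previousIdx = 0;
        for (int i = 0; i < selectedSize; i++) {
          int idx = selected[i];
          if (idx - previousIdx > 0) {
            source.skip(idx - previousIdx);     // skip the unselected rows before idx
          }
          out[idx] = source.read();             // decode the selected row in place
          previousIdx = idx + 1;
        }
        source.skip(batchSize - previousIdx);   // skip the trailing unselected rows
      }
    }

The null-aware variants replace the raw skip counts with countNonNullRowsInRange, since null rows consume no entries from the value stream.
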
diff --git a/java/core/src/java/org/apache/orc/impl/reader/tree/PrimitiveBatchReader.java b/java/core/src/java/org/apache/orc/impl/reader/tree/PrimitiveBatchReader.java
index 669b6a5..7caeb0e 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/tree/PrimitiveBatchReader.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/tree/PrimitiveBatchReader.java
@@ -31,7 +31,7 @@ public class PrimitiveBatchReader extends BatchReader {
                         int batchSize) throws IOException {
   batch.cols[0].reset();
   batch.cols[0].ensureSize(batchSize, false);
-  rootType.nextVector(batch.cols[0], null, batchSize);
+  rootType.nextVector(batch.cols[0], null, batchSize, batch);
   resetBatch(batch, batchSize);
   }
 }
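
Note that the batch itself is handed down as the FilterContext argument here, which is only possible because VectorizedRowBatch satisfies that interface in the storage-api version this code builds against; the readers then query the batch's own selection state. A small sketch of reading that state through the interface (selectedRows is a hypothetical helper, not part of the API):

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.filter.FilterContext;

    final class BatchSelectionSketch {
      /** Copies out the currently selected row indexes, or an empty array if no selection is active. */
      static int[] selectedRows(VectorizedRowBatch batch) {
        FilterContext ctx = batch;  // relies on VectorizedRowBatch being a FilterContext, as the call above requires
        if (!ctx.isSelectedInUse()) {
          return new int[0];
        }
        int[] rows = new int[ctx.getSelectedSize()];
        System.arraycopy(ctx.getSelected(), 0, rows, 0, rows.length);
        return rows;
      }
    }
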
diff --git a/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java b/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
index 9503436..e6a2db8 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
@@ -22,25 +22,55 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.impl.TreeReaderFactory;
 
 import java.io.IOException;
+import java.util.Set;
 
 public class StructBatchReader extends BatchReader {
+  // The reader context including row-filtering details
+  private final TreeReaderFactory.Context context;
 
-  public StructBatchReader(TreeReaderFactory.StructTreeReader rowReader) {
+  public StructBatchReader(TreeReaderFactory.StructTreeReader rowReader, TreeReaderFactory.Context context) {
     super(rowReader);
+    this.context = context;
+  }
+
+  private void readBatchColumn(VectorizedRowBatch batch, TypeReader[] children, int batchSize, int index)
+      throws IOException {
+    ColumnVector colVector = batch.cols[index];
+    if (colVector != null) {
+      colVector.reset();
+      colVector.ensureSize(batchSize, false);
+      children[index].nextVector(colVector, null, batchSize, batch);
+    }
   }
 
   @Override
   public void nextBatch(VectorizedRowBatch batch, int batchSize) throws IOException {
     TypeReader[] children = ((TreeReaderFactory.StructTreeReader) rootType).fields;
+    // Early expand fields --> apply filter --> expand remaining fields
+    Set<Integer> earlyExpandCols = context.getColumnFilterIds();
+
+    // Clear the selection, then early-expand the columns used by the filter
+    batch.selectedInUse = false;
+    for (int i = 0; i < children.length && !earlyExpandCols.isEmpty() &&
+        (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+      if (earlyExpandCols.contains(children[i].getColumnId())) {
+        readBatchColumn(batch, children, batchSize, i);
+      }
+    }
+    // Set batch.size before the callback runs, since it filters rows based on these column values
+    batch.size = batchSize;
+
+    // Apply the filter callback to reduce the number of rows selected for decoding by the remaining TreeReaders
+    if (!earlyExpandCols.isEmpty() && this.context.getColumnFilterCallback() != null) {
+      this.context.getColumnFilterCallback().accept(batch);
+    }
+
+    // Read the remaining columns applying row-level filtering
     for (int i = 0; i < children.length &&
-                    (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
-      ColumnVector colVector = batch.cols[i];
-      if (colVector != null) {
-        colVector.reset();
-        colVector.ensureSize(batchSize, false);
-        children[i].nextVector(colVector, null, batchSize);
+        (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+      if (!earlyExpandCols.contains(children[i].getColumnId())) {
+        readBatchColumn(batch, children, batchSize, i);
       }
     }
-    resetBatch(batch, batchSize);
   }
 }
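
The callback invoked through getColumnFilterCallback().accept(batch) is a Consumer<VectorizedRowBatch> that narrows the batch's selection after the filter columns have been read. A minimal sketch of such a callback, assuming a filter that keeps rows whose first long column holds an even value (roughly what the intRoundRobbinRowFilter used by the tests below appears to do, given their assertions on batch.selected):

    import java.util.function.Consumer;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    final class EvenRowFilterSketch {
      /** Keeps only the rows whose first (long) column holds an even value. */
      static final Consumer<VectorizedRowBatch> EVEN_ROWS = batch -> {
        LongColumnVector col = (LongColumnVector) batch.cols[0];
        int newSize = 0;
        for (int row = 0; row < batch.size; ++row) {
          if (col.vector[row] % 2 == 0) {
            batch.selected[newSize++] = row;   // record the surviving row index
          }
        }
        batch.selectedInUse = true;
        batch.size = newSize;
      };
    }

A production filter would also have to handle isRepeating and null entries in the column; the sketch ignores them for brevity.
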
diff --git a/java/core/src/java/org/apache/orc/impl/reader/tree/TypeReader.java b/java/core/src/java/org/apache/orc/impl/reader/tree/TypeReader.java
index 5932bee..110cdb1 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/tree/TypeReader.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/tree/TypeReader.java
@@ -18,6 +18,7 @@
 package org.apache.orc.impl.reader.tree;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 import org.apache.orc.OrcProto;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.orc.impl.reader.StripePlanner;
@@ -37,7 +38,8 @@ public interface TypeReader {
 
   void nextVector(ColumnVector previous,
                   boolean[] isNull,
-                  int batchSize) throws IOException;
+                  int batchSize,
+                  FilterContext filterContext) throws IOException;
 
   int getColumnId();
 }
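
For implementers, the extra FilterContext parameter usually amounts to a dispatch between a dense read and a selected-rows-only read, as the TreeReaderFactory changes above show. A condensed sketch of that dispatch, with readAll and readSelected as hypothetical helpers:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.io.filter.FilterContext;

    abstract class FilterAwareReaderSketch {
      abstract void readAll(ColumnVector out, int batchSize) throws IOException;
      abstract void readSelected(ColumnVector out, FilterContext ctx, int batchSize) throws IOException;

      /** Mirrors the pattern used by the filtered nextVector overloads above. */
      public void nextVector(ColumnVector previous, boolean[] isNull,
                             int batchSize, FilterContext filterContext) throws IOException {
        if (filterContext.isSelectedInUse()) {
          readSelected(previous, filterContext, batchSize);
        } else {
          readAll(previous, batchSize);
        }
      }
    }
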
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java
new file mode 100644
index 0000000..b3a55b5
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+
+public class TestRowFilteringComplexTypes {
+    private Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+            + File.separator + "tmp"));
+
+    private Configuration conf;
+    private FileSystem fs;
+    private Path testFilePath;
+
+    private static final int ColumnBatchRows = 1024;
+
+    @Rule
+    public TestName testCaseName = new TestName();
+
+    @Before
+    public void openFileSystem() throws Exception {
+        conf = new Configuration();
+        fs = FileSystem.getLocal(conf);
+        testFilePath = new Path(workDir, "TestRowFilteringComplexTypes." + testCaseName.getMethodName() + ".orc");
+        fs.delete(testFilePath, false);
+    }
+
+    @Test
+    // Inner Struct should receive the filterContext and propagate it to its subtypes
+    public void testInnerStructRowFilter() throws Exception {
+        // Set the row stride to a multiple of the batch size
+        final int INDEX_STRIDE = 16 * ColumnBatchRows;
+        final int NUM_BATCHES = 2;
+
+        TypeDescription schema = TypeDescription.createStruct()
+                .addField("int1", TypeDescription.createInt())
+                .addField("innerStruct", TypeDescription.createStruct()
+                        .addField("a", TypeDescription.createDecimal())
+                        .addField("b", TypeDescription.createDecimal())
+                );
+
+        try (Writer writer = OrcFile.createWriter(testFilePath,
+                OrcFile.writerOptions(conf).setSchema(schema).rowIndexStride(INDEX_STRIDE))) {
+            VectorizedRowBatch batch = schema.createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            StructColumnVector col2 = (StructColumnVector) batch.cols[1];
+            DecimalColumnVector innerCol1 = (DecimalColumnVector) col2.fields[0];
+            DecimalColumnVector innerCol2 = (DecimalColumnVector) col2.fields[1];
+
+            for (int b = 0; b < NUM_BATCHES; ++b) {
+                batch.reset();
+                batch.size = ColumnBatchRows;
+                for (int row = 0; row < batch.size; row++) {
+                    col1.vector[row] = row;
+                    if ((row % 2) == 0) {
+                        innerCol1.vector[row] = new HiveDecimalWritable(101 + row);
+                        innerCol2.vector[row] = new HiveDecimalWritable(100 + row);
+                    } else {
+                        innerCol1.vector[row] = new HiveDecimalWritable(999 + row);
+                        innerCol2.vector[row] = new HiveDecimalWritable(998 + row);
+                    }
+                }
+                col1.isRepeating = false;
+                writer.addRowBatch(batch);
+            }
+        }
+
+        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+        try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+                reader.options().setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+            VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+            StructColumnVector col2 = (StructColumnVector) batch.cols[1];
+
+            int noNullCnt = 0;
+            while (rows.nextBatch(batch)) {
+                Assert.assertTrue(batch.selectedInUse);
+                Assert.assertEquals(ColumnBatchRows / 2, batch.size);
+                for (int r = 0; r < ColumnBatchRows; ++r) {
+                    StringBuilder sb = new StringBuilder();
+                    col2.stringifyValue(sb, r);
+                    if (sb.toString().compareTo("[0, 0]") != 0) {
+                        noNullCnt++;
+                    }
+                }
+            }
+            // Make sure that our filter worked
+            Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+            Assert.assertEquals(0, batch.selected[0]);
+            Assert.assertEquals(2, batch.selected[1]);
+        }
+    }
+
+
+    @Test
+    // Inner UNION should make use of the filterContext
+    public void testInnerUnionRowFilter() throws Exception {
+        // Set the row stride to a multiple of the batch size
+        final int INDEX_STRIDE = 16 * ColumnBatchRows;
+        final int NUM_BATCHES = 2;
+
+        TypeDescription schema = TypeDescription.fromString(
+            "struct<int1:int,innerUnion:uniontype<decimal(16,3),decimal(16,3)>>");
+
+        try (Writer writer = OrcFile.createWriter(testFilePath,
+            OrcFile.writerOptions(conf).setSchema(schema).rowIndexStride(INDEX_STRIDE))) {
+            VectorizedRowBatch batch = schema.createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            UnionColumnVector col2 = (UnionColumnVector) batch.cols[1];
+            Decimal64ColumnVector innerCol1 = (Decimal64ColumnVector) col2.fields[0];
+            Decimal64ColumnVector innerCol2 = (Decimal64ColumnVector) col2.fields[1];
+
+            for (int b = 0; b < NUM_BATCHES; ++b) {
+                batch.reset();
+                batch.size = ColumnBatchRows;
+                for (int row = 0; row < batch.size; row++) {
+                    int totalRow = ColumnBatchRows * b + row;
+                    col1.vector[row] = totalRow;
+                    col2.tags[row] = totalRow % 2;
+                    if (col2.tags[row] == 0) {
+                        innerCol1.vector[row] = totalRow * 1000;
+                    } else {
+                        innerCol2.vector[row] = totalRow * 3 * 1000;
+                    }
+                }
+                col1.isRepeating = false;
+                writer.addRowBatch(batch);
+            }
+        }
+
+        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+        try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+            reader.options().setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+            VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            UnionColumnVector col2 = (UnionColumnVector) batch.cols[1];
+            Decimal64ColumnVector innerCol1 = (Decimal64ColumnVector) col2.fields[0];
+            Decimal64ColumnVector innerCol2 = (Decimal64ColumnVector) col2.fields[1];
+
+            int previousBatchRows = 0;
+            while (rows.nextBatch(batch)) {
+                Assert.assertTrue(batch.selectedInUse);
+                Assert.assertEquals(ColumnBatchRows / 2, batch.size);
+                for (int r = 0; r < batch.size; ++r) {
+                    int row = batch.selected[r];
+                    int originalRow = (r + previousBatchRows) * 2;
+                    Assert.assertEquals("row " + originalRow, originalRow, col1.vector[row]);
+                    Assert.assertEquals("row " + originalRow, 0, col2.tags[row]);
+                    Assert.assertEquals("row " + originalRow,
+                        originalRow * 1000, innerCol1.vector[row]);
+                }
+                // check to make sure that we didn't read innerCol2
+                for(int r = 1; r < ColumnBatchRows; r += 2) {
+                    Assert.assertEquals("row " + r, 0, innerCol2.vector[r]);
+                }
+                previousBatchRows += batch.size;
+            }
+        }
+    }
+
+
+    @Test
+    // Inner MAP should NOT make use of the filterContext
+    // TODO: selected rows should be combined with map offsets
+    public void testInnerMapRowFilter() throws Exception {
+        // Set the row stride to a multiple of the batch size
+        final int INDEX_STRIDE = 16 * ColumnBatchRows;
+        final int NUM_BATCHES = 2;
+
+        TypeDescription schema = TypeDescription.createStruct()
+                .addField("int1", TypeDescription.createInt())
+                .addField("innerMap", TypeDescription.createMap(
+                        TypeDescription.createDecimal(),
+                        TypeDescription.createDecimal()
+                        )
+                );
+
+        try (Writer writer = OrcFile.createWriter(testFilePath,
+                OrcFile.writerOptions(conf).setSchema(schema).rowIndexStride(INDEX_STRIDE))) {
+            VectorizedRowBatch batch = schema.createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            MapColumnVector mapCol = (MapColumnVector) batch.cols[1];
+            DecimalColumnVector keyCol = (DecimalColumnVector) mapCol.keys;
+            DecimalColumnVector valCol = (DecimalColumnVector) mapCol.values;
+
+            for (int b = 0; b < NUM_BATCHES; ++b) {
+                batch.reset();
+                batch.size = ColumnBatchRows;
+                for (int row = 0; row < batch.size; row++) {
+                    col1.vector[row] = row;
+                    // Insert 2 kv pairs in each row
+                    for (int i = 0; i < 2; i++) {
+                        keyCol.vector[i] = new HiveDecimalWritable(i);
+                        valCol.vector[i] = new HiveDecimalWritable(i * 10);
+                    }
+                    mapCol.lengths[row] = 2;
+                    mapCol.offsets[row] = 0;
+                }
+                col1.isRepeating = false;
+                writer.addRowBatch(batch);
+            }
+        }
+
+        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+        try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+                reader.options().setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+            VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+            MapColumnVector col2 = (MapColumnVector) batch.cols[1];
+
+            int noNullCnt = 0;
+            while (rows.nextBatch(batch)) {
+                Assert.assertTrue(batch.selectedInUse);
+                Assert.assertEquals(ColumnBatchRows / 2, batch.size);
+                for (int r = 0; r < ColumnBatchRows; ++r) {
+                    StringBuilder sb = new StringBuilder();
+                    col2.stringifyValue(sb, r);
+                    if (sb.toString().equals("[{\"key\": 0, \"value\": 0}, {\"key\": 1, \"value\": 10}]")) {
+                        noNullCnt++;
+                    }
+                }
+            }
+            // Make sure that we did NOT skip any rows
+            Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+            // Even though the selected array is still in use, it is not propagated to the map children
+            Assert.assertEquals(0, batch.selected[0]);
+            Assert.assertEquals(2, batch.selected[1]);
+        }
+    }
+
+    @Test
+    // Inner LIST should NOT make use of the filterContext
+    // TODO: selected rows should be combined with list offsets
+    public void testInnerListRowFilter() throws Exception {
+        // Set the row stride to a multiple of the batch size
+        final int INDEX_STRIDE = 16 * ColumnBatchRows;
+        final int NUM_BATCHES = 2;
+
+        TypeDescription schema = TypeDescription.createStruct()
+                .addField("int1", TypeDescription.createInt())
+                .addField("innerList", TypeDescription
+                        .createList(TypeDescription.createDecimal())
+                );
+
+        try (Writer writer = OrcFile.createWriter(testFilePath,
+                OrcFile.writerOptions(conf).setSchema(schema).rowIndexStride(INDEX_STRIDE))) {
+            VectorizedRowBatch batch = schema.createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            ListColumnVector listCol = (ListColumnVector) batch.cols[1];
+            DecimalColumnVector listValues = (DecimalColumnVector) listCol.child;
+
+            for (int b = 0; b < NUM_BATCHES; ++b) {
+                batch.reset();
+                batch.size = ColumnBatchRows;
+                for (int row = 0; row < batch.size; row++) {
+                    col1.vector[row] = row;
+                    // Insert 10 values into the innerList per row
+                    for (int i = 0; i < 10; i++) {
+                        listValues.vector[i] = new HiveDecimalWritable(i);
+                    }
+                    listCol.lengths[row] = 10;
+                    listCol.offsets[row] = 0;
+                }
+                col1.isRepeating = false;
+                writer.addRowBatch(batch);
+            }
+        }
+
+        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+        try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+                reader.options().setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+            VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+            ListColumnVector col2 = (ListColumnVector) batch.cols[1];
+
+            int noNullCnt = 0;
+            while (rows.nextBatch(batch)) {
+                Assert.assertTrue(batch.selectedInUse);
+                Assert.assertEquals(ColumnBatchRows / 2, batch.size);
+                for (int r = 0; r < ColumnBatchRows; ++r) {
+                    StringBuilder sb = new StringBuilder();
+                    col2.stringifyValue(sb, r);
+                    if (sb.toString().equals("[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]")) {
+                        noNullCnt++;
+                    }
+                }
+            }
+            // Make sure that we did NOT skip any rows
+            Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+            // Even though the selected array is still in use, it is not propagated to the list child
+            Assert.assertEquals(0, batch.selected[0]);
+            Assert.assertEquals(2, batch.selected[1]);
+        }
+    }
+}
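
As the union test above illustrates, once a row filter is installed the consumer must index through batch.selected rather than iterating rows 0..size. A minimal consumption sketch, assuming a schema whose first column is a long (sumFirstColumn is a hypothetical helper, not part of the API):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    final class FilteredBatchConsumerSketch {
      /** Sums the first (long) column, honouring the selected array when it is in use. */
      static long sumFirstColumn(VectorizedRowBatch batch) {
        LongColumnVector col = (LongColumnVector) batch.cols[0];
        long sum = 0;
        for (int i = 0; i < batch.size; ++i) {
          int row = batch.selectedInUse ? batch.selected[i] : i;
          sum += col.vector[row];
        }
        return sum;
      }
    }

For brevity the sketch ignores isRepeating and null handling.
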
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java
new file mode 100644
index 0000000..407bd81
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.sql.Timestamp;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Types that are not skipped at row-level include: Long, Short, Int, Date, Binary.
+ * As it turns out, it is more expensive to skip non-selected rows than to decode everything and propagate the
+ * selected array. Skipping for these types breaks instruction pipelining and introduces more branch mispredictions.
+ */
+public class TestRowFilteringNoSkip {
+
+  private Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+      + File.separator + "tmp"));
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path testFilePath;
+
+  private static final int ColumnBatchRows = 1024;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestRowFilteringNoSkip." + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Test
+  public void testLongRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("int2", TypeDescription.createLong());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = 100;
+          else
+            col2.vector[row] = 999;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter, so selectedInUse is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated, so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this column type is not actually filtered, there will be no nulls!
+        assertEquals(true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // For Int type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0], 100);
+      Assert.assertEquals(col2.vector[511], 999);
+      Assert.assertEquals(col2.vector[1020],  100);
+      Assert.assertEquals(col2.vector[1021], 999);
+    }
+  }
+
+  @Test
+  public void testIntRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (one PASSing row per batch)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("int2", TypeDescription.createInt());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        col1.vector[1023] = b;
+        col2.vector[1023] = 101;
+        for (int row = 0; row < batch.size-1; row++) {
+          col1.vector[row] = 999;
+          col2.vector[row] = row+1;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intFirstRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCount = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selectedInUse is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this column type is not actually filtered, there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCount++;
+        }
+      }
+      // For Int type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCount);
+      // check filter-selected output
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(0, batch.selected[1]);
+      Assert.assertEquals(0, batch.selected[1023]);
+    }
+  }
+
+  @Test
+  public void testShortRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (positive and negative values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("short2", TypeDescription.createShort());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = row*2+1;
+          else
+            col2.vector[row] = -1 * row*2;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selectedInUse is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this column type is not actually filtered, there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // For Short type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(false, col2.isRepeating);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] > 0);
+      Assert.assertTrue(col2.vector[511] < 0);
+      Assert.assertTrue(col2.vector[1020] > 0);
+      Assert.assertTrue(col2.vector[1021] < 0);
+    }
+  }
+
+  @Test
+  public void testDateRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("dt2", TypeDescription.createDate());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = Timestamp.valueOf("2020-04-01 12:34:56.9").toInstant().getEpochSecond();
+          else
+            col2.vector[row] = Timestamp.valueOf("2019-04-01 12:34:56.9").toInstant().getEpochSecond();
+        }
+        col2.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selectedInUse is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this column type is not actually filtered, there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // For Date type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(false, col2.isRepeating);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] != 0);
+      Assert.assertTrue(col2.vector[511] != 0);
+      Assert.assertTrue(col2.vector[1020] != 0);
+      Assert.assertTrue(col2.vector[1021] != 0);
+    }
+  }
+
+  @Test
+  public void testBinaryRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (two binary patterns alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("binary2", TypeDescription.createBinary());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      // Write NUM_BATCHES batches where the binary value alternates from row to row.
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.setVal(row, TestVectorOrcFile.bytesArray(0, 1, 2, 3, row));
+          else
+            col2.setVal(row, TestVectorOrcFile.bytesArray(1, 2, 3, 4, row));
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selectedInUse is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this column type is not actually filtered, there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (!TestVectorOrcFile.getBinary(col2, r).equals(TestVectorOrcFile.bytes()))
+            noNullCnt ++;
+        }
+      }
+      // For Binary type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertNotEquals(TestVectorOrcFile.getBinary(col2, 0), TestVectorOrcFile.bytes());
+      Assert.assertNotEquals(TestVectorOrcFile.getBinary(col2, 511), TestVectorOrcFile.bytes());
+      Assert.assertNotEquals(TestVectorOrcFile.getBinary(col2, 1020), TestVectorOrcFile.bytes());
+      Assert.assertNotEquals(TestVectorOrcFile.getBinary(col2, 1021), TestVectorOrcFile.bytes());
+    }
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
new file mode 100644
index 0000000..0be05b3
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
@@ -0,0 +1,1281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Types that are skipped at row-level include: Decimal, Decimal64, Double, Float, Char, VarChar, String, Boolean, Timestamp
+ * For the remaining types that are not row-skipped see {@link TestRowFilteringNoSkip}
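+ *
+ * Each test below writes NUM_BATCHES batches of synthetic data, installs a row-level filter via
+ * Reader.Options#setRowFilter(String[], Consumer) and then checks both the selected array produced
+ * by the filter callback and that the filtered-out rows of the skipped column were never
+ * materialized (they keep their default / null value). A small wiring sketch follows the filter
+ * callbacks below.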
+ */
+public class TestRowFilteringSkip {
+
+  private Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+      + File.separator + "tmp"));
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path testFilePath;
+
+  private static final int ColumnBatchRows = 1024;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestRowFilteringSkip." + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  public static String convertTime(long time){
+    Date date = new Date(time);
+    Format format = new SimpleDateFormat("yyyy-MM-d HH:mm:ss.SSS");
+    return format.format(date);
+  }
+
+  // Filter out all rows except two: batch.size-100 (row 924 for a full batch) and row 940
+  public static void intAnyRowFilter(VectorizedRowBatch batch) {
+    // Dummy filter implementation passing just two batch rows
+    int newSize = 2;
+    batch.selected[0] = batch.size-100;
+    batch.selected[1] = 940;
+    batch.selectedInUse = true;
+    batch.size = newSize;
+  }
+
+  // Filter out all rows except the first one
+  public static void intFirstRowFilter(VectorizedRowBatch batch) {
+    int newSize = 0;
+    for (int row = 0; row <batch.size; ++row) {
+      if (row == 0) {
+        batch.selected[newSize++] = row;
+      }
+    }
+    batch.selectedInUse = true;
+    batch.size = newSize;
+  }
+
+  // Filter out rows in a round-robin fashion, starting with a pass
+  public static void intRoundRobbinRowFilter(VectorizedRowBatch batch) {
+    int newSize = 0;
+    for (int row = 0; row < batch.size; ++row) {
+      if ((row % 2) == 0) {
+        batch.selected[newSize++] = row;
+      }
+    }
+    batch.selectedInUse = true;
+    batch.size = newSize;
+  }
+
+  // Counts every row seen by intCustomValueFilter across all batches
+  static int rowCount = 0;
+  public static void intCustomValueFilter(VectorizedRowBatch batch) {
+    LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+    int newSize = 0;
+    for (int row = 0; row <batch.size; ++row) {
+      long val = col1.vector[row];
+      if ((val == 2) || (val == 5) || (val == 13) || (val == 29) || (val == 70)) {
+        batch.selected[newSize++] = row;
+      }
+      rowCount++;
+    }
+    batch.selectedInUse = true;
+    batch.size = newSize;
+  }
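+
+  // Illustrative sketch only (not referenced by the tests): every test below wires its filter
+  // callback into the reader the same way -- name the filter column(s) to pre-read and pass the
+  // callback that fills in batch.selected / batch.selectedInUse. The helper name is ours, not
+  // part of the ORC API; the setRowFilter call is exactly what each test uses.
+  private static RecordReaderImpl openWithFilter(Reader reader, String[] filterCols,
+      java.util.function.Consumer<VectorizedRowBatch> filter) throws java.io.IOException {
+    return (RecordReaderImpl) reader.rows(reader.options().setRowFilter(filterCols, filter));
+  }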
+
+  @Test
+  public void testDecimalRepeatingFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (the same PASSing value in every row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal());
+
+    HiveDecimalWritable passDataVal = new HiveDecimalWritable("100");
+    HiveDecimalWritable nullDataVal = new HiveDecimalWritable("0");
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          col2.vector[row] = passDataVal;
+        }
+        col1.isRepeating = true;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r].compareTo(passDataVal) == 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0], passDataVal);
+      Assert.assertEquals(col2.vector[511], nullDataVal);
+      Assert.assertEquals(col2.vector[1020],  passDataVal);
+      Assert.assertEquals(col2.vector[1021], nullDataVal);
+    }
+  }
+
+  @Test
+  public void testDecimalRoundRobbinFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (pass and fail values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal());
+
+    HiveDecimalWritable failDataVal = new HiveDecimalWritable("-100");
+    HiveDecimalWritable nullDataVal = new HiveDecimalWritable("0");
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = new HiveDecimalWritable(row+1);
+          else
+            col2.vector[row] = failDataVal;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r].getHiveDecimal().longValue() > 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0].getHiveDecimal().longValue(), 1);
+      Assert.assertEquals(col2.vector[511], nullDataVal);
+      Assert.assertEquals(col2.vector[1020].getHiveDecimal().longValue(),  1021);
+      Assert.assertEquals(col2.vector[1021], nullDataVal);
+    }
+  }
+
+  @Test
+  public void testDecimalNullRoundRobbinFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (only every other row is assigned a value)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal());
+
+    HiveDecimalWritable nullDataVal = new HiveDecimalWritable("0");
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = new HiveDecimalWritable(row+1);
+        }
+        // Make sure we trigger the nullCount path of DecimalTreeReader
+        col2.noNulls = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r].getHiveDecimal().longValue() > 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0].getHiveDecimal().longValue(), 1);
+      Assert.assertEquals(col2.vector[511], nullDataVal);
+      Assert.assertEquals(col2.vector[1020].getHiveDecimal().longValue(),  1021);
+      Assert.assertEquals(col2.vector[1021], nullDataVal);
+    }
+  }
+
+
+  @Test
+  public void testMultiDecimalSingleFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (rows 924 and 940 get the PASSing value)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal())
+        .addField("decimal2", TypeDescription.createDecimal());
+
+    HiveDecimalWritable passDataVal = new HiveDecimalWritable("12");
+    HiveDecimalWritable failDataVal = new HiveDecimalWritable("100");
+    HiveDecimalWritable nullDataVal = new HiveDecimalWritable("0");
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      // Write NUM_BATCHES batches where only rows 924 and 940 get the PASSing value.
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      DecimalColumnVector col3 = (DecimalColumnVector) batch.cols[2];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if (row == 924 || row == 940) {
+            col2.vector[row] = passDataVal;
+            col3.vector[row] = passDataVal;
+          } else {
+            col2.vector[row] = failDataVal;
+            col3.vector[row] = failDataVal;
+          }
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intAnyRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      DecimalColumnVector col3 = (DecimalColumnVector) batch.cols[2];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r].compareTo(passDataVal) == 0 && col3.vector[r].compareTo(passDataVal) == 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 2, noNullCnt);
+      Assert.assertEquals(924, batch.selected[0]);
+      Assert.assertEquals(940, batch.selected[1]);
+      Assert.assertEquals(0, batch.selected[2]);
+      Assert.assertEquals(col2.vector[0],  nullDataVal);
+      Assert.assertEquals(col3.vector[0],  nullDataVal);
+      Assert.assertEquals(col2.vector[511], nullDataVal);
+      Assert.assertEquals(col3.vector[511], nullDataVal);
+      Assert.assertEquals(col2.vector[924], passDataVal);
+      Assert.assertEquals(col3.vector[940], passDataVal);
+      Assert.assertEquals(col2.vector[1020], nullDataVal);
+      Assert.assertEquals(col3.vector[1020], nullDataVal);
+      Assert.assertEquals(col2.vector[1021], nullDataVal);
+      Assert.assertEquals(col3.vector[1021], nullDataVal);
+    }
+  }
+
+  @Test
+  public void testDecimal64RoundRobbinFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (positive and negative values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal().withPrecision(10).withScale(2));
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = row + 1;
+          else
+            col2.vector[row] = -1 * row;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0], 1);
+      Assert.assertEquals(col2.vector[511], 0);
+      Assert.assertEquals(col2.vector[1020],  1021);
+      Assert.assertEquals(col2.vector[1021], 0);
+    }
+  }
+
+  @Test
+  public void testDecimal64NullRoundRobbinFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (only every other row is assigned a value)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal().withPrecision(10).withScale(2));
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = row + 1;
+        }
+        col2.noNulls = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] == 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0], 1);
+      Assert.assertEquals(col2.vector[511], 0);
+      Assert.assertEquals(col2.vector[1020],  1021);
+      Assert.assertEquals(col2.vector[1021], 0);
+    }
+  }
+
+  @Test
+  public void testDoubleRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (pass and fail values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("double2", TypeDescription.createDouble());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DoubleColumnVector col2 = (DoubleColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = 100;
+          else
+            col2.vector[row] = 999;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DoubleColumnVector col2 = (DoubleColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] == 100)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] == 100.0);
+      Assert.assertTrue(col2.vector[511] == 0.0);
+      Assert.assertTrue(col2.vector[1020] == 100);
+      Assert.assertTrue(col2.vector[1021] == 0);
+    }
+  }
+
+  @Test
+  public void testFloatRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (pass and fail values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("float2", TypeDescription.createFloat());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DoubleColumnVector col2 = (DoubleColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = 100+row;
+          else
+            col2.vector[row] = 999;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DoubleColumnVector col2 = (DoubleColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] != 999.0);
+      Assert.assertTrue(col2.vector[511] == 0.0);
+      Assert.assertTrue(col2.vector[1020] == 1120.0);
+      Assert.assertTrue(col2.vector[1021] == 0);
+    }
+  }
+
+  @Test
+  public void testCharRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (pass and fail values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("char2", TypeDescription.createChar());
+
+    byte[] passData = ("p").getBytes(StandardCharsets.UTF_8);
+    byte[] failData = ("f").getBytes(StandardCharsets.UTF_8);
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.setVal(row, passData);
+          else
+            col2.setVal(row, failData);
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (!col2.toString(r).isEmpty())
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.toString(0).equals("p"));
+      Assert.assertTrue(col2.toString(511).isEmpty());
+      Assert.assertTrue(col2.toString(1020).equals("p"));
+      Assert.assertTrue(col2.toString(1021).isEmpty());
+    }
+  }
+
+  @Test
+  public void testVarCharRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (pass and fail values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("varchar2", TypeDescription.createVarchar());
+
+    byte[] passData = ("p").getBytes(StandardCharsets.UTF_8);
+    byte[] failData = ("f").getBytes(StandardCharsets.UTF_8);
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.setVal(row, passData);
+          else
+            col2.setVal(row, failData);
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (!col2.toString(r).isEmpty())
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.toString(0).equals("p"));
+      Assert.assertTrue(col2.toString(511).isEmpty());
+      Assert.assertTrue(col2.toString(1020).equals("p"));
+      Assert.assertTrue(col2.toString(1021).isEmpty());
+    }
+  }
+
+  @Test
+  public void testDirectStringRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 10 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (pass and fail values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("string1", TypeDescription.createString());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      // Write NUM_BATCHES batches where the string value alternates between pass and fail per row.
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.setVal(row, ("passData-" + row).getBytes(StandardCharsets.UTF_8));
+          else
+            col2.setVal(row, ("failData-" + row).getBytes(StandardCharsets.UTF_8));
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (!col2.toString(r).isEmpty())
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.toString(0).startsWith("pass"));
+      Assert.assertTrue(col2.toString(511).isEmpty());
+      Assert.assertTrue(col2.toString(1020).startsWith("pass"));
+    }
+  }
+
+  @Test
+  public void testDictionaryStringRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 100 * ColumnBatchRows;
+    final int NUM_BATCHES = 100;
+
+    // ORC write some data (pass and fail values alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("string1", TypeDescription.createString());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if (row % 2 == 0)
+            col2.setVal(row, ("passData").getBytes(StandardCharsets.UTF_8));
+          else
+            col2.setVal(row, ("failData").getBytes(StandardCharsets.UTF_8));
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (!col2.toString(r).isEmpty())
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.toString(0).startsWith("pass"));
+      Assert.assertTrue(col2.toString(511).isEmpty());
+      Assert.assertTrue(col2.toString(1020).startsWith("pass"));
+    }
+  }
+
+  @Test
+  public void testRepeatingBooleanRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (the same value, false, in every row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("bool2", TypeDescription.createBoolean());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          col2.vector[row] = 0;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] == 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(false, col2.isRepeating);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] == 0);
+      Assert.assertTrue(col2.vector[511] == 0);
+      Assert.assertTrue(col2.vector[1020] == 0);
+      Assert.assertTrue(col2.vector[1021] == 0);
+    }
+  }
+
+  @Test
+  public void testBooleanRoundRobbinRowFilterCallback() throws Exception {
+    final int INDEX_STRIDE = 0;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (true and false alternate per row)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("bool2", TypeDescription.createBoolean());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = 1;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] == 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] == 1);
+      Assert.assertTrue(col2.vector[511] == 0);
+      Assert.assertTrue(col2.vector[1020] == 1);
+      Assert.assertTrue(col2.vector[1021] == 0);
+    }
+  }
+
+  @Test
+  public void testBooleanAnyRowFilterCallback() throws Exception {
+    final int INDEX_STRIDE = 0;
+    final int NUM_BATCHES = 100;
+
+    // ORC write some data (only rows 924 and 940 are set to true)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("bool2", TypeDescription.createBoolean());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if (row == 924 || row == 940)
+            col2.vector[row] = 1;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intAnyRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertNotNull(batch.selected);
+        // Rows are filtered, so a batch should never hold all ColumnBatchRows rows
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        Assert.assertTrue(col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] == 1) {
+            noNullCnt++;
+          }
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 2, noNullCnt);
+      Assert.assertEquals(924, batch.selected[0]);
+      Assert.assertEquals(940, batch.selected[1]);
+      Assert.assertEquals(0, col2.vector[0]);
+      Assert.assertEquals(0, col2.vector[511]);
+      Assert.assertEquals(0, col2.vector[1020]);
+      Assert.assertEquals(1, col2.vector[924]);
+      Assert.assertEquals(1, col2.vector[940]);
+    }
+  }
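+
+  // intAnyRowFilter is also defined earlier in this class (not shown in this hunk); judging from
+  // the assertions above, it presumably keeps only the two row positions flagged when writing
+  // (924 and 940), which is why noNullCnt ends up at NUM_BATCHES * 2.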
+
+  @Test
+  public void testTimestampRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("ts2", TypeDescription.createTimestamp());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      TimestampColumnVector col2 = (TimestampColumnVector) batch.cols[1];
+      for (int b = 0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0) {
+            col2.set(row, Timestamp.valueOf((1900 + row) + "-04-01 12:34:56.9"));
+          } else {
+            col2.isNull[row] = true;
+            col2.set(row, null);
+          }
+        }
+        col1.isRepeating = true;
+        col1.noNulls = false;
+        col2.noNulls = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      TimestampColumnVector col2 = (TimestampColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertNotNull(batch.selected);
+        // Rows are filtered, so a batch should never hold all ColumnBatchRows rows
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        Assert.assertTrue(col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.getTime(r) == 0) {
+            noNullCnt++;
+          }
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertFalse(col2.isRepeating);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals("1900-04-1 12:34:56.900", convertTime(col2.getTime(0)));
+      Assert.assertEquals(0, col2.getTime(511));
+      Assert.assertEquals("2920-04-1 12:34:56.900", convertTime(col2.getTime(1020)));
+      Assert.assertEquals(0, col2.getTime(1021));
+    }
+  }
+
+  @Test
+  public void testCustomFileTimestampRoundRobbinRowFilterCallback() throws Exception {
+    testFilePath = new Path(ClassLoader.getSystemResource("orc_split_elim.orc").getPath());
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"userid"}, TestRowFilteringSkip::intCustomValueFilter))) {
+
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      TimestampColumnVector col5 = (TimestampColumnVector) batch.cols[4];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertNotNull(batch.selected);
+        // Rows are filtered, so a batch should never hold all ColumnBatchRows rows
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        Assert.assertTrue(col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col1.vector[r] != 100) {
+            noNullCnt++;
+          }
+        }
+      }
+
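+      // Note: rowCount is presumably a counter field of this test class that the
+      // intCustomValueFilter callback (defined earlier, not shown in this hunk)
+      // increments for every row it inspects.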
+      // Total number of rows in the file should be 25k
+      Assert.assertEquals(25000, rowCount);
+      // Make sure that our filter worked (5 rows with userid != 100)
+      Assert.assertEquals(5, noNullCnt);
+      Assert.assertFalse(col5.isRepeating);
+      Assert.assertEquals(544, batch.selected[0]);
+      Assert.assertEquals(0, batch.selected[1]);
+      Assert.assertEquals(0, col5.getTime(0));
+      Assert.assertTrue(col5.getTime(544) != 0);
+    }
+  }
+}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index bc2ff94..252c4f2 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -169,7 +169,7 @@ public class TestVectorOrcFile {
     return result;
   }
 
-  private static BytesWritable bytes(int... items) {
+  protected static BytesWritable bytes(int... items) {
     BytesWritable result = new BytesWritable();
     result.setSize(items.length);
     for(int i=0; i < items.length; ++i) {
@@ -178,7 +178,7 @@ public class TestVectorOrcFile {
     return result;
   }
 
-  private static byte[] bytesArray(int... items) {
+  protected static byte[] bytesArray(int... items) {
     byte[] result = new byte[items.length];
     for(int i=0; i < items.length; ++i) {
       result[i] = (byte) items[i];
@@ -876,7 +876,7 @@ public class TestVectorOrcFile {
     return ((DoubleColumnVector) batch.cols[6]).vector[rowId];
   }
 
-  private static BytesWritable getBinary(BytesColumnVector column, int rowId) {
+  protected static BytesWritable getBinary(BytesColumnVector column, int rowId) {
     if (column.isRepeating) {
       rowId = 0;
     }
diff --git a/java/core/src/test/resources/orc_split_elim.orc b/java/core/src/test/resources/orc_split_elim.orc
new file mode 100644
index 0000000..ff3557f
Binary files /dev/null and b/java/core/src/test/resources/orc_split_elim.orc differ
diff --git a/java/pom.xml b/java/pom.xml
index c45844f..053bedb 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -70,7 +70,7 @@
 
     <min.hadoop.version>2.2.0</min.hadoop.version>
     <hadoop.version>2.7.3</hadoop.version>
-    <storage-api.version>2.7.1</storage-api.version>
+    <storage-api.version>2.7.2</storage-api.version>
     <zookeeper.version>3.4.6</zookeeper.version>
   </properties>