You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by jc...@apache.org on 2020/05/26 14:58:13 UTC

[orc] branch master updated (6fac967 -> 8c6b178)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git.


 discard 6fac967  ORC-577: Allow row-level filtering
     new 8c6b178  ORC-577: Allow row-level filtering

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (6fac967)
            \
             N -- N -- N   refs/heads/master (8c6b178)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[orc] 01/01: ORC-577: Allow row-level filtering

Posted by jc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git

commit 8c6b178d7613d5337765b7e38be947cb6b32605e
Author: Panagiotis Garefalakis <pg...@cloudera.com>
AuthorDate: Tue May 26 15:47:42 2020 +0100

    ORC-577: Allow row-level filtering
    
    Fixes #475
---
 java/core/src/java/org/apache/orc/Reader.java      |   40 +
 .../java/org/apache/orc/impl/BitFieldReader.java   |   42 +-
 .../apache/orc/impl/ConvertTreeReaderFactory.java  |  148 ++-
 .../java/org/apache/orc/impl/RecordReaderImpl.java |   23 +-
 .../org/apache/orc/impl/RunLengthByteReader.java   |   30 +-
 .../org/apache/orc/impl/SerializationUtils.java    |    9 +
 .../org/apache/orc/impl/TreeReaderFactory.java     |  647 ++++++++--
 .../orc/impl/reader/tree/PrimitiveBatchReader.java |    2 +-
 .../orc/impl/reader/tree/StructBatchReader.java    |   46 +-
 .../apache/orc/impl/reader/tree/TypeReader.java    |    4 +-
 .../apache/orc/TestRowFilteringComplexTypes.java   |  326 +++++
 .../org/apache/orc/TestRowFilteringNoSkip.java     |  414 +++++++
 .../test/org/apache/orc/TestRowFilteringSkip.java  | 1281 ++++++++++++++++++++
 .../src/test/org/apache/orc/TestVectorOrcFile.java |    6 +-
 java/core/src/test/resources/orc_split_elim.orc    |  Bin 0 -> 2298 bytes
 java/pom.xml                                       |    2 +-
 16 files changed, 2826 insertions(+), 194 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 007c515..0aae622 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -22,9 +22,11 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /**
  * The interface for reading ORC files.
@@ -187,6 +189,8 @@ public interface Reader extends Closeable {
     private Boolean useZeroCopy = null;
     private Boolean skipCorruptRecords = null;
     private TypeDescription schema = null;
+    private String[] preFilterColumns = null;
+    Consumer<VectorizedRowBatch> skipRowCallback = null;
     private DataReader dataReader = null;
     private Boolean tolerateMissingSchema = null;
     private boolean forcePositionalEvolution;
@@ -238,6 +242,34 @@ public interface Reader extends Closeable {
     }
 
     /**
+     * Set a row level filter.
+     * This is an advanced feature that allows the caller to specify
+     * a list of columns that are read first and then a filter that
+     * is called to determine which rows if any should be read.
+     *
+     * Users should expect the batches that come from the reader
+     * to use the selected array set by their filter.
+     *
+     * Use cases for this are predicates that SearchArgs can't represent,
+     * such as relationships between columns (e.g. columnA == columnB).
+     * @param filterColumnNames an array of the column names that
+     *                      are read before the filter is applied. Only top
+     *                      level columns in the reader's schema can be used
+     *                      here and they must not be duplicated.
+     * @param filterCallback a function callback to perform filtering during the call to
+     *              RecordReader.nextBatch. This function should not reference
+     *               any static fields nor modify the passed in ColumnVectors but
+     *               should set the filter output using the selected array.
+     *
+     * @return this
+     */
+    public Options setRowFilter(String[] filterColumnNames, Consumer<VectorizedRowBatch> filterCallback) {
+      this.preFilterColumns = filterColumnNames;
+      this.skipRowCallback =  filterCallback;
+      return this;
+    }
+
+    /**
      * Set search argument for predicate push down.
      * @param sarg the search argument
      * @param columnNames the column names for
@@ -336,6 +368,14 @@ public interface Reader extends Closeable {
       return sarg;
     }
 
+    public Consumer<VectorizedRowBatch> getFilterCallback() {
+      return skipRowCallback;
+    }
+
+    public String[] getPreFilterColumnNames(){
+      return preFilterColumns;
+    }
+
     public String[] getColumnNames() {
       return columnNames;
     }
diff --git a/java/core/src/java/org/apache/orc/impl/BitFieldReader.java b/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
index 5daa204..f1d386c 100644
--- a/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
+++ b/java/core/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,6 +21,7 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 
 public final class BitFieldReader {
   private final RunLengthByteReader input;
@@ -51,6 +52,38 @@ public final class BitFieldReader {
   }
 
   public void nextVector(LongColumnVector previous,
+                         FilterContext filterContext,
+                         long previousLen) throws IOException {
+    previous.isRepeating = false;
+    int previousIdx = 0;
+    if (previous.noNulls) {
+      for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+        int idx = filterContext.getSelected()[i];
+        if (idx - previousIdx > 0) {
+          skip(idx - previousIdx);
+        }
+        previous.vector[idx] = next();
+        previousIdx = idx + 1;
+      }
+      skip(previousLen - previousIdx);
+    } else {
+      for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+        int idx = filterContext.getSelected()[i];
+        if (idx - previousIdx > 0) {
+          skip(TreeReaderFactory.TreeReader.countNonNullRowsInRange(previous.isNull, previousIdx, idx));
+        }
+        if (!previous.isNull[idx]) {
+          previous.vector[idx] = next();
+        } else {
+          previous.vector[idx] = 1;
+        }
+        previousIdx = idx + 1;
+      }
+      skip(TreeReaderFactory.TreeReader.countNonNullRowsInRange(previous.isNull, previousIdx, (int)previousLen));
+    }
+  }
+
+  public void nextVector(LongColumnVector previous,
                          long previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
@@ -95,8 +128,11 @@ public final class BitFieldReader {
     } else {
       final long bitsToSkip = (totalBits - availableBits);
       input.skip(bitsToSkip / 8);
-      current = input.next();
-      currentIdx = (byte) (bitsToSkip % 8);
+      // Edge case: when skipping the last bits of a bitField there is nothing more to read!
+      if (input.hasNext()) {
+        current = input.next();
+        currentIdx = (byte) (bitsToSkip % 8);
+      }
     }
   }
 
diff --git a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
index e6d3863..6b6b640 100644
--- a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.orc.OrcProto;
 import org.apache.orc.TypeDescription;
@@ -400,8 +401,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
-      fromReader.nextVector(previousVector, isNull, batchSize);
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      fromReader.nextVector(previousVector, isNull, batchSize, filterContext);
       LongColumnVector resultColVector = (LongColumnVector) previousVector;
       if (downCastNeeded) {
         if (resultColVector.isRepeating) {
@@ -451,14 +453,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
         longColVector = (LongColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(doubleColVector, isNull, batchSize);
+      fromReader.nextVector(doubleColVector, isNull, batchSize, filterContext);
 
       convertVector(doubleColVector, longColVector, batchSize);
     }
@@ -535,14 +538,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (decimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         decimalColVector = new DecimalColumnVector(batchSize, precision, scale);
         longColVector = (LongColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(decimalColVector, isNull, batchSize);
+      fromReader.nextVector(decimalColVector, isNull, batchSize, filterContext);
 
       convertVector(decimalColVector, longColVector, batchSize);
     }
@@ -574,14 +578,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
         longColVector = (LongColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, longColVector, batchSize);
     }
@@ -609,14 +614,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
         longColVector = (LongColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, longColVector, batchSize);
     }
@@ -647,14 +653,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
         doubleColVector = (DoubleColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, doubleColVector, batchSize);
     }
@@ -682,14 +689,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (decimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         decimalColVector = new DecimalColumnVector(batchSize, precision, scale);
         doubleColVector = (DoubleColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(decimalColVector, isNull, batchSize);
+      fromReader.nextVector(decimalColVector, isNull, batchSize, filterContext);
 
       convertVector(decimalColVector, doubleColVector, batchSize);
     }
@@ -719,14 +727,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
         doubleColVector = (DoubleColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, doubleColVector, batchSize);
     }
@@ -755,14 +764,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
         doubleColVector = (DoubleColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, doubleColVector, batchSize);
     }
@@ -776,9 +786,10 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Read present/isNull stream
-      fromReader.nextVector(previousVector, isNull, batchSize);
+      fromReader.nextVector(previousVector, isNull, batchSize, filterContext);
       DoubleColumnVector vector = (DoubleColumnVector) previousVector;
       if (previousVector.isRepeating) {
         vector.vector[0] = (float) vector.vector[0];
@@ -813,15 +824,16 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
 
     @Override
     public void nextVector(ColumnVector previousVector,
-        boolean[] isNull,
-        final int batchSize) throws IOException {
+                           boolean[] isNull,
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, decimalColVector, batchSize);
     }
@@ -858,14 +870,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(doubleColVector, isNull, batchSize);
+      fromReader.nextVector(doubleColVector, isNull, batchSize, filterContext);
 
       convertVector(doubleColVector, decimalColVector, batchSize);
     }
@@ -900,14 +913,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, decimalColVector, batchSize);
     }
@@ -944,14 +958,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, decimalColVector, batchSize);
     }
@@ -985,14 +1000,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (fileDecimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         fileDecimalColVector = new DecimalColumnVector(batchSize, filePrecision, fileScale);
         decimalColVector = previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(fileDecimalColVector, isNull, batchSize);
+      fromReader.nextVector(fileDecimalColVector, isNull, batchSize, filterContext);
 
       convertVector(fileDecimalColVector, decimalColVector, batchSize);
     }
@@ -1019,14 +1035,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, bytesColVector, batchSize);
     }
@@ -1078,14 +1095,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(doubleColVector, isNull, batchSize);
+      fromReader.nextVector(doubleColVector, isNull, batchSize, filterContext);
 
       convertVector(doubleColVector, bytesColVector, batchSize);
     }
@@ -1126,14 +1144,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (decimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         decimalColVector = new DecimalColumnVector(batchSize, precision, scale);
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(decimalColVector, isNull, batchSize);
+      fromReader.nextVector(decimalColVector, isNull, batchSize, filterContext);
 
       convertVector(decimalColVector, bytesColVector, batchSize);
     }
@@ -1247,14 +1266,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, bytesColVector, batchSize);
     }
@@ -1284,14 +1304,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new DateColumnVector();
         bytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, bytesColVector, batchSize);
     }
@@ -1309,8 +1330,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
-      fromReader.nextVector(previousVector, isNull, batchSize);
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      fromReader.nextVector(previousVector, isNull, batchSize, filterContext);
 
       BytesColumnVector resultColVector = (BytesColumnVector) previousVector;
 
@@ -1370,14 +1392,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (inBytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         inBytesColVector = new BytesColumnVector();
         outBytesColVector = (BytesColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(inBytesColVector, isNull, batchSize);
+      fromReader.nextVector(inBytesColVector, isNull, batchSize, filterContext);
 
       convertVector(inBytesColVector, outBytesColVector, batchSize);
     }
@@ -1413,7 +1436,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new LongColumnVector();
@@ -1421,7 +1445,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       }
       timestampColVector.changeCalendar(fileUsedProlepticGregorian, false);
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, true);
@@ -1473,7 +1497,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (doubleColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         doubleColVector = new DoubleColumnVector();
@@ -1481,7 +1506,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       }
       timestampColVector.changeCalendar(fileUsedProlepticGregorian, false);
       // Read present/isNull stream
-      fromReader.nextVector(doubleColVector, isNull, batchSize);
+      fromReader.nextVector(doubleColVector, isNull, batchSize, filterContext);
 
       convertVector(doubleColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, true);
@@ -1531,7 +1556,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (decimalColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         decimalColVector = new DecimalColumnVector(batchSize, precision, scale);
@@ -1539,7 +1565,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       }
       timestampColVector.changeCalendar(fileUsedProlepticGregorian, false);
       // Read present/isNull stream
-      fromReader.nextVector(decimalColVector, isNull, batchSize);
+      fromReader.nextVector(decimalColVector, isNull, batchSize, filterContext);
 
       convertVector(decimalColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, true);
@@ -1588,14 +1614,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
         timestampColVector = (TimestampColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, false);
@@ -1630,14 +1657,15 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (longColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         longColVector = new DateColumnVector();
         timestampColVector = (TimestampColumnVector) previousVector;
       }
       // Read present/isNull stream
-      fromReader.nextVector(longColVector, isNull, batchSize);
+      fromReader.nextVector(longColVector, isNull, batchSize, filterContext);
 
       convertVector(longColVector, timestampColVector, batchSize);
       timestampColVector.changeCalendar(useProlepticGregorian, false);
@@ -1672,7 +1700,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (bytesColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         bytesColVector = new BytesColumnVector();
@@ -1688,7 +1717,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
         }
       }
       // Read present/isNull stream
-      fromReader.nextVector(bytesColVector, isNull, batchSize);
+      fromReader.nextVector(bytesColVector, isNull, batchSize, filterContext);
 
       convertVector(bytesColVector, longColVector, batchSize);
       if (dateColumnVector != null) {
@@ -1723,7 +1752,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (timestampColVector == null) {
         // Allocate column vector for file; cast column vector for reader.
         timestampColVector = new TimestampColumnVector();
@@ -1734,7 +1764,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
         }
       }
       // Read present/isNull stream
-      fromReader.nextVector(timestampColVector, isNull, batchSize);
+      fromReader.nextVector(timestampColVector, isNull, batchSize, filterContext);
 
       convertVector(timestampColVector, longColVector, batchSize);
       if (longColVector instanceof DateColumnVector) {
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 3c924de..cea1094 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -61,7 +61,9 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.TimeZone;
+import java.util.TreeSet;
 
 public class RecordReaderImpl implements RecordReader {
   static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
@@ -219,9 +221,25 @@ public class RecordReaderImpl implements RecordReader {
       skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
     }
 
+    // Map columnNames to ColumnIds
+    SortedSet<Integer> filterColIds = new TreeSet<>();
+    if (options.getPreFilterColumnNames() != null) {
+      for (String colName : options.getPreFilterColumnNames()) {
+        int expandColId = findColumns(evolution, colName);
+        if (expandColId != -1) {
+          filterColIds.add(expandColId);
+        } else {
+          throw new IllegalArgumentException("Filter could not find column with name: " +
+              colName + " on " + evolution.getReaderBaseSchema());
+        }
+      }
+      LOG.info("Filter Columns: " + filterColIds);
+    }
+
     TreeReaderFactory.ReaderContext readerContext =
         new TreeReaderFactory.ReaderContext()
           .setSchemaEvolution(evolution)
+          .setFilterCallback(filterColIds, options.getFilterCallback())
           .skipCorrupt(skipCorrupt)
           .fileFormat(fileReader.getFileVersion())
           .useUTCTimestamp(fileReader.useUTCTimestamp)
@@ -1166,16 +1184,17 @@ public class RecordReaderImpl implements RecordReader {
           batch.size = 0;
           return false;
         }
+        // Read stripe in Memory
         readStripe();
       }
 
       int batchSize = computeBatchSize(batch.getMaxSize());
-
       rowInStripe += batchSize;
       reader.setVectorColumnCount(batch.getDataColumnCount());
       reader.nextBatch(batch, batchSize);
       advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      return batch.size  != 0;
+      // batch.size can be modified by filter so only batchSize can tell if we actually read rows
+      return batchSize != 0;
     } catch (IOException e) {
       // Rethrow exception with file name in log message
       throw new IOException("Error reading file: " + path, e);
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
index 0a75ca9..9f4c236 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,6 +20,7 @@ package org.apache.orc.impl;
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /**
@@ -43,7 +44,7 @@ public class RunLengthByteReader {
     this.input = input;
   }
 
-  private void readValues(boolean ignoreEof) throws IOException {
+  private void readValues(boolean ignoreEof, int numSkipRows) throws IOException {
     int control = input.read();
     used = 0;
     if (control == -1) {
@@ -51,19 +52,26 @@ public class RunLengthByteReader {
         throw new EOFException("Read past end of buffer RLE byte from " + input);
       }
       used = numLiterals = 0;
-      return;
     } else if (control < 0x80) {
       repeat = true;
       numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
-      int val = input.read();
-      if (val == -1) {
-        throw new EOFException("Reading RLE byte got EOF");
+      if (numSkipRows >= numLiterals) {
+        IOUtils.skipFully(input,1);
+      } else {
+        int val = input.read();
+        if (val == -1) {
+          throw new EOFException("Reading RLE byte got EOF");
+        }
+        literals[0] = (byte) val;
       }
-      literals[0] = (byte) val;
     } else {
       repeat = false;
       numLiterals = 0x100 - control;
-      int bytes = 0;
+      numSkipRows = Math.min(numSkipRows, numLiterals);
+      if (numSkipRows > 0) {
+        IOUtils.skipFully(input, numSkipRows);
+      }
+      int bytes = numSkipRows;
       while (bytes < numLiterals) {
         int result = input.read(literals, bytes, numLiterals - bytes);
         if (result == -1) {
@@ -81,7 +89,7 @@ public class RunLengthByteReader {
   public byte next() throws IOException {
     byte result;
     if (used == numLiterals) {
-      readValues(false);
+      readValues(false, 0);
     }
     if (repeat) {
       result = literals[0];
@@ -145,7 +153,7 @@ public class RunLengthByteReader {
     if (consumed != 0) {
       // a loop is required for cases where we break the run into two parts
       while (consumed > 0) {
-        readValues(false);
+        readValues(false, 0);
         used = consumed;
         consumed -= numLiterals;
       }
@@ -158,7 +166,7 @@ public class RunLengthByteReader {
   public void skip(long items) throws IOException {
     while (items > 0) {
       if (used == numLiterals) {
-        readValues(false);
+        readValues(false, (int) items);
       }
       long consume = Math.min(items, numLiterals - used);
       used += consume;
diff --git a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
index 06ba711..dd40b31 100644
--- a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.orc.impl;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.OrcFile;
@@ -97,6 +98,10 @@ public final class SerializationUtils {
     return Float.intBitsToFloat(val);
   }
 
+  public void skipFloat(InputStream in, int numOfFloats) throws IOException { // advances past numOfFloats serialized floats without decoding them
+    IOUtils.skipFully(in, numOfFloats * 4L); // 4 bytes per float; the long literal makes the multiplication happen in long
+  }
+
   public void writeFloat(OutputStream output,
                          float value) throws IOException {
     int ser = Float.floatToIntBits(value);
@@ -135,6 +140,10 @@ public final class SerializationUtils {
     }
   }
 
+  public void skipDouble(InputStream in, int numOfDoubles) throws IOException { // advances past numOfDoubles serialized doubles without decoding them
+    IOUtils.skipFully(in, numOfDoubles * 8L); // 8 bytes per double; the long literal makes the multiplication happen in long
+  }
+
   public void writeDouble(OutputStream output,
                           double value) throws IOException {
     writeLongLE(output, Double.doubleToLongBits(value));
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index d17de68..cfe1603 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TimeZone;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
@@ -61,6 +65,10 @@ public class TreeReaderFactory {
   public interface Context {
     SchemaEvolution getSchemaEvolution();
 
+    Set<Integer> getColumnFilterIds();
+
+    Consumer<VectorizedRowBatch> getColumnFilterCallback();
+
     boolean isSkipCorrupt();
 
     boolean getUseUTCTimestamp();
@@ -85,6 +93,8 @@ public class TreeReaderFactory {
     private ReaderEncryption encryption;
     private boolean useProlepticGregorian;
     private boolean fileUsedProlepticGregorian;
+    private Set<Integer> filterColumnIds = Collections.emptySet();
+    Consumer<VectorizedRowBatch> filterCallback;
 
     public ReaderContext setSchemaEvolution(SchemaEvolution evolution) {
       this.evolution = evolution;
@@ -96,6 +106,12 @@ public class TreeReaderFactory {
       return this;
     }
 
+    public ReaderContext setFilterCallback(Set<Integer> filterColumnsList, Consumer<VectorizedRowBatch> filterCallback) { // records the filter column ids and the filter callback; returns this for chaining
+      this.filterColumnIds = filterColumnsList; // stored by reference, not copied
+      this.filterCallback = filterCallback;
+      return this;
+    }
+
     public ReaderContext skipCorrupt(boolean skipCorrupt) {
       this.skipCorrupt = skipCorrupt;
       return this;
@@ -129,6 +145,16 @@ public class TreeReaderFactory {
     }
 
     @Override
+    public Set<Integer> getColumnFilterIds() { // the set given to setFilterCallback; Collections.emptySet() if it was never called
+      return filterColumnIds;
+    }
+
+    @Override
+    public Consumer<VectorizedRowBatch> getColumnFilterCallback() { // the callback given to setFilterCallback; null if it was never called
+      return filterCallback;
+    }
+
+    @Override
     public boolean isSkipCorrupt() {
       return skipCorrupt;
     }
@@ -255,6 +281,16 @@ public class TreeReaderFactory {
       }
     }
 
+    protected static int countNonNullRowsInRange(boolean[] isNull, int start, int end) { // counts the false entries of isNull in [start, end)
+      int result = 0;
+      while (start < end) {
+        if (!isNull[start++]) { // the post-increment advances the cursor whether or not the row is counted
+          result++;
+        }
+      }
+      return result;
+    }
+
     protected long countNonNulls(long rows) throws IOException {
       if (present != null) {
         long result = 0;
@@ -278,11 +314,14 @@ public class TreeReaderFactory {
      * @param isNull Whether the each value was null at a higher level. If
      *               isNull is null, all values are non-null.
      * @param batchSize      Size of the column vector
+     * @param filterContext the information about the rows that were selected
+     *                      by the filter.
      * @throws IOException
      */
     public void nextVector(ColumnVector previous,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       if (present != null || isNull != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
         // present stream
@@ -350,7 +389,8 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public void nextVector(ColumnVector vector, boolean[] isNull, int size) {
+    public void nextVector(ColumnVector vector, boolean[] isNull, int size,
+        FilterContext filterContext) {
       vector.noNulls = false;
       vector.isNull[0] = true;
       vector.isRepeating = true;
@@ -397,14 +437,19 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
-      // Read value entries based on isNull entries
-      reader.nextVector(result, batchSize);
+      if (filterContext.isSelectedInUse()) { // presumably true when a filter has selected a subset of the rows
+        reader.nextVector(result, filterContext, batchSize); // hand the selection to the value reader
+      } else {
+        // Read value entries based on isNull entries
+        reader.nextVector(result, batchSize);
+      }
     }
   }
 
@@ -441,11 +486,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
@@ -506,11 +552,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
@@ -571,11 +618,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
@@ -637,11 +685,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
@@ -686,15 +735,9 @@ public class TreeReaderFactory {
       stream.seek(index);
     }
 
-    @Override
-    public void nextVector(ColumnVector previousVector,
-                           boolean[] isNull,
-                           final int batchSize) throws IOException {
-      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
-
+    private void nextVector(DoubleColumnVector result,
+                            boolean[] isNull,
+                            final int batchSize) throws IOException {
       final boolean hasNulls = !result.noNulls;
       boolean allNulls = hasNulls;
 
@@ -737,6 +780,75 @@ public class TreeReaderFactory {
       }
     }
 
+    private void nextVector(DoubleColumnVector result,
+                            boolean[] isNull, // not read in this method; result.isNull is consulted instead
+                            FilterContext filterContext,
+                            final int batchSize) throws IOException { // filtered read: decodes floats only for selected rows and skips the others in the stream
+      final boolean hasNulls = !result.noNulls;
+      boolean allNulls = hasNulls;
+      result.isRepeating = false;
+      int previousIdx = 0; // first row index whose value has not yet been read or skipped
+
+      if (batchSize > 0) {
+        if (hasNulls) {
+          // The batchSize <= isNull.length guard is written so array bounds checks can be skipped
+          for (int i = 0; batchSize <= result.isNull.length && i < batchSize; i++) {
+            allNulls = allNulls & result.isNull[i];
+          }
+          if (allNulls) {
+            result.vector[0] = Double.NaN; // nothing is read from the stream when every row is null
+            result.isRepeating = true;
+          } else {
+            // some nulls
+            // Visit the selected rows; assumes getSelected() is in ascending order (TODO confirm)
+            for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+              int idx = filterContext.getSelected()[i];
+              if (idx - previousIdx > 0) {
+                utils.skipFloat(stream, countNonNullRowsInRange(result.isNull, previousIdx, idx)); // only non-null rows are counted, i.e. null rows are treated as taking no bytes
+              }
+              if (!result.isNull[idx]) {
+                result.vector[idx] = utils.readFloat(stream);
+              } else {
+                // If the value is not present then set NaN
+                result.vector[idx] = Double.NaN;
+              }
+              previousIdx = idx + 1;
+            }
+            utils.skipFloat(stream, countNonNullRowsInRange(result.isNull, previousIdx, batchSize)); // consume the rows after the last selected one
+          }
+        } else {
+          // Read only the selected row indexes and skip the rest
+          for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+            int idx = filterContext.getSelected()[i];
+            if (idx - previousIdx > 0) {
+              utils.skipFloat(stream,idx - previousIdx);
+            }
+            result.vector[idx] = utils.readFloat(stream);
+            previousIdx = idx + 1;
+          }
+          utils.skipFloat(stream,batchSize - previousIdx); // consume the rows after the last selected one
+        }
+      }
+
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize, filterContext);
+
+      if (filterContext.isSelectedInUse()) { // dispatch: filtered overload reads selected rows only
+        nextVector(result, isNull, filterContext, batchSize);
+      } else { // no selection in use: read every row of the batch
+        nextVector(result, isNull, batchSize);
+      }
+    }
+
     @Override
     public void skipRows(long items) throws IOException {
       items = countNonNulls(items);
@@ -780,14 +892,62 @@ public class TreeReaderFactory {
       stream.seek(index);
     }
 
-    @Override
-    public void nextVector(ColumnVector previousVector,
+    private void nextVector(DoubleColumnVector result,
+                            boolean[] isNull, // not read in this method; result.isNull is consulted instead
+                            FilterContext filterContext,
+                            final int batchSize) throws IOException { // filtered read: decodes doubles only for selected rows and skips the others in the stream
+
+      final boolean hasNulls = !result.noNulls;
+      boolean allNulls = hasNulls;
+      result.isRepeating = false;
+      if (batchSize != 0) {
+        if (hasNulls) {
+          // The batchSize <= isNull.length guard is written so array bounds checks can be skipped
+          for (int i = 0; i < batchSize && batchSize <= result.isNull.length; i++) {
+            allNulls = allNulls & result.isNull[i];
+          }
+          if (allNulls) {
+            result.vector[0] = Double.NaN; // nothing is read from the stream when every row is null
+            result.isRepeating = true;
+          } else {
+            // some nulls
+            int previousIdx = 0; // first row index whose value has not yet been read or skipped
+            // Same bounds-check guard; assumes getSelected() is in ascending order (TODO confirm)
+            for (int i = 0; batchSize <= result.isNull.length && i != filterContext.getSelectedSize(); i++) {
+              int idx = filterContext.getSelected()[i];
+              if (idx - previousIdx > 0) {
+                utils.skipDouble(stream, countNonNullRowsInRange(result.isNull, previousIdx, idx)); // only non-null rows are counted, i.e. null rows are treated as taking no bytes
+              }
+              if (!result.isNull[idx]) {
+                result.vector[idx] = utils.readDouble(stream);
+              } else {
+                // If the value is not present then set NaN
+                result.vector[idx] = Double.NaN;
+              }
+              previousIdx = idx + 1;
+            }
+            utils.skipDouble(stream, countNonNullRowsInRange(result.isNull, previousIdx, batchSize)); // consume the rows after the last selected one
+          }
+        } else {
+          // no nulls
+          int previousIdx = 0; // first row index whose value has not yet been read or skipped
+          // Read only the selected row indexes and skip the rest
+          for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+            int idx = filterContext.getSelected()[i];
+            if (idx - previousIdx > 0) {
+              utils.skipDouble(stream, idx - previousIdx);
+            }
+            result.vector[idx] = utils.readDouble(stream);
+            previousIdx = idx + 1;
+          }
+          utils.skipDouble(stream, batchSize - previousIdx); // consume the rows after the last selected one
+        }
+      }
+    }
+
+    private void nextVector(DoubleColumnVector result,
                            boolean[] isNull,
                            final int batchSize) throws IOException {
-      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
-
-      // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
 
       final boolean hasNulls = !result.noNulls;
       boolean allNulls = hasNulls;
@@ -831,6 +991,23 @@ public class TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      final DoubleColumnVector result = (DoubleColumnVector) previousVector;
+
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize, filterContext);
+
+      if (filterContext.isSelectedInUse()) { // dispatch: filtered overload reads selected rows only
+        nextVector(result, isNull, filterContext, batchSize);
+      } else { // no selection in use: read every row of the batch
+        nextVector(result, isNull, batchSize);
+      }
+    }
+
+    @Override
     public void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       long len = items * 8;
@@ -894,11 +1071,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final BytesColumnVector result = (BytesColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       scratchlcv.ensureSize(batchSize, false);
       BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
@@ -1040,33 +1218,31 @@ public class TreeReaderFactory {
       nanos.seek(index);
     }
 
-    @Override
-    public void nextVector(ColumnVector previousVector,
+    public void readTimestamp(TimestampColumnVector result, int idx) throws IOException { // reads one value from the data and nanos streams into result.time[idx] / result.nanos[idx]
+      final int newNanos = parseNanos(nanos.next());
+      long millis = (data.next() + base_timestamp)
+          * TimestampTreeWriter.MILLIS_PER_SECOND + newNanos / 1_000_000;
+      if (millis < 0 && newNanos > 999_999) { // negative time with at least one whole millisecond of nanos; presumably compensates for the writer's rounding of seconds - confirm against TimestampTreeWriter
+        millis -= TimestampTreeWriter.MILLIS_PER_SECOND;
+      }
+      long offset = 0;
+      // If reader and writer time zones have different rules, adjust the timezone difference
+      // between reader and writer taking day light savings into account.
+      if (!hasSameTZRules) {
+        offset = SerializationUtils.convertBetweenTimezones(writerTimeZone,
+            readerTimeZone, millis);
+      }
+      result.time[idx] = millis + offset;
+      result.nanos[idx] = newNanos;
+    }
+
+    public void nextVector(TimestampColumnVector result,
                            boolean[] isNull,
                            final int batchSize) throws IOException {
-      TimestampColumnVector result = (TimestampColumnVector) previousVector;
-      result.changeCalendar(fileUsesProleptic, false);
-      super.nextVector(previousVector, isNull, batchSize);
-
-      result.setIsUTC(context.getUseUTCTimestamp());
 
       for (int i = 0; i < batchSize; i++) {
         if (result.noNulls || !result.isNull[i]) {
-          final int newNanos = parseNanos(nanos.next());
-          long millis = (data.next() + base_timestamp)
-              * TimestampTreeWriter.MILLIS_PER_SECOND + newNanos / 1_000_000;
-          if (millis < 0 && newNanos > 999_999) {
-            millis -= TimestampTreeWriter.MILLIS_PER_SECOND;
-          }
-          long offset = 0;
-          // If reader and writer time zones have different rules, adjust the timezone difference
-          // between reader and writer taking day light savings into account.
-          if (!hasSameTZRules) {
-            offset = SerializationUtils.convertBetweenTimezones(writerTimeZone,
-                readerTimeZone, millis);
-          }
-          result.time[i] = millis + offset;
-          result.nanos[i] = newNanos;
+          readTimestamp(result, i);
           if (result.isRepeating && i != 0 &&
               (result.time[0] != result.time[i] ||
                   result.nanos[0] != result.nanos[i])) {
@@ -1077,6 +1253,57 @@ public class TreeReaderFactory {
       result.changeCalendar(useProleptic, true);
     }
 
+    public void nextVector(TimestampColumnVector result,
+                           boolean[] isNull, // not read in this method; result.isNull is consulted instead
+                           FilterContext filterContext,
+                           final int batchSize) throws IOException { // filtered read: decodes timestamps only for selected rows and skips the others in both streams
+      result.isRepeating = false;
+      int previousIdx = 0; // first row index whose value has not yet been read or skipped
+      if (result.noNulls) {
+        for (int i = 0; i != filterContext.getSelectedSize(); i++) { // assumes getSelected() is in ascending order (TODO confirm)
+          int idx = filterContext.getSelected()[i];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(idx - previousIdx);
+          }
+          readTimestamp(result, idx);
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(batchSize - previousIdx); // consume the rows after the last selected one
+      } else {
+        for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+          int idx = filterContext.getSelected()[i];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, idx)); // only non-null rows are counted, i.e. null rows are treated as having no stream entry
+          }
+          if (!result.isNull[idx]) { // selected null rows leave time/nanos untouched
+            readTimestamp(result, idx);
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, batchSize)); // consume the rows after the last selected one
+      }
+      result.changeCalendar(useProleptic, true);
+
+    }
+
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      TimestampColumnVector result = (TimestampColumnVector) previousVector;
+      result.changeCalendar(fileUsesProleptic, false);
+      super.nextVector(previousVector, isNull, batchSize, filterContext); // fills the null information on result before values are read
+
+      result.setIsUTC(context.getUseUTCTimestamp());
+
+      if (filterContext.isSelectedInUse()) { // dispatch: filtered overload reads selected rows only
+        nextVector(result, isNull, filterContext, batchSize);
+      } else { // no selection in use: read every row of the batch
+        nextVector(result, isNull, batchSize);
+      }
+    }
+
     private static int parseNanos(long serialized) {
       int zeros = 7 & (int) serialized;
       int result = (int) (serialized >>> 3);
@@ -1086,6 +1313,11 @@ public class TreeReaderFactory {
       return result;
     }
 
+    void skipStreamRows(long items) throws IOException { // skips items entries in both the data and nanos streams; unlike skipRows, does not call countNonNulls first
+      data.skip(items);
+      nanos.skip(items);
+    }
+
     @Override
     public void skipRows(long items) throws IOException {
       items = countNonNulls(items);
@@ -1149,7 +1381,8 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final LongColumnVector result = (LongColumnVector) previousVector;
       if (needsDateColumnVector) {
         if (result instanceof DateColumnVector) {
@@ -1161,10 +1394,11 @@ public class TreeReaderFactory {
       }
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       // Read value entries based on isNull entries
       reader.nextVector(result, result.vector, batchSize);
+
       if (needsDateColumnVector) {
         ((DateColumnVector) result).changeCalendar(useProleptic, true);
       }
@@ -1279,6 +1513,60 @@ public class TreeReaderFactory {
       }
     }
 
+    private void nextVector(DecimalColumnVector result,
+                            boolean[] isNull,
+                            FilterContext filterContext,
+                            final int batchSize) throws IOException {
+      // Row-filtered read: only rows listed in filterContext.getSelected() are decoded.
+      if (batchSize > scratchScaleVector.length) { // scale scratch must cover the whole batch
+        scratchScaleVector = new int[(int) batchSize];
+      }
+      // Read the scales for the batch; this call is not restricted to the selected rows
+      scaleReader.nextVector(result, scratchScaleVector, batchSize);
+      // Read value entries based on isNull entries
+      // Use the fast ORC deserialization method that emulates SerializationUtils.readBigInteger
+      // provided by HiveDecimalWritable.
+      HiveDecimalWritable[] vector = result.vector;
+      HiveDecimalWritable decWritable;
+      if (result.noNulls) {
+        int previousIdx = 0; // first row whose value has not been consumed from valueStream yet
+        for (int r=0; r != filterContext.getSelectedSize(); ++r) {
+          int idx = filterContext.getSelected()[r]; // NOTE(review): assumes ascending row order
+          if (idx - previousIdx > 0) {
+            skipStreamRows(idx - previousIdx); // discard the unselected rows before idx
+          }
+          decWritable = vector[idx];
+          if (!decWritable.serializationUtilsRead(
+              valueStream, scratchScaleVector[idx],
+              scratchBytes)) {
+            result.isNull[idx] = true; // a failed read is recorded as a null entry
+            result.noNulls = false;
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(batchSize - previousIdx); // discard the unselected tail of the batch
+      } else if (!result.isRepeating || !result.isNull[0]) { // nulls present: null rows are neither read nor counted in skips
+        int previousIdx = 0;
+        for (int r=0; r != filterContext.getSelectedSize(); ++r) {
+          int idx = filterContext.getSelected()[r];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, idx));
+          }
+          if (!result.isNull[idx]) {
+            decWritable = vector[idx];
+            if (!decWritable.serializationUtilsRead(
+                valueStream, scratchScaleVector[idx],
+                scratchBytes)) {
+              result.isNull[idx] = true; // a failed read is recorded as a null entry
+              result.noNulls = false;
+            }
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+      }
+    }
+
     private void nextVector(Decimal64ColumnVector result,
                             boolean[] isNull,
                             final int batchSize) throws IOException {
@@ -1309,16 +1597,86 @@ public class TreeReaderFactory {
       result.scale = (short) scale;
     }
 
+    private void nextVector(Decimal64ColumnVector result,
+        boolean[] isNull,
+        FilterContext filterContext,
+        final int batchSize) throws IOException {
+      if (precision > TypeDescription.MAX_DECIMAL64_PRECISION) {
+        throw new IllegalArgumentException("Reading large precision type into" +
+            " Decimal64ColumnVector.");
+      }
+      // Row-filtered variant: the scale scratch array still has to cover the whole batch
+      if (batchSize > scratchScaleVector.length) {
+        scratchScaleVector = new int[(int) batchSize];
+      }
+      // Read all the scales (this call is not restricted to the selected rows)
+      scaleReader.nextVector(result, scratchScaleVector, batchSize);
+      if (result.noNulls) {
+        int previousIdx = 0; // first row whose value has not been consumed from valueStream yet
+        for (int r=0; r != filterContext.getSelectedSize(); r++) {
+          int idx = filterContext.getSelected()[r]; // NOTE(review): assumes ascending row order
+          if (idx - previousIdx > 0) {
+            skipStreamRows(idx - previousIdx); // discard the unselected rows before idx
+          }
+          result.vector[idx] = SerializationUtils.readVslong(valueStream);
+          for (int s=scratchScaleVector[idx]; s < scale; ++s) { // rescale: one x10 per missing digit
+            result.vector[idx] *= 10;
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(batchSize - previousIdx); // discard the unselected tail of the batch
+      } else if (!result.isRepeating || !result.isNull[0]) { // nulls present: null rows are neither read nor counted in skips
+        int previousIdx = 0;
+        for (int r=0; r != filterContext.getSelectedSize(); r++) {
+          int idx = filterContext.getSelected()[r];
+          if (idx - previousIdx > 0) {
+            skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, idx));
+          }
+          if (!result.isNull[idx]) {
+            result.vector[idx] = SerializationUtils.readVslong(valueStream);
+            for (int s=scratchScaleVector[idx]; s < scale; ++s) { // rescale: one x10 per missing digit
+              result.vector[idx] *= 10;
+            }
+          }
+          previousIdx = idx + 1;
+        }
+        skipStreamRows(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+      }
+      result.precision = (short) precision;
+      result.scale = (short) scale;
+    }
+
     @Override
     public void nextVector(ColumnVector result,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       if (result instanceof Decimal64ColumnVector) {
-        nextVector((Decimal64ColumnVector) result, isNull, batchSize);
+         if (filterContext.isSelectedInUse()) {
+           nextVector((Decimal64ColumnVector) result, isNull, filterContext, batchSize);
+         } else {
+           nextVector((Decimal64ColumnVector) result, isNull, batchSize);
+         }
       } else {
-        nextVector((DecimalColumnVector) result, isNull, batchSize);
+        if (filterContext.isSelectedInUse()) {
+          nextVector((DecimalColumnVector) result, isNull, filterContext, batchSize);
+        } else {
+          nextVector((DecimalColumnVector) result, isNull, batchSize);
+        }
+      }
+    }
+
+    void skipStreamRows(long items) throws IOException { // discard 'items' encoded values from valueStream
+      for (int i = 0; i < items; i++) {
+        int input;
+        do {
+          input = valueStream.read();
+          if (input == -1) { // stream ended in the middle of the values being skipped
+            throw new EOFException("Reading BigInteger past EOF from " + valueStream);
+          }
+        } while(input >= 128); // a byte >= 128 continues the current value; a smaller byte ends it
       }
     }
 
@@ -1389,23 +1747,52 @@ public class TreeReaderFactory {
     }
 
     private void nextVector(DecimalColumnVector result,
+                            FilterContext filterContext, // row selection; consulted via isSelectedInUse()
                             final int batchSize) throws IOException {
       if (result.noNulls) {
-        for (int r=0; r < batchSize; ++r) {
-          result.vector[r].setFromLongAndScale(valueReader.next(), scale);
-        }
-      } else if (!result.isRepeating || !result.isNull[0]) {
-        for (int r=0; r < batchSize; ++r) {
-          if (result.noNulls || !result.isNull[r]) {
+        if (filterContext.isSelectedInUse()) { // decode only the selected rows, skip the rest
+          int previousIdx = 0; // first row whose value has not been consumed from valueReader yet
+          for (int r = 0; r != filterContext.getSelectedSize(); ++r) {
+            int idx = filterContext.getSelected()[r]; // NOTE(review): assumes ascending row order
+            if (idx - previousIdx > 0) {
+              valueReader.skip(idx - previousIdx); // discard the unselected rows before idx
+            }
+            result.vector[idx].setFromLongAndScale(valueReader.next(), scale);
+            previousIdx = idx + 1;
+          }
+          valueReader.skip(batchSize - previousIdx); // discard the unselected tail of the batch
+        } else {
+          for (int r = 0; r < batchSize; ++r) {
             result.vector[r].setFromLongAndScale(valueReader.next(), scale);
           }
         }
+      } else if (!result.isRepeating || !result.isNull[0]) {
+        if (filterContext.isSelectedInUse()) { // nulls present: null rows are neither read nor counted in skips
+          int previousIdx = 0;
+          for (int r = 0; r != filterContext.getSelectedSize(); ++r) {
+            int idx = filterContext.getSelected()[r];
+            if (idx - previousIdx > 0) {
+              valueReader.skip(countNonNullRowsInRange(result.isNull, previousIdx, idx));
+            }
+            if (!result.isNull[idx]) // isNull is indexed by row (idx), not by position r in the selected array
+              result.vector[idx].setFromLongAndScale(valueReader.next(), scale);
+            previousIdx = idx + 1;
+          }
+          valueReader.skip(countNonNullRowsInRange(result.isNull, previousIdx, batchSize));
+        } else {
+          for (int r = 0; r < batchSize; ++r) {
+            if (!result.isNull[r]) {
+              result.vector[r].setFromLongAndScale(valueReader.next(), scale);
+            }
+          }
+        }
       }
       result.precision = (short) precision;
       result.scale = (short) scale;
     }
 
     private void nextVector(Decimal64ColumnVector result,
+                            FilterContext filterContext,
                             final int batchSize) throws IOException {
       valueReader.nextVector(result, result.vector, batchSize);
       result.precision = (short) precision;
@@ -1415,13 +1802,14 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector result,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       if (result instanceof Decimal64ColumnVector) {
-        nextVector((Decimal64ColumnVector) result, batchSize);
+        nextVector((Decimal64ColumnVector) result, filterContext, batchSize);
       } else {
-        nextVector((DecimalColumnVector) result, batchSize);
+        nextVector((DecimalColumnVector) result, filterContext, batchSize);
       }
     }
 
@@ -1504,8 +1892,9 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
-      reader.nextVector(previousVector, isNull, batchSize);
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      reader.nextVector(previousVector, isNull, batchSize, filterContext);
     }
 
     @Override
@@ -1563,8 +1952,8 @@ public class TreeReaderFactory {
                                          BytesColumnVector result,
                                          final int batchSize) throws IOException {
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
-        byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv,
-            result, (int) batchSize);
+        byte[] allBytes =
+            commonReadByteArrays(stream, lengths, scratchlcv, result, (int) batchSize);
 
         // Too expensive to figure out 'repeating' by comparisons.
         result.isRepeating = false;
@@ -1652,11 +2041,12 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final BytesColumnVector result = (BytesColumnVector) previousVector;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
 
       scratchlcv.ensureSize(batchSize, false);
       BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv,
@@ -1799,13 +2189,18 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       final BytesColumnVector result = (BytesColumnVector) previousVector;
-      int offset;
-      int length;
 
       // Read present/isNull stream
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
+      readDictionaryByteArray(result, filterContext, batchSize);
+    }
+
+    private void readDictionaryByteArray(BytesColumnVector result, FilterContext filterContext, int batchSize) throws IOException {
+      int offset;
+      int length;
 
       if (dictionaryBuffer != null) {
 
@@ -1821,17 +2216,31 @@ public class TreeReaderFactory {
         scratchlcv.ensureSize((int) batchSize, false);
         reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
         if (!scratchlcv.isRepeating) {
-
           // The vector has non-repeating strings. Iterate thru the batch
           // and set strings one by one
-          for (int i = 0; i < batchSize; i++) {
-            if (!scratchlcv.isNull[i]) {
-              offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
-              length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
-              result.setRef(i, dictionaryBufferInBytesCache, offset, length);
-            } else {
-              // If the value is null then set offset and length to zero (null string)
+          if (filterContext.isSelectedInUse()) {
+            // Set all string values to null - offset and length is zero
+            for (int i = 0; i < batchSize; i++)
               result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+            // Read selected rows from stream
+            for (int i = 0; i != filterContext.getSelectedSize(); i++) {
+              int idx = filterContext.getSelected()[i];
+              if (!scratchlcv.isNull[idx]) {
+                offset = dictionaryOffsets[(int) scratchlcv.vector[idx]];
+                length = getDictionaryEntryLength((int) scratchlcv.vector[idx], offset);
+                result.setRef(idx, dictionaryBufferInBytesCache, offset, length);
+              }
+            }
+          } else {
+            for (int i = 0; i < batchSize; i++) {
+              if (!scratchlcv.isNull[i]) {
+                offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+                length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+                result.setRef(i, dictionaryBufferInBytesCache, offset, length);
+              } else {
+                // If the value is null then set offset and length to zero (null string)
+                result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+              }
             }
           }
         } else {
@@ -1899,10 +2308,11 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (right trim and truncate) if necessary.
-      super.nextVector(previousVector, isNull, batchSize);
+      super.nextVector(previousVector, isNull, batchSize, filterContext);
       BytesColumnVector result = (BytesColumnVector) previousVector;
       int adjustedDownLen;
       if (result.isRepeating) {
@@ -1955,10 +2365,11 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (truncate) if necessary.
-      super.nextVector(previousVector, isNull, batchSize);
+      super.nextVector(previousVector, isNull, batchSize, filterContext);
       BytesColumnVector result = (BytesColumnVector) previousVector;
 
       int adjustedDownLen;
@@ -2038,8 +2449,9 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
-      super.nextVector(previousVector, isNull, batchSize);
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
+      super.nextVector(previousVector, isNull, batchSize, filterContext);
       StructColumnVector result = (StructColumnVector) previousVector;
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
         result.isRepeating = false;
@@ -2048,7 +2460,7 @@ public class TreeReaderFactory {
         boolean[] mask = result.noNulls ? null : result.isNull;
         for (int f = 0; f < fields.length; f++) {
           if (fields[f] != null) {
-            fields[f].nextVector(result.fields[f], mask, batchSize);
+            fields[f].nextVector(result.fields[f], mask, batchSize, filterContext);
           }
         }
       }
@@ -2115,9 +2527,10 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       UnionColumnVector result = (UnionColumnVector) previousVector;
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
         result.isRepeating = false;
         tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
@@ -2129,7 +2542,7 @@ public class TreeReaderFactory {
             ignore[r] = (!result.noNulls && result.isNull[r]) ||
                 result.tags[r] != f;
           }
-          fields[f].nextVector(result.fields[f], ignore, batchSize);
+          fields[f].nextVector(result.fields[f], ignore, batchSize, filterContext);
         }
       }
     }
@@ -2159,6 +2572,27 @@ public class TreeReaderFactory {
     }
   }
 
+  private static final FilterContext NULL_FILTER = new FilterContext() { // shared "no row filter" context
+    @Override
+    public void reset() { // holds no state, so there is nothing to reset
+    }
+    // Selection is never in use, so a reader handed this context takes its unfiltered path.
+    @Override
+    public boolean isSelectedInUse() {
+      return false;
+    }
+    // No rows are listed as selected: an empty array ...
+    @Override
+    public int[] getSelected() {
+      return new int[0];
+    }
+    // ... with a matching selected size of zero.
+    @Override
+    public int getSelectedSize() {
+      return 0;
+    }
+  };
+
   public static class ListTreeReader extends TreeReader {
     protected final TypeReader elementReader;
     protected IntegerReader lengths = null;
@@ -2195,9 +2629,10 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previous,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       ListColumnVector result = (ListColumnVector) previous;
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       // if we have some none-null values, then read them
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
         lengths.nextVector(result, result.lengths, batchSize);
@@ -2212,7 +2647,8 @@ public class TreeReaderFactory {
           }
         }
         result.child.ensureSize(result.childCount, false);
-        elementReader.nextVector(result.child, null, result.childCount);
+        // Always read every child: the parent filter selects parent rows, not child elements.
+        elementReader.nextVector(result.child, null, result.childCount, NULL_FILTER);
       }
     }
 
@@ -2289,9 +2725,10 @@ public class TreeReaderFactory {
     @Override
     public void nextVector(ColumnVector previous,
                            boolean[] isNull,
-                           final int batchSize) throws IOException {
+                           final int batchSize,
+                           FilterContext filterContext) throws IOException {
       MapColumnVector result = (MapColumnVector) previous;
-      super.nextVector(result, isNull, batchSize);
+      super.nextVector(result, isNull, batchSize, filterContext);
       if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
         lengths.nextVector(result, result.lengths, batchSize);
         // even with repeating lengths, the map doesn't repeat
@@ -2306,8 +2743,8 @@ public class TreeReaderFactory {
         }
         result.keys.ensureSize(result.childCount, false);
         result.values.ensureSize(result.childCount, false);
-        keyReader.nextVector(result.keys, null, result.childCount);
-        valueReader.nextVector(result.values, null, result.childCount);
+        keyReader.nextVector(result.keys, null, result.childCount, NULL_FILTER);
+        valueReader.nextVector(result.values, null, result.childCount, NULL_FILTER);
       }
     }
 
@@ -2421,7 +2858,7 @@ public class TreeReaderFactory {
           throws IOException {
     TypeReader reader = createTreeReader(readerType, context);
     if (reader instanceof StructTreeReader) {
-      return new StructBatchReader((StructTreeReader) reader);
+      return new StructBatchReader((StructTreeReader) reader, context);
     } else {
       return new PrimitiveBatchReader(reader);
     }
diff --git a/java/core/src/java/org/apache/orc/impl/reader/tree/PrimitiveBatchReader.java b/java/core/src/java/org/apache/orc/impl/reader/tree/PrimitiveBatchReader.java
index 669b6a5..7caeb0e 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/tree/PrimitiveBatchReader.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/tree/PrimitiveBatchReader.java
@@ -31,7 +31,7 @@ public class PrimitiveBatchReader extends BatchReader {
                         int batchSize) throws IOException {
   batch.cols[0].reset();
   batch.cols[0].ensureSize(batchSize, false);
-  rootType.nextVector(batch.cols[0], null, batchSize);
+  rootType.nextVector(batch.cols[0], null, batchSize, batch);
   resetBatch(batch, batchSize);
   }
 }
diff --git a/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java b/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
index 9503436..e6a2db8 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
@@ -22,25 +22,55 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.impl.TreeReaderFactory;
 
 import java.io.IOException;
+import java.util.Set;
 
 public class StructBatchReader extends BatchReader {
+  // The reader context including row-filtering details
+  private final TreeReaderFactory.Context context;
 
-  public StructBatchReader(TreeReaderFactory.StructTreeReader rowReader) {
+  public StructBatchReader(TreeReaderFactory.StructTreeReader rowReader, TreeReaderFactory.Context context) {
     super(rowReader);
+    this.context = context;
+  }
+
+  private void readBatchColumn(VectorizedRowBatch batch, TypeReader[] children, int batchSize, int index)
+      throws IOException { // resets and reads the column at 'index' of the batch
+    ColumnVector colVector = batch.cols[index];
+    if (colVector != null) { // columns with no vector in the batch are not read
+      colVector.reset();
+      colVector.ensureSize(batchSize, false);
+      children[index].nextVector(colVector, null, batchSize, batch); // the batch is passed as the FilterContext
+    }
+  }
 
   @Override
   public void nextBatch(VectorizedRowBatch batch, int batchSize) throws IOException {
     TypeReader[] children = ((TreeReaderFactory.StructTreeReader) rootType).fields;
+    // Early expand fields --> apply filter --> expand remaining fields
+    Set<Integer> earlyExpandCols = context.getColumnFilterIds();
+
+    // Clear selected and early expand columns used in Filter
+    batch.selectedInUse = false;
+    for (int i = 0; i < children.length && !earlyExpandCols.isEmpty() &&
+        (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+      if (earlyExpandCols.contains(children[i].getColumnId())) {
+        readBatchColumn(batch, children, batchSize, i);
+      }
+    }
+    // Since we are going to filter rows based on some column values set batch.size earlier here
+    batch.size = batchSize;
+
+    // Apply the filter callback to reduce the number of rows selected for decoding in the next TreeReaders
+    if (!earlyExpandCols.isEmpty() && this.context.getColumnFilterCallback() != null) {
+      this.context.getColumnFilterCallback().accept(batch);
+    }
+
+    // Read the remaining columns applying row-level filtering
     for (int i = 0; i < children.length &&
-                    (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
-      ColumnVector colVector = batch.cols[i];
-      if (colVector != null) {
-        colVector.reset();
-        colVector.ensureSize(batchSize, false);
-        children[i].nextVector(colVector, null, batchSize);
+        (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+      if (!earlyExpandCols.contains(children[i].getColumnId())) {
+        readBatchColumn(batch, children, batchSize, i);
       }
     }
-    resetBatch(batch, batchSize);
   }
 }
diff --git a/java/core/src/java/org/apache/orc/impl/reader/tree/TypeReader.java b/java/core/src/java/org/apache/orc/impl/reader/tree/TypeReader.java
index 5932bee..110cdb1 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/tree/TypeReader.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/tree/TypeReader.java
@@ -18,6 +18,7 @@
 package org.apache.orc.impl.reader.tree;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 import org.apache.orc.OrcProto;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.orc.impl.reader.StripePlanner;
@@ -37,7 +38,8 @@ public interface TypeReader {
 
   void nextVector(ColumnVector previous,
                   boolean[] isNull,
-                  int batchSize) throws IOException;
+                  int batchSize,
+                  FilterContext filterContext) throws IOException;
 
   int getColumnId();
 }
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java
new file mode 100644
index 0000000..b3a55b5
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+
+public class TestRowFilteringComplexTypes {
+    private Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+            + File.separator + "tmp"));
+
+    private Configuration conf;
+    private FileSystem fs;
+    private Path testFilePath;
+
+    private static final int ColumnBatchRows = 1024;
+
+    @Rule
+    public TestName testCaseName = new TestName();
+
+    @Before
+    public void openFileSystem() throws Exception {
+        conf = new Configuration();
+        fs = FileSystem.getLocal(conf);
+        testFilePath = new Path(workDir, "TestRowFilteringComplexTypes." + testCaseName.getMethodName() + ".orc");
+        fs.delete(testFilePath, false);
+    }
+
+    @Test
+    // Inner Struct should receive the filterContext and propagate it to the SubTypes
+    public void testInnerStructRowFilter() throws Exception {
+        // Set the row stride to a multiple of the batch size
+        final int INDEX_STRIDE = 16 * ColumnBatchRows;
+        final int NUM_BATCHES = 2;
+
+        TypeDescription schema = TypeDescription.createStruct()
+                .addField("int1", TypeDescription.createInt())
+                .addField("innerStruct", TypeDescription.createStruct()
+                        .addField("a", TypeDescription.createDecimal())
+                        .addField("b", TypeDescription.createDecimal())
+                );
+
+        try (Writer writer = OrcFile.createWriter(testFilePath,
+                OrcFile.writerOptions(conf).setSchema(schema).rowIndexStride(INDEX_STRIDE))) {
+            VectorizedRowBatch batch = schema.createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            StructColumnVector col2 = (StructColumnVector) batch.cols[1];
+            DecimalColumnVector innerCol1 = (DecimalColumnVector) col2.fields[0];
+            DecimalColumnVector innerCol2 = (DecimalColumnVector) col2.fields[1];
+
+            for (int b = 0; b < NUM_BATCHES; ++b) {
+                batch.reset();
+                batch.size = ColumnBatchRows;
+                for (int row = 0; row < batch.size; row++) {
+                    col1.vector[row] = row;
+                    if ((row % 2) == 0) {
+                        innerCol1.vector[row] = new HiveDecimalWritable(101 + row);
+                        innerCol2.vector[row] = new HiveDecimalWritable(100 + row);
+                    } else {
+                        innerCol1.vector[row] = new HiveDecimalWritable(999 + row);
+                        innerCol2.vector[row] = new HiveDecimalWritable(998 + row);
+                    }
+                }
+                col1.isRepeating = false;
+                writer.addRowBatch(batch);
+            }
+        }
+
+        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+        try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+                reader.options().setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+            VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+            StructColumnVector col2 = (StructColumnVector) batch.cols[1];
+
+            int noNullCnt = 0;
+            while (rows.nextBatch(batch)) {
+                Assert.assertTrue(batch.selectedInUse);
+                Assert.assertEquals(ColumnBatchRows / 2, batch.size);
+                for (int r = 0; r < ColumnBatchRows; ++r) {
+                    StringBuilder sb = new StringBuilder();
+                    col2.stringifyValue(sb, r);
+                    if (sb.toString().compareTo("[0, 0]") != 0) {
+                        noNullCnt++;
+                    }
+                }
+            }
+            // Make sure that our filter worked
+            Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+            Assert.assertEquals(0, batch.selected[0]);
+            Assert.assertEquals(2, batch.selected[1]);
+        }
+    }
+
+
+    @Test
+    // Inner UNION should make use of the filterContext
+    public void testInnerUnionRowFilter() throws Exception {
+        // Set the row stride to a multiple of the batch size
+        final int INDEX_STRIDE = 16 * ColumnBatchRows;
+        final int NUM_BATCHES = 2;
+
+        TypeDescription schema = TypeDescription.fromString(
+            "struct<int1:int,innerUnion:uniontype<decimal(16,3),decimal(16,3)>>");
+
+        try (Writer writer = OrcFile.createWriter(testFilePath,
+            OrcFile.writerOptions(conf).setSchema(schema).rowIndexStride(INDEX_STRIDE))) {
+            VectorizedRowBatch batch = schema.createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            UnionColumnVector col2 = (UnionColumnVector) batch.cols[1];
+            Decimal64ColumnVector innerCol1 = (Decimal64ColumnVector) col2.fields[0];
+            Decimal64ColumnVector innerCol2 = (Decimal64ColumnVector) col2.fields[1];
+
+            for (int b = 0; b < NUM_BATCHES; ++b) {
+                batch.reset();
+                batch.size = ColumnBatchRows;
+                for (int row = 0; row < batch.size; row++) {
+                    int totalRow = ColumnBatchRows * b + row;
+                    col1.vector[row] = totalRow;
+                    col2.tags[row] = totalRow % 2;
+                    if (col2.tags[row] == 0) {
+                        innerCol1.vector[row] = totalRow * 1000;
+                    } else {
+                        innerCol2.vector[row] = totalRow * 3 * 1000;
+                    }
+                }
+                col1.isRepeating = false;
+                writer.addRowBatch(batch);
+            }
+        }
+
+        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+        try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+            reader.options().setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+            VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            UnionColumnVector col2 = (UnionColumnVector) batch.cols[1];
+            Decimal64ColumnVector innerCol1 = (Decimal64ColumnVector) col2.fields[0];
+            Decimal64ColumnVector innerCol2 = (Decimal64ColumnVector) col2.fields[1];
+
+            int previousBatchRows = 0;
+            while (rows.nextBatch(batch)) {
+                Assert.assertTrue(batch.selectedInUse);
+                Assert.assertEquals(ColumnBatchRows / 2, batch.size);
+                for (int r = 0; r < batch.size; ++r) {
+                    int row = batch.selected[r];
+                    int originalRow = (r + previousBatchRows) * 2;
+                    Assert.assertEquals("row " + originalRow, originalRow, col1.vector[row]);
+                    Assert.assertEquals("row " + originalRow, 0, col2.tags[row]);
+                    Assert.assertEquals("row " + originalRow,
+                        originalRow * 1000, innerCol1.vector[row]);
+                }
+                // check to make sure that we didn't read innerCol2
+                for(int r = 1; r < ColumnBatchRows; r += 2) {
+                    Assert.assertEquals("row " + r, 0, innerCol2.vector[r]);
+                }
+                previousBatchRows += batch.size;
+            }
+        }
+    }
+
+
+    @Test
+    // Inner MAP should NOT make use of the filterContext
+    // TODO: selected rows should be combined with map offsets
+    public void testInnerMapRowFilter() throws Exception {
+        // Set the row stride to a multiple of the batch size
+        final int INDEX_STRIDE = 16 * ColumnBatchRows;
+        final int NUM_BATCHES = 2;
+
+        TypeDescription schema = TypeDescription.createStruct()
+                .addField("int1", TypeDescription.createInt())
+                .addField("innerMap", TypeDescription.createMap(
+                        TypeDescription.createDecimal(),
+                        TypeDescription.createDecimal()
+                        )
+                );
+
+        try (Writer writer = OrcFile.createWriter(testFilePath,
+                OrcFile.writerOptions(conf).setSchema(schema).rowIndexStride(INDEX_STRIDE))) {
+            VectorizedRowBatch batch = schema.createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            MapColumnVector mapCol = (MapColumnVector) batch.cols[1];
+            DecimalColumnVector keyCol = (DecimalColumnVector) mapCol.keys;
+            DecimalColumnVector valCol = (DecimalColumnVector) mapCol.values;
+
+            for (int b = 0; b < NUM_BATCHES; ++b) {
+                batch.reset();
+                batch.size = ColumnBatchRows;
+                for (int row = 0; row < batch.size; row++) {
+                    col1.vector[row] = row;
+                    // Insert 2 kv pairs in each row
+                    for (int i = 0; i < 2; i++) {
+                        keyCol.vector[i] = new HiveDecimalWritable(i);
+                        valCol.vector[i] = new HiveDecimalWritable(i * 10);
+                    }
+                    mapCol.lengths[row] = 2;
+                    mapCol.offsets[row] = 0;
+                }
+                col1.isRepeating = false;
+                writer.addRowBatch(batch);
+            }
+        }
+
+        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+        try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+                reader.options().setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+            VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+            MapColumnVector col2 = (MapColumnVector) batch.cols[1];
+
+            int noNullCnt = 0;
+            while (rows.nextBatch(batch)) {
+                Assert.assertTrue(batch.selectedInUse);
+                Assert.assertEquals(ColumnBatchRows / 2, batch.size);
+                for (int r = 0; r < ColumnBatchRows; ++r) {
+                    StringBuilder sb = new StringBuilder();
+                    col2.stringifyValue(sb, r);
+                    if (sb.toString().equals("[{\"key\": 0, \"value\": 0}, {\"key\": 1, \"value\": 10}]")) {
+                        noNullCnt++;
+                    }
+                }
+            }
+            // Make sure that we did NOT skip any rows
+            Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+            // Even though the selected array is still in use, it is not propagated
+            Assert.assertEquals(0, batch.selected[0]);
+            Assert.assertEquals(2, batch.selected[1]);
+        }
+    }
+
+    @Test
+    // Inner LIST should NOT make use of the filterContext
+    // TODO: selected rows should be combined with list offsets
+    public void testInnerListRowFilter() throws Exception {
+        // Set the row stride to a multiple of the batch size
+        final int INDEX_STRIDE = 16 * ColumnBatchRows;
+        final int NUM_BATCHES = 2;
+
+        TypeDescription schema = TypeDescription.createStruct()
+                .addField("int1", TypeDescription.createInt())
+                .addField("innerList", TypeDescription
+                        .createList(TypeDescription.createDecimal())
+                );
+
+        try (Writer writer = OrcFile.createWriter(testFilePath,
+                OrcFile.writerOptions(conf).setSchema(schema).rowIndexStride(INDEX_STRIDE))) {
+            VectorizedRowBatch batch = schema.createRowBatchV2();
+            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+            ListColumnVector listCol = (ListColumnVector) batch.cols[1];
+            DecimalColumnVector listValues = (DecimalColumnVector) listCol.child;
+
+            for (int b = 0; b < NUM_BATCHES; ++b) {
+                batch.reset();
+                batch.size = ColumnBatchRows;
+                for (int row = 0; row < batch.size; row++) {
+                    col1.vector[row] = row;
+                    // Insert 10 values into the innerList per row
+                    for (int i = 0; i < 10; i++) {
+                        listValues.vector[i] = new HiveDecimalWritable(i);
+                    }
+                    listCol.lengths[row] = 10;
+                    listCol.offsets[row] = 0;
+                }
+                col1.isRepeating = false;
+                writer.addRowBatch(batch);
+            }
+        }
+
+        Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+        try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+                reader.options().setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+            VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+            ListColumnVector col2 = (ListColumnVector) batch.cols[1];
+
+            int noNullCnt = 0;
+            while (rows.nextBatch(batch)) {
+                Assert.assertTrue(batch.selectedInUse);
+                Assert.assertEquals(ColumnBatchRows / 2, batch.size);
+                for (int r = 0; r < ColumnBatchRows; ++r) {
+                    StringBuilder sb = new StringBuilder();
+                    col2.stringifyValue(sb, r);
+                    if (sb.toString().equals("[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]")) {
+                        noNullCnt++;
+                    }
+                }
+            }
+            // Make sure that we did NOT skip any rows
+            Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+            // Even though the selected array is still in use, it is not propagated
+            Assert.assertEquals(0, batch.selected[0]);
+            Assert.assertEquals(2, batch.selected[1]);
+        }
+    }
+}
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java
new file mode 100644
index 0000000..407bd81
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.sql.Timestamp;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Types that are not skipped at row-level include: Long, Short, Int, Date, Binary
+ * As it turns out, it is more expensive to skip non-selected rows than to just decode all and propagate the
+ * selected array. Skipping for these types breaks instruction pipelining and introduces more branch mispredictions.
+ */
+public class TestRowFilteringNoSkip {
+
+  private Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+      + File.separator + "tmp"));
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path testFilePath;
+
+  private static final int ColumnBatchRows = 1024;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestRowFilteringNoSkip." + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Test
+  public void testLongRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("int2", TypeDescription.createLong());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) ==0 )
+            col2.vector[row] = 100;
+          else
+            col2.vector[row] = 999;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selected is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this Column type is not actually filtered there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // For Long type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0], 100);
+      Assert.assertEquals(col2.vector[511], 999);
+      Assert.assertEquals(col2.vector[1020],  100);
+      Assert.assertEquals(col2.vector[1021], 999);
+    }
+  }
+
+  @Test
+  public void testIntRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (one PASSing row per batch)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("int2", TypeDescription.createInt());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        col1.vector[1023] = b;
+        col2.vector[1023] = 101;
+        for (int row = 0; row < batch.size-1; row++) {
+          col1.vector[row] = 999;
+          col2.vector[row] = row+1;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intFirstRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCount = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selected is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this Column type is not actually filtered there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCount++;
+
+
+        }
+      }
+      // For Int type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCount);
+      // check filter-selected output
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(0, batch.selected[1]);
+      Assert.assertEquals(0, batch.selected[1023]);
+    }
+  }
+
+  @Test
+  public void testShortRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (every other row PASSes the round-robin filter)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("short2", TypeDescription.createShort());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = row*2+1;
+          else
+            col2.vector[row] = -1 * row*2;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selected is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this Column type is not actually filtered there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // For Short type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(false, col2.isRepeating);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] > 0);
+      Assert.assertTrue(col2.vector[511] < 0);
+      Assert.assertTrue(col2.vector[1020] > 0);
+      Assert.assertTrue(col2.vector[1021] < 0);
+    }
+  }
+
+  @Test
+  public void testDateRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("dt2", TypeDescription.createDate());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = Timestamp.valueOf("2020-04-01 12:34:56.9").toInstant().getEpochSecond();
+          else
+            col2.vector[row] = Timestamp.valueOf("2019-04-01 12:34:56.9").toInstant().getEpochSecond();
+        }
+        col2.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selected is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this Column type is not actually filtered there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // For Date type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(false, col2.isRepeating);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] != 0);
+      Assert.assertTrue(col2.vector[511] != 0);
+      Assert.assertTrue(col2.vector[1020] != 0);
+      Assert.assertTrue(col2.vector[1021] != 0);
+    }
+  }
+
+  @Test
+  public void testBinaryRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (every other row PASSes the round-robin filter)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("binary2", TypeDescription.createBinary());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      // Write NUM_BATCHES batches; even and odd rows get different binary values.
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.setVal(row, TestVectorOrcFile.bytesArray(0, 1, 2, 3, row));
+          else
+            col2.setVal(row, TestVectorOrcFile.bytesArray(1, 2, 3, 4, row));
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        // We applied the given filter so selected is true
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // The selected array is propagated -- so size is never 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        // But since this Column type is not actually filtered there will be no nulls!
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (!TestVectorOrcFile.getBinary(col2, r).equals(TestVectorOrcFile.bytes()))
+            noNullCnt ++;
+        }
+      }
+      // For Binary type ColumnVector filtering does not remove any data!
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertNotEquals(TestVectorOrcFile.getBinary(col2, 0), TestVectorOrcFile.bytes());
+      Assert.assertNotEquals(TestVectorOrcFile.getBinary(col2, 511), TestVectorOrcFile.bytes());
+      Assert.assertNotEquals(TestVectorOrcFile.getBinary(col2, 1020), TestVectorOrcFile.bytes());
+      Assert.assertNotEquals(TestVectorOrcFile.getBinary(col2, 1021), TestVectorOrcFile.bytes());
+    }
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
new file mode 100644
index 0000000..0be05b3
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
@@ -0,0 +1,1281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Types that are skipped at row-level include: Decimal, Decimal64, Double, Float, Char, VarChar, String, Boolean, Timestamp
+ * For the remaining types that are not row-skipped see {@link TestRowFilteringNoSkip}
+ */
+public class TestRowFilteringSkip {
+
+  // Directory the test files are created in: the "test.tmp.dir" system property,
+  // falling back to target/test/tmp when the property is not set.
+  private Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+      + File.separator + "tmp"));
+
+  // Re-initialised before every test by openFileSystem().
+  private Configuration conf;
+  private FileSystem fs;
+  private Path testFilePath;
+
+  // Number of rows written per batch by the tests; also used as the bound when
+  // scanning a batch that was read back.
+  private static final int ColumnBatchRows = 1024;
+
+  // Exposes the running test's method name, which openFileSystem() embeds in the file name.
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  /**
+   * Prepares a fresh configuration and local file system for each test and removes any file
+   * left behind at the per-test path, which is derived from the test method name.
+   */
+  @Before
+  public void openFileSystem() throws Exception {
+    final String fileName = "TestRowFilteringSkip." + testCaseName.getMethodName() + ".orc";
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, fileName);
+    // Non-recursive delete of the previous run's output
+    fs.delete(testFilePath, false);
+  }
+
+  /**
+   * Formats a millisecond timestamp with the pattern {@code yyyy-MM-d HH:mm:ss.SSS}.
+   *
+   * @param time value handed to the {@link Date} constructor
+   * @return the formatted text
+   */
+  public static String convertTime(long time){
+    return new SimpleDateFormat("yyyy-MM-d HH:mm:ss.SSS").format(new Date(time));
+  }
+
+  /**
+   * Dummy filter that keeps exactly two rows per batch: row {@code batch.size - 100} and
+   * row 940 (i.e. rows 924 and 940 when the incoming batch holds 1024 rows).
+   *
+   * @param batch the batch whose selection vector and size are overwritten
+   */
+  public static void intAnyRowFilter(VectorizedRowBatch batch) {
+    // Computed from the incoming size, before the size is overwritten below
+    final int firstKept = batch.size - 100;
+    batch.selected[0] = firstKept;
+    batch.selected[1] = 940;
+    batch.selectedInUse = true;
+    batch.size = 2;
+  }
+
+  /**
+   * Filter that keeps only the first row of the batch (nothing when the batch is empty).
+   *
+   * @param batch the batch whose selection vector and size are overwritten
+   */
+  public static void intFirstRowFilter(VectorizedRowBatch batch) {
+    final boolean hasRows = batch.size > 0;
+    if (hasRows) {
+      batch.selected[0] = 0;
+    }
+    batch.selectedInUse = true;
+    batch.size = hasRows ? 1 : 0;
+  }
+
+  /**
+   * Filter that alternates pass/fail over the rows of the batch, starting with a pass:
+   * rows 0, 2, 4, ... are kept and the odd rows are dropped.
+   *
+   * @param batch the batch whose selection vector and size are overwritten
+   */
+  public static void intRoundRobbinRowFilter(VectorizedRowBatch batch) {
+    int kept = 0;
+    int row = 0;
+    while (row < batch.size) {
+      // row is never negative here, so the low bit tells whether it is even
+      if ((row & 1) == 0) {
+        batch.selected[kept] = row;
+        kept++;
+      }
+      row++;
+    }
+    batch.selectedInUse = true;
+    batch.size = kept;
+  }
+
+  // Running total of rows inspected by intCustomValueFilter across all invocations.
+  static int rowCount = 0;
+
+  /**
+   * Filter that keeps the rows whose first (long) column equals 2, 5, 13, 29 or 70, and adds
+   * the number of rows it looked at to {@link #rowCount}.
+   *
+   * @param batch the batch whose selection vector and size are overwritten
+   */
+  public static void intCustomValueFilter(VectorizedRowBatch batch) {
+    LongColumnVector ints = (LongColumnVector) batch.cols[0];
+    int kept = 0;
+    int row = 0;
+    while (row < batch.size) {
+      final long v = ints.vector[row];
+      final boolean pass = v == 2 || v == 5 || v == 13 || v == 29 || v == 70;
+      if (pass) {
+        batch.selected[kept] = row;
+        kept++;
+      }
+      rowCount++;
+      row++;
+    }
+    batch.selectedInUse = true;
+    batch.size = kept;
+  }
+
+  /**
+   * Round-robin filtering over a decimal column when the filter column is written as repeating.
+   * Every row is written with the same decimal; after reading through
+   * {@link #intRoundRobbinRowFilter} only half of the slots of each batch are expected to hold
+   * that value, and the sampled unselected slots are expected to compare equal to zero.
+   */
+  @Test
+  public void testDecimalRepeatingFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // int1 drives the row filter, decimal1 is the column under test
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal());
+
+    HiveDecimalWritable passDataVal = new HiveDecimalWritable("100");
+    HiveDecimalWritable nullDataVal = new HiveDecimalWritable("0");
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      for (int b = 0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        // Same decimal value in every row
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          col2.vector[row] = passDataVal;
+        }
+        col1.isRepeating = true;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      Assert.assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertNotNull(batch.selected);
+        // Rows are filtered so it should never be 1024
+        Assert.assertNotEquals(ColumnBatchRows, batch.size);
+        Assert.assertTrue(col1.noNulls);
+        // Scan every slot, selected or not, and count the ones holding the written value
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r].compareTo(passDataVal) == 0) {
+            noNullCnt++;
+          }
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(passDataVal, col2.vector[0]);
+      Assert.assertEquals(nullDataVal, col2.vector[511]);
+      Assert.assertEquals(passDataVal, col2.vector[1020]);
+      Assert.assertEquals(nullDataVal, col2.vector[1021]);
+    }
+  }
+
+  @Test
+  public void testDecimalRoundRobbinFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (one PASSing row per batch)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal());
+
+    HiveDecimalWritable failDataVal = new HiveDecimalWritable("-100");
+    HiveDecimalWritable nullDataVal = new HiveDecimalWritable("0");
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = new HiveDecimalWritable(row+1);
+          else
+            col2.vector[row] = failDataVal;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r].getHiveDecimal().longValue() > 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0].getHiveDecimal().longValue(), 1);
+      Assert.assertEquals(col2.vector[511], nullDataVal);
+      Assert.assertEquals(col2.vector[1020].getHiveDecimal().longValue(),  1021);
+      Assert.assertEquals(col2.vector[1021], nullDataVal);
+    }
+  }
+
+  @Test
+  public void testDecimalNullRoundRobbinFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (one PASSing row per batch)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal());
+
+    HiveDecimalWritable nullDataVal = new HiveDecimalWritable("0");
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = new HiveDecimalWritable(row+1);
+        }
+        // Make sure we trigger the nullCount path of DecimalTreeReader
+        col2.noNulls = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r].getHiveDecimal().longValue() > 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0].getHiveDecimal().longValue(), 1);
+      Assert.assertEquals(col2.vector[511], nullDataVal);
+      Assert.assertEquals(col2.vector[1020].getHiveDecimal().longValue(),  1021);
+      Assert.assertEquals(col2.vector[1021], nullDataVal);
+    }
+  }
+
+
+  /**
+   * Filtering with {@link #intAnyRowFilter} over two decimal columns at once. Rows 924 and 940
+   * of every batch are written with 12 in both columns and all other rows with 100; after
+   * reading, only two slots per batch are expected to hold 12 in both columns, and the sampled
+   * unselected slots are expected to compare equal to zero.
+   */
+  @Test
+  public void testMultiDecimalSingleFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // int1 drives the row filter, decimal1 and decimal2 are the columns under test
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal())
+        .addField("decimal2", TypeDescription.createDecimal());
+
+    HiveDecimalWritable passDataVal = new HiveDecimalWritable("12");
+    HiveDecimalWritable failDataVal = new HiveDecimalWritable("100");
+    HiveDecimalWritable nullDataVal = new HiveDecimalWritable("0");
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      // Write NUM_BATCHES batches with two passing rows (924 and 940) each
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      DecimalColumnVector col3 = (DecimalColumnVector) batch.cols[2];
+      for (int b = 0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if (row == 924 || row == 940) {
+            col2.vector[row] = passDataVal;
+            col3.vector[row] = passDataVal;
+          } else {
+            col2.vector[row] = failDataVal;
+            col3.vector[row] = failDataVal;
+          }
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intAnyRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DecimalColumnVector col2 = (DecimalColumnVector) batch.cols[1];
+      DecimalColumnVector col3 = (DecimalColumnVector) batch.cols[2];
+
+      // We assume that it fits in a single stripe
+      Assert.assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertNotNull(batch.selected);
+        // Rows are filtered so it should never be 1024
+        Assert.assertNotEquals(ColumnBatchRows, batch.size);
+        Assert.assertTrue(col1.noNulls);
+        // Scan every slot, selected or not, and count those holding the passing value in both columns
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r].compareTo(passDataVal) == 0
+              && col3.vector[r].compareTo(passDataVal) == 0) {
+            noNullCnt++;
+          }
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 2, noNullCnt);
+      Assert.assertEquals(924, batch.selected[0]);
+      Assert.assertEquals(940, batch.selected[1]);
+      Assert.assertEquals(0, batch.selected[2]);
+      Assert.assertEquals(nullDataVal, col2.vector[0]);
+      Assert.assertEquals(nullDataVal, col3.vector[0]);
+      Assert.assertEquals(nullDataVal, col2.vector[511]);
+      Assert.assertEquals(nullDataVal, col3.vector[511]);
+      Assert.assertEquals(passDataVal, col2.vector[924]);
+      Assert.assertEquals(passDataVal, col3.vector[940]);
+      Assert.assertEquals(nullDataVal, col2.vector[1020]);
+      Assert.assertEquals(nullDataVal, col3.vector[1020]);
+      Assert.assertEquals(nullDataVal, col2.vector[1021]);
+      Assert.assertEquals(nullDataVal, col3.vector[1021]);
+    }
+  }
+
+  @Test
+  public void testDecimal64RoundRobbinFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (one PASSing row per batch)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal().withPrecision(10).withScale(2));
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.vector[row] = row + 1;
+          else
+            col2.vector[row] = -1 * row;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(col2.vector[0], 1);
+      Assert.assertEquals(col2.vector[511], 0);
+      Assert.assertEquals(col2.vector[1020],  1021);
+      Assert.assertEquals(col2.vector[1021], 0);
+    }
+  }
+
+  /**
+   * Round-robin filtering over a decimal(10,2) column accessed as a
+   * {@link Decimal64ColumnVector} and written with {@code noNulls = false}. Only the even rows
+   * are assigned a value ({@code row + 1}); after reading through
+   * {@link #intRoundRobbinRowFilter} exactly half of the slots of each batch are expected to be
+   * zero, and the sampled even slots are expected to hold the written value.
+   */
+  @Test
+  public void testDecimal64NullRoundRobbinFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // int1 drives the row filter, decimal1 is the column under test
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("decimal1", TypeDescription.createDecimal().withPrecision(10).withScale(2));
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+      for (int b = 0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        // Only the even rows are assigned; odd rows are left untouched
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0) {
+            col2.vector[row] = row + 1;
+          }
+        }
+        col2.noNulls = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      Assert.assertEquals(1, reader.getStripes().size());
+
+      // NOTE: unlike the sibling tests, this counter tallies the slots that ARE zero
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertNotNull(batch.selected);
+        // Rows are filtered so it should never be 1024
+        Assert.assertNotEquals(ColumnBatchRows, batch.size);
+        Assert.assertTrue(col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] == 0) {
+            noNullCnt++;
+          }
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertEquals(1, col2.vector[0]);
+      Assert.assertEquals(0, col2.vector[511]);
+      Assert.assertEquals(1021, col2.vector[1020]);
+      Assert.assertEquals(0, col2.vector[1021]);
+    }
+  }
+
+  @Test
+  public void testDoubleRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (one PASSing row per batch)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("double2", TypeDescription.createDouble());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DoubleColumnVector col2 = (DoubleColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) ==0 )
+            col2.vector[row] = 100;
+          else
+            col2.vector[row] = 999;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DoubleColumnVector col2 = (DoubleColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] == 100)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] == 100.0);
+      Assert.assertTrue(col2.vector[511] == 0.0);
+      Assert.assertTrue(col2.vector[1020] == 100);
+      Assert.assertTrue(col2.vector[1021] == 0);
+    }
+  }
+
+  @Test
+  public void testFloatRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (one PASSing row per batch)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("float2", TypeDescription.createFloat());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DoubleColumnVector col2 = (DoubleColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) ==0 )
+            col2.vector[row] = 100+row;
+          else
+            col2.vector[row] = 999;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      DoubleColumnVector col2 = (DoubleColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (col2.vector[r] != 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] != 999.0);
+      Assert.assertTrue(col2.vector[511] == 0.0);
+      Assert.assertTrue(col2.vector[1020] == 1120.0);
+      Assert.assertTrue(col2.vector[1021] == 0);
+    }
+  }
+
+  @Test
+  public void testCharRoundRobbinRowFilterCallback() throws Exception {
+    // Set the row stride to a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // ORC write some data (one PASSing row per batch)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("char2", TypeDescription.createChar());
+
+    byte[] passData = ("p").getBytes(StandardCharsets.UTF_8);
+    byte[] failData = ("f").getBytes(StandardCharsets.UTF_8);
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row;
+          if ((row % 2) == 0)
+            col2.setVal(row, passData);
+          else
+            col2.setVal(row, failData);
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0;
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered so it should never be 1024
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) {
+          if (!col2.toString(r).isEmpty())
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]);
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.toString(0).equals("p"));
+      Assert.assertTrue(col2.toString(511).isEmpty());
+      Assert.assertTrue(col2.toString(1020).equals("p"));
+      Assert.assertTrue(col2.toString(1021).isEmpty());
+    }
+  }
+
+  @Test
+  public void testVarCharRoundRobbinRowFilterCallback() throws Exception {
+    // Varchar column read under an int1 row filter; row stride is a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // Write NUM_BATCHES full batches of (int1, varchar2): even rows hold "p", odd rows hold "f"
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("varchar2", TypeDescription.createVarchar());
+
+    byte[] passData = ("p").getBytes(StandardCharsets.UTF_8);
+    byte[] failData = ("f").getBytes(StandardCharsets.UTF_8);
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row; // int1 is simply the row position inside the batch
+          if ((row % 2) == 0)
+            col2.setVal(row, passData);
+          else
+            col2.setVal(row, failData);
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0; // number of non-empty varchar2 values seen over all batches
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered, so the batch must never come back full (ColumnBatchRows rows)
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) { // every slot, not only batch.selected
+          if (!col2.toString(r).isEmpty())
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked: 512 non-empty values are expected per batch
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]); // NOTE(review): read after the loop ended
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.toString(0).equals("p"));
+      Assert.assertTrue(col2.toString(511).isEmpty()); // odd slots are expected to be empty
+      Assert.assertTrue(col2.toString(1020).equals("p"));
+      Assert.assertTrue(col2.toString(1021).isEmpty());
+    }
+  }
+
+  @Test
+  public void testDirectStringRowFilterCallback() throws Exception {
+    // String column read under an int1 row filter; row stride is a multiple of the batch size
+    final int INDEX_STRIDE = 10 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // Write (int1, string1) rows: even rows get "passData-<row>", odd rows "failData-<row>"
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("string1", TypeDescription.createString());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      // Write NUM_BATCHES batches; each row of a batch gets its own distinct string value.
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row; // int1 is simply the row position inside the batch
+          if ((row % 2) ==0 )
+            col2.setVal(row, ("passData-" + row).getBytes(StandardCharsets.UTF_8));
+          else
+            col2.setVal(row, ("failData-" + row).getBytes(StandardCharsets.UTF_8));
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0; // number of non-empty string1 values seen over all batches
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered, so the batch must never come back full (ColumnBatchRows rows)
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) { // every slot, not only batch.selected
+          if (!col2.toString(r).isEmpty())
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked: 512 non-empty values are expected per batch
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]); // NOTE(review): read after the loop ended
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.toString(0).startsWith("pass"));
+      Assert.assertTrue(col2.toString(511).isEmpty()); // odd slots are expected to be empty
+      Assert.assertTrue(col2.toString(1020).startsWith("pass"));
+    }
+  }
+
+  @Test
+  public void testDictionaryStringRowFilterCallback() throws Exception {
+    // String column read under an int1 row filter; row stride is a multiple of the batch size
+    final int INDEX_STRIDE = 100 * ColumnBatchRows;
+    final int NUM_BATCHES = 100;
+
+    // Write (int1, string1) rows using only two values: "passData" (even) and "failData" (odd)
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("string1", TypeDescription.createString());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row; // int1 is simply the row position inside the batch
+          if (row % 2 ==0)
+            col2.setVal(row, ("passData").getBytes(StandardCharsets.UTF_8));
+          else
+            col2.setVal(row, ("failData").getBytes(StandardCharsets.UTF_8));
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0; // number of non-empty string1 values seen over all batches
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered, so the batch must never come back full (ColumnBatchRows rows)
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) { // every slot, not only batch.selected
+          if (!col2.toString(r).isEmpty())
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked: 512 non-empty values are expected per batch
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]); // NOTE(review): read after the loop ended
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.toString(0).startsWith("pass"));
+      Assert.assertTrue(col2.toString(511).isEmpty()); // odd slots are expected to be empty
+      Assert.assertTrue(col2.toString(1020).startsWith("pass"));
+    }
+  }
+
+  @Test
+  public void testRepeatingBooleanRoundRobbinRowFilterCallback() throws Exception {
+    // All-false boolean column under an int1 row filter; row stride is a multiple of batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    // Write (int1, bool2) rows where bool2 is 0 for every row of every batch
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("bool2", TypeDescription.createBoolean());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row; // int1 is simply the row position inside the batch
+          col2.vector[row] = 0;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0; // despite the name, counts slots whose bool2 value is 0
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered, so the batch must never come back full (ColumnBatchRows rows)
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) { // every slot, not only batch.selected
+          if (col2.vector[r] == 0)
+            noNullCnt ++;
+        }
+      }
+      // Every slot is expected to be 0 here, so the count covers whole batches, not halves
+      Assert.assertEquals(NUM_BATCHES * ColumnBatchRows, noNullCnt);
+      Assert.assertEquals(false, col2.isRepeating);
+      Assert.assertEquals(0, batch.selected[0]); // NOTE(review): read after the loop ended
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] == 0);
+      Assert.assertTrue(col2.vector[511] == 0);
+      Assert.assertTrue(col2.vector[1020] == 0);
+      Assert.assertTrue(col2.vector[1021] == 0);
+    }
+  }
+
+  @Test
+  public void testBooleanRoundRobbinRowFilterCallback() throws Exception {
+    final int INDEX_STRIDE = 0; // NOTE(review): presumably disables the row index - confirm
+    final int NUM_BATCHES = 10;
+
+    // Write (int1, bool2) rows: bool2 is set to 1 on even rows and left untouched on odd rows
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("bool2", TypeDescription.createBoolean());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row; // int1 is simply the row position inside the batch
+          if ((row % 2) == 0)
+            col2.vector[row] = 1;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0; // despite the name, counts slots whose bool2 value is 0
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered, so the batch must never come back full (ColumnBatchRows rows)
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) { // every slot, not only batch.selected
+          if (col2.vector[r] == 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked: 512 zero-valued slots are expected per batch
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(0, batch.selected[0]); // NOTE(review): read after the loop ended
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] == 1);
+      Assert.assertTrue(col2.vector[511] == 0);
+      Assert.assertTrue(col2.vector[1020] == 1);
+      Assert.assertTrue(col2.vector[1021] == 0);
+    }
+  }
+
+  @Test
+  public void testBooleanAnyRowFilterCallback() throws Exception {
+    final int INDEX_STRIDE = 0; // NOTE(review): presumably disables the row index - confirm
+    final int NUM_BATCHES = 100;
+
+    // Write (int1, bool2) rows: bool2 is set to 1 only on rows 924 and 940 of each batch
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("bool2", TypeDescription.createBoolean());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row; // int1 is simply the row position inside the batch
+          if (row == 924 || row == 940)
+            col2.vector[row] = 1;
+        }
+        col1.isRepeating = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intAnyRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      LongColumnVector col2 = (LongColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0; // despite the name, counts slots whose bool2 value is 1
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered, so the batch must never come back full (ColumnBatchRows rows)
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) { // every slot, not only batch.selected
+          if (col2.vector[r] == 1)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked: exactly two 1-valued slots are expected per batch
+      Assert.assertEquals(NUM_BATCHES * 2, noNullCnt);
+      Assert.assertEquals(924, batch.selected[0]); // NOTE(review): read after the loop ended
+      Assert.assertEquals(940, batch.selected[1]);
+      Assert.assertTrue(col2.vector[0] == 0);
+      Assert.assertTrue(col2.vector[511] == 0);
+      Assert.assertTrue(col2.vector[1020] == 0);
+      Assert.assertTrue(col2.vector[924] == 1);
+      Assert.assertTrue(col2.vector[940] == 1);
+    }
+  }
+
+  @Test
+  public void testTimestampRoundRobbinRowFilterCallback() throws Exception {
+    // Timestamp column under an int1 row filter; row stride is a multiple of the batch size
+    final int INDEX_STRIDE = 16 * ColumnBatchRows;
+    final int NUM_BATCHES = 10;
+
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("ts2", TypeDescription.createTimestamp());
+
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(INDEX_STRIDE))) {
+      VectorizedRowBatch batch = schema.createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      TimestampColumnVector col2 = (TimestampColumnVector) batch.cols[1];
+      for (int b=0; b < NUM_BATCHES; ++b) {
+        batch.reset();
+        batch.size = ColumnBatchRows;
+        for (int row = 0; row < batch.size; row++) {
+          col1.vector[row] = row; // int1 is simply the row position inside the batch
+          if ((row % 2) == 0) // even rows: year is 1900 + row, so row 1020 becomes year 2920
+            col2.set(row, Timestamp.valueOf((1900+row)+"-04-01 12:34:56.9"));
+          else { // odd rows are written as nulls
+            col2.isNull[row] = true;
+            col2.set(row, null);
+          }
+        }
+        col1.isRepeating = true; // NOTE(review): sibling tests set false here - confirm intent
+        col1.noNulls = false; // NOTE(review): no col1.isNull entry is set in this method
+        col2.noNulls = false;
+        writer.addRowBatch(batch);
+      }
+    }
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"int1"}, TestRowFilteringSkip::intRoundRobbinRowFilter))) {
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+      TimestampColumnVector col2 = (TimestampColumnVector) batch.cols[1];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0; // despite the name, counts slots whose ts2 time reads back as 0
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered, so the batch must never come back full (ColumnBatchRows rows)
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) { // every slot, not only batch.selected
+          if (col2.getTime(r) == 0)
+            noNullCnt ++;
+        }
+      }
+      // Make sure that our filter worked: 512 zero-time slots are expected per batch
+      Assert.assertEquals(NUM_BATCHES * 512, noNullCnt);
+      Assert.assertEquals(false, col2.isRepeating);
+      Assert.assertEquals(0, batch.selected[0]); // NOTE(review): read after the loop ended
+      Assert.assertEquals(2, batch.selected[1]);
+      Assert.assertTrue(convertTime(col2.getTime(0)).compareTo("1900-04-1 12:34:56.900") == 0);
+      Assert.assertTrue(col2.getTime(511) == 0);
+      Assert.assertTrue(convertTime(col2.getTime(1020)).compareTo("2920-04-1 12:34:56.900") == 0);
+      Assert.assertTrue(col2.getTime(1021) == 0);
+    }
+  }
+
+  @Test
+  public void testcustomFileTimestampRoundRobbinRowFilterCallback() throws Exception {
+    testFilePath = new Path(getClass().getClassLoader().
+        getSystemResource("orc_split_elim.orc").getPath()); // static ClassLoader method
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
+
+    try (RecordReaderImpl rows = (RecordReaderImpl) reader.rows(
+        reader.options()
+            .setRowFilter(new String[]{"userid"}, TestRowFilteringSkip::intCustomValueFilter))) {
+
+      VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+      LongColumnVector col1 = (LongColumnVector) batch.cols[0]; // presumably "userid" - verify
+      TimestampColumnVector col5 = (TimestampColumnVector) batch.cols[4];
+
+      // We assume that it fits in a single stripe
+      assertEquals(1, reader.getStripes().size());
+
+      int noNullCnt = 0; // despite the name, counts slots whose first column is not 100
+      while (rows.nextBatch(batch)) {
+        Assert.assertTrue(batch.selectedInUse);
+        Assert.assertTrue(batch.selected != null);
+        // Rows are filtered, so the batch must never come back full (ColumnBatchRows rows)
+        Assert.assertTrue(batch.size != ColumnBatchRows);
+        assertEquals( true, col1.noNulls);
+        for (int r = 0; r < ColumnBatchRows; ++r) { // every slot, not only batch.selected
+          if (col1.vector[r] != 100)
+            noNullCnt ++;
+        }
+      }
+
+      // Total rows should be 25k; rowCount is declared outside this method (verify who sets it)
+      Assert.assertEquals(25000, rowCount);
+      // Make sure that our filter worked ( 5 rows with userId != 100)
+      Assert.assertEquals(5, noNullCnt);
+      Assert.assertEquals(false, col5.isRepeating);
+      Assert.assertEquals(544, batch.selected[0]); // NOTE(review): read after the loop ended
+      Assert.assertEquals(0, batch.selected[1]);
+      Assert.assertTrue(col5.getTime(0) == 0);
+      Assert.assertTrue(col5.getTime(544) != 0);
+    }
+  }
+}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index bc2ff94..252c4f2 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -169,7 +169,7 @@ public class TestVectorOrcFile {
     return result;
   }
 
-  private static BytesWritable bytes(int... items) {
+  protected static BytesWritable bytes(int... items) {
     BytesWritable result = new BytesWritable();
     result.setSize(items.length);
     for(int i=0; i < items.length; ++i) {
@@ -178,7 +178,7 @@ public class TestVectorOrcFile {
     return result;
   }
 
-  private static byte[] bytesArray(int... items) {
+  protected static byte[] bytesArray(int... items) {
     byte[] result = new byte[items.length];
     for(int i=0; i < items.length; ++i) {
       result[i] = (byte) items[i];
@@ -876,7 +876,7 @@ public class TestVectorOrcFile {
     return ((DoubleColumnVector) batch.cols[6]).vector[rowId];
   }
 
-  private static BytesWritable getBinary(BytesColumnVector column, int rowId) {
+  protected static BytesWritable getBinary(BytesColumnVector column, int rowId) {
     if (column.isRepeating) {
       rowId = 0;
     }
diff --git a/java/core/src/test/resources/orc_split_elim.orc b/java/core/src/test/resources/orc_split_elim.orc
new file mode 100644
index 0000000..ff3557f
Binary files /dev/null and b/java/core/src/test/resources/orc_split_elim.orc differ
diff --git a/java/pom.xml b/java/pom.xml
index c45844f..053bedb 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -70,7 +70,7 @@
 
     <min.hadoop.version>2.2.0</min.hadoop.version>
     <hadoop.version>2.7.3</hadoop.version>
-    <storage-api.version>2.7.1</storage-api.version>
+    <storage-api.version>2.7.2</storage-api.version>
     <zookeeper.version>3.4.6</zookeeper.version>
   </properties>