You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2021/03/04 18:18:07 UTC

[orc] branch master updated: ORC-755: Added OrcFilterContext that accomplishes the following: * Change the filter from Consumer to Consumer * OrcFilterContext has findColumnVector method that allows cached determination of ColumnVectors from name * OrcFilterContext offers utility methods for null determination on the vector batch * Refactors ParserUtils to introduce TypeWalker and VectorWalker to centralize the walk logic * Tests have been modified to support the new interface

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 853d574  ORC-755: Added OrcFilterContext that accomplishes the following: * Change the filter from Consumer<VectorizedRowBatch> to Consumer<OrcFilterContext> * OrcFilterContext has findColumnVector method that allows cached determination of ColumnVectors from name * OrcFilterContext offers utility methods for null determination on the vector batch * Refactors ParserUtils to introduce TypeWalker and VectorWalker to centralize the walk logic * Tests have been modified to support th [...]
853d574 is described below

commit 853d574df4c23f8dbe892b82220ad82f123f2bf2
Author: Pavan Lanka <pl...@apple.com>
AuthorDate: Tue Feb 23 16:55:03 2021 -0800

    ORC-755: Added OrcFilterContext that accomplishes the following:
    * Change the filter from Consumer<VectorizedRowBatch> to Consumer<OrcFilterContext>
    * OrcFilterContext has findColumnVector method that allows cached determination of ColumnVectors from name
    * OrcFilterContext offers utility methods for null determination on the vector batch
    * Refactors ParserUtils to introduce TypeWalker and VectorWalker to centralize the walk logic
    * Tests have been modified to support the new interface
    
    Fixes #647
    
    Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
 .../bench/hive/RowFilterProjectionBenchmark.java   |  11 +-
 .../hive/rowfilter/BooleanRowFilterBenchmark.java  |  11 +-
 .../hive/rowfilter/DecimalRowFilterBenchmark.java  |  11 +-
 .../hive/rowfilter/DoubleRowFilterBenchmark.java   |  11 +-
 .../hive/rowfilter/StringRowFilterBenchmark.java   |  11 +-
 .../rowfilter/TimestampRowFilterBenchmark.java     |  11 +-
 .../src/java/org/apache/orc/OrcFilterContext.java  |  91 +++++++++
 java/core/src/java/org/apache/orc/Reader.java      |   7 +-
 .../src/java/org/apache/orc/TypeDescription.java   |  21 +-
 .../org/apache/orc/impl/OrcFilterContextImpl.java  | 130 ++++++++++++
 .../src/java/org/apache/orc/impl/ParserUtils.java  | 207 ++++++++++++++++---
 .../org/apache/orc/impl/TreeReaderFactory.java     |   9 +-
 .../orc/impl/reader/tree/StructBatchReader.java    |   5 +-
 .../test/org/apache/orc/TestOrcFilterContext.java  | 224 +++++++++++++++++++++
 .../test/org/apache/orc/TestRowFilteringSkip.java  |  97 +++++----
 .../apache/orc/impl/TestOrcFilterContextImpl.java  | 209 +++++++++++++++++++
 16 files changed, 940 insertions(+), 126 deletions(-)

diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
index d125606..0f2718d 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
@@ -30,6 +30,7 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.bench.core.OrcBenchmark;
 import org.apache.orc.bench.core.ReadCounters;
 import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Mode;
@@ -93,16 +94,16 @@ public class RowFilterProjectionBenchmark implements OrcBenchmark {
     }
   }
 
-  public static void customIntRowFilter(VectorizedRowBatch batch) {
+  public static void customIntRowFilter(OrcFilterContext batch) {
     int newSize = 0;
-    for (int row = 0; row < batch.size; ++row) {
+    for (int row = 0; row < batch.getSelectedSize(); ++row) {
       // Select ONLY specific keys
       if (filterValues.contains(row)) {
-        batch.selected[newSize++] = row;
+        batch.getSelected()[newSize++] = row;
       }
     }
-    batch.selectedInUse = true;
-    batch.size = newSize;
+    batch.setSelectedInUse(true);
+    batch.setSelectedSize(newSize);
   }
 
   @Benchmark
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
index b7a0e69..789dda4 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -127,15 +128,15 @@ public class BooleanRowFilterBenchmark extends org.openjdk.jmh.Main {
       return filterValues;
     }
 
-    public static void customIntRowFilter(VectorizedRowBatch batch) {
+    public static void customIntRowFilter(OrcFilterContext batch) {
       int newSize = 0;
-      for (int row = 0; row < batch.size; ++row) {
+      for (int row = 0; row < batch.getSelectedSize(); ++row) {
         if (filterValues[row]) {
-          batch.selected[newSize++] = row;
+          batch.getSelected()[newSize++] = row;
         }
       }
-      batch.selectedInUse = true;
-      batch.size = newSize;
+      batch.setSelectedInUse(true);
+      batch.setSelectedSize(newSize);
     }
   }
 
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
index a43c4c5..dc15b30 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -128,15 +129,15 @@ public class DecimalRowFilterBenchmark extends org.openjdk.jmh.Main {
       return filterValues;
     }
 
-    public static void customIntRowFilter(VectorizedRowBatch batch) {
+    public static void customIntRowFilter(OrcFilterContext batch) {
       int newSize = 0;
-      for (int row = 0; row < batch.size; ++row) {
+      for (int row = 0; row < batch.getSelectedSize(); ++row) {
         if (filterValues[row]) {
-          batch.selected[newSize++] = row;
+          batch.getSelected()[newSize++] = row;
         }
       }
-      batch.selectedInUse = true;
-      batch.size = newSize;
+      batch.setSelectedInUse(true);
+      batch.setSelectedSize(newSize);
     }
   }
 
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
index d5afb48..616207e 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -127,15 +128,15 @@ public class DoubleRowFilterBenchmark extends org.openjdk.jmh.Main {
       return filterValues;
     }
 
-    public static void customIntRowFilter(VectorizedRowBatch batch) {
+    public static void customIntRowFilter(OrcFilterContext batch) {
       int newSize = 0;
-      for (int row = 0; row < batch.size; ++row) {
+      for (int row = 0; row < batch.getSelectedSize(); ++row) {
         if (filterValues[row]) {
-          batch.selected[newSize++] = row;
+          batch.getSelected()[newSize++] = row;
         }
       }
-      batch.selectedInUse = true;
-      batch.size = newSize;
+      batch.setSelectedInUse(true);
+      batch.setSelectedSize(newSize);
     }
   }
 
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
index 33447cd..4170523 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -127,15 +128,15 @@ public class StringRowFilterBenchmark extends org.openjdk.jmh.Main {
       return filterValues;
     }
 
-    public static void customIntRowFilter(VectorizedRowBatch batch) {
+    public static void customIntRowFilter(OrcFilterContext batch) {
       int newSize = 0;
-      for (int row = 0; row < batch.size; ++row) {
+      for (int row = 0; row < batch.getSelectedSize(); ++row) {
         if (filterValues[row]) {
-          batch.selected[newSize++] = row;
+          batch.getSelected()[newSize++] = row;
         }
       }
-      batch.selectedInUse = true;
-      batch.size = newSize;
+      batch.setSelectedInUse(true);
+      batch.setSelectedSize(newSize);
     }
   }
 
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
index f4438a0..363db35 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -127,15 +128,15 @@ public class TimestampRowFilterBenchmark extends org.openjdk.jmh.Main {
       return filterValues;
     }
 
-    public static void customIntRowFilter(VectorizedRowBatch batch) {
+    public static void customIntRowFilter(OrcFilterContext batch) {
       int newSize = 0;
-      for (int row = 0; row < batch.size; ++row) {
+      for (int row = 0; row < batch.getSelectedSize(); ++row) {
         if (filterValues[row]) {
-          batch.selected[newSize++] = row;
+          batch.getSelected()[newSize++] = row;
         }
       }
-      batch.selectedInUse = true;
-      batch.size = newSize;
+      batch.setSelectedInUse(true);
+      batch.setSelectedSize(newSize);
     }
   }
 
diff --git a/java/core/src/java/org/apache/orc/OrcFilterContext.java b/java/core/src/java/org/apache/orc/OrcFilterContext.java
new file mode 100644
index 0000000..52c1500
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/OrcFilterContext.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.filter.MutableFilterContext;
+
+/**
+ * This defines the input for any filter operation. This is an extension of
+ * [[{@link VectorizedRowBatch}]] with schema.
+ * <p>
+ * This offers a convenience method of finding the column vector from a given column name
+ * that the filters can invoke to get access to the column vector.
+ */
+public interface OrcFilterContext extends MutableFilterContext {
+  /**
+   * Retrieves the column vector that matches the specified name. Allows support for nested struct
+   * references e.g. order.date where date is a field in a struct called order.
+   *
+   * @param name The column name whose vector should be retrieved
+   * @return The column vectors from the root to the column name. The array levels match the name
+   * levels with Array[0] referring to the top level, followed by the subsequent levels. For
+   * example of order.date Array[0] refers to order and Array[1] refers to date
+   * @throws IllegalArgumentException if the field is not found or if the nested field is not part
+   *                                  of a struct
+   */
+  ColumnVector[] findColumnVector(String name);
+
+  /**
+   * Utility method for determining if the leaf vector of the branch can be treated as having
+   * noNulls.
+   * This method navigates from the top to the leaf and checks if we have nulls anywhere in the
+   * branch as compared to checking just the leaf vector.
+   *
+   * @param vectorBranch The input vector branch from the root to the leaf
+   * @return true if the entire branch satisfies noNull else false
+   */
+  static boolean noNulls(ColumnVector[] vectorBranch) {
+    for (ColumnVector v : vectorBranch) {
+      if (!v.noNulls) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Utility method for determining if a particular row element in the vector branch is null.
+   * This method navigates from the top to the leaf and checks if we have nulls anywhere in the
+   * branch as compared to checking just the leaf vector.
+   *
+   * @param vectorBranch The input vector branch from the root to the leaf
+   * @param idx          The row index being tested
+   * @return true if the entire branch is not null for the idx otherwise false
+   * @throws IllegalArgumentException If a multivalued vector such as List or Map is encountered in
+   *                                  the branch.
+   */
+  static boolean isNull(ColumnVector[] vectorBranch, int idx) throws IllegalArgumentException {
+    for (ColumnVector v : vectorBranch) {
+      if (v instanceof ListColumnVector || v instanceof MapColumnVector) {
+        throw new IllegalArgumentException(String.format(
+          "Found vector: %s in branch. List and Map vectors are not supported in isNull "
+          + "determination", v));
+      }
+      // v.noNulls = false does not mean that we have at least one null value
+      if (!v.noNulls && v.isNull[v.isRepeating ? 0 : idx]) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 84c5d91..7f88bb7 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -26,7 +26,6 @@ import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /**
  * The interface for reading ORC files.
@@ -191,7 +190,7 @@ public interface Reader extends Closeable {
     private Boolean skipCorruptRecords = null;
     private TypeDescription schema = null;
     private String[] preFilterColumns = null;
-    Consumer<VectorizedRowBatch> skipRowCallback = null;
+    Consumer<OrcFilterContext> skipRowCallback = null;
     private DataReader dataReader = null;
     private Boolean tolerateMissingSchema = null;
     private boolean forcePositionalEvolution;
@@ -265,7 +264,7 @@ public interface Reader extends Closeable {
      *
      * @return this
      */
-    public Options setRowFilter(String[] filterColumnNames, Consumer<VectorizedRowBatch> filterCallback) {
+    public Options setRowFilter(String[] filterColumnNames, Consumer<OrcFilterContext> filterCallback) {
       this.preFilterColumns = filterColumnNames;
       this.skipRowCallback =  filterCallback;
       return this;
@@ -382,7 +381,7 @@ public interface Reader extends Closeable {
       return sarg;
     }
 
-    public Consumer<VectorizedRowBatch> getFilterCallback() {
+    public Consumer<OrcFilterContext> getFilterCallback() {
       return skipRowCallback;
     }
 
diff --git a/java/core/src/java/org/apache/orc/TypeDescription.java b/java/core/src/java/org/apache/orc/TypeDescription.java
index 72beb45..a6c9944 100644
--- a/java/core/src/java/org/apache/orc/TypeDescription.java
+++ b/java/core/src/java/org/apache/orc/TypeDescription.java
@@ -783,24 +783,9 @@ public class TypeDescription
    * @return the subtype
    */
   public TypeDescription findSubtype(int goal) {
-    // call getId method to make sure the ids are assigned
-    int id = getId();
-    if (goal < id || goal > maxId) {
-      throw new IllegalArgumentException("Unknown type id " + id + " in " +
-          toJson());
-    }
-    if (goal == id) {
-      return this;
-    } else {
-      TypeDescription prev = null;
-      for(TypeDescription next: children) {
-        if (next.id > goal) {
-          return prev.findSubtype(goal);
-        }
-        prev = next;
-      }
-      return prev.findSubtype(goal);
-    }
+    ParserUtils.TypeFinder result = new ParserUtils.TypeFinder(this);
+    ParserUtils.findSubtype(this, goal, result);
+    return result.current;
   }
 
   /**
diff --git a/java/core/src/java/org/apache/orc/impl/OrcFilterContextImpl.java b/java/core/src/java/org/apache/orc/impl/OrcFilterContextImpl.java
new file mode 100644
index 0000000..8769c5f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OrcFilterContextImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.TypeDescription;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This defines the input for any filter operation. This is an extension of
+ * [[{@link VectorizedRowBatch}]] with schema.
+ * <p>
+ * This offers a convenience method of finding the column vector from a given column name
+ * that the filters can invoke to get access to the column vector.
+ */
+public class OrcFilterContextImpl implements OrcFilterContext {
+  private VectorizedRowBatch batch = null;
+  // Cache of field to ColumnVector, this is reset everytime the batch reference changes
+  private final Map<String, ColumnVector[]> vectors;
+  private final TypeDescription readSchema;
+
+  public OrcFilterContextImpl(TypeDescription readSchema) {
+    this.readSchema = readSchema;
+    this.vectors = new HashMap<>();
+  }
+
+  public OrcFilterContext setBatch(@NotNull VectorizedRowBatch batch) {
+    if (batch != this.batch) {
+      this.batch = batch;
+      vectors.clear();
+    }
+    return this;
+  }
+
+  /**
+   * For testing only
+   * @return The batch reference against which the cache is maintained
+   */
+  VectorizedRowBatch getBatch() {
+    return batch;
+  }
+
+  @Override
+  public void setFilterContext(boolean selectedInUse, int[] selected, int selectedSize) {
+    batch.setFilterContext(selectedInUse, selected, selectedSize);
+  }
+
+  @Override
+  public boolean validateSelected() {
+    return batch.validateSelected();
+  }
+
+  @Override
+  public int[] updateSelected(int i) {
+    return batch.updateSelected(i);
+  }
+
+  @Override
+  public void setSelectedInUse(boolean b) {
+    batch.setSelectedInUse(b);
+  }
+
+  @Override
+  public void setSelected(int[] ints) {
+    batch.setSelected(ints);
+  }
+
+  @Override
+  public void setSelectedSize(int i) {
+    batch.setSelectedSize(i);
+  }
+
+  @Override
+  public void reset() {
+    batch.reset();
+  }
+
+  @Override
+  public boolean isSelectedInUse() {
+    return batch.isSelectedInUse();
+  }
+
+  @Override
+  public int[] getSelected() {
+    return batch.getSelected();
+  }
+
+  @Override
+  public int getSelectedSize() {
+    return batch.getSelectedSize();
+  }
+
+  // For testing only
+  public ColumnVector[] getCols() {
+    return batch.cols;
+  }
+
+  @Override
+  public ColumnVector[] findColumnVector(String name) {
+    if (!vectors.containsKey(name)) {
+      vectors.put(name,
+                  ParserUtils.findColumnVectors(readSchema,
+                                                new ParserUtils.StringPosition(name),
+                                                true,
+                                                batch));
+    }
+    return vectors.get(name);
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/ParserUtils.java b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
index 559e3e1..fe2111b 100644
--- a/java/core/src/java/org/apache/orc/impl/ParserUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
@@ -18,6 +18,12 @@
 
 package org.apache.orc.impl;
 
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
 
 import java.util.ArrayList;
@@ -249,63 +255,136 @@ public class ParserUtils {
     return findSubtype(schema, source, true);
   }
 
+
+  public interface TypeVisitor {
+    /**
+     * As we navigate to the column, call this on each level
+     * @param type new level we are moving to
+     * @param posn the position relative to the parent
+     */
+    void visit(TypeDescription type, int posn);
+  }
+
+  public static class TypeFinder implements TypeVisitor {
+    public TypeDescription current;
+
+    public TypeFinder(TypeDescription schema) {
+      current = schema;
+    }
+
+    @Override
+    public void visit(TypeDescription type, int posn) {
+      current = type;
+    }
+  }
+
   public static TypeDescription findSubtype(TypeDescription schema,
                                             ParserUtils.StringPosition source,
                                             boolean isSchemaEvolutionCaseAware) {
-    List<String> names = ParserUtils.splitName(source);
+    TypeFinder result = new TypeFinder(removeAcid(schema));
+    findColumn(result.current, source, isSchemaEvolutionCaseAware, result);
+    return result.current;
+  }
+
+  private static TypeDescription removeAcid(TypeDescription schema) {
+    return SchemaEvolution.checkAcidSchema(schema)
+        ? SchemaEvolution.getBaseRow(schema) : schema;
+  }
+
+  private static int findCaseInsensitive(List<String> list, String goal) {
+    for (int i = 0; i < list.size(); i++) {
+      if (list.get(i).equalsIgnoreCase(goal)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static void findSubtype(TypeDescription schema,
+                                 int goal,
+                                 TypeVisitor visitor) {
+    TypeDescription current = schema;
+    int id = schema.getId();
+    if (goal < id || goal > schema.getMaximumId()) {
+      throw new IllegalArgumentException("Unknown type id " + id + " in " +
+          current.toJson());
+    }
+    while (id != goal) {
+      List<TypeDescription> children = current.getChildren();
+      for(int i=0; i < children.size(); ++i) {
+        TypeDescription child = children.get(i);
+        if (goal <= child.getMaximumId()) {
+          current = child;
+          visitor.visit(current, i);
+          break;
+        }
+      }
+      id = current.getId();
+    }
+  }
+
+  /**
+   * Find a column in a schema by walking down the type tree to find the right column.
+   * @param schema the schema to look in
+   * @param source the name of the column
+   * @param isSchemaEvolutionCaseAware should the string compare be case sensitive
+   * @param visitor The visitor, which is called on each level
+   */
+  public static void findColumn(TypeDescription schema,
+                                ParserUtils.StringPosition source,
+                                boolean isSchemaEvolutionCaseAware,
+                                TypeVisitor visitor) {
+    findColumn(schema, ParserUtils.splitName(source), isSchemaEvolutionCaseAware, visitor);
+  }
+
+  /**
+   * Find a column in a schema by walking down the type tree to find the right column.
+   * @param schema the schema to look in
+   * @param names the name of the column broken into a list of names per level
+   * @param isSchemaEvolutionCaseAware should the string compare be case sensitive
+   * @param visitor The visitor, which is called on each level
+   */
+  public static void findColumn(TypeDescription schema,
+                                List<String> names,
+                                boolean isSchemaEvolutionCaseAware,
+                                TypeVisitor visitor) {
     if (names.size() == 1 && INTEGER_PATTERN.matcher(names.get(0)).matches()) {
-      return schema.findSubtype(Integer.parseInt(names.get(0)));
+      findSubtype(schema, Integer.parseInt(names.get(0)), visitor);
+      return;
     }
-    TypeDescription current = SchemaEvolution.checkAcidSchema(schema)
-        ? SchemaEvolution.getBaseRow(schema) : schema;
+    TypeDescription current = schema;
+    int posn;
     while (names.size() > 0) {
       String first = names.remove(0);
       switch (current.getCategory()) {
         case STRUCT: {
-          int posn = -1;
-          if (isSchemaEvolutionCaseAware) {
-            posn = current.getFieldNames().indexOf(first);
-          } else {
-            // Case-insensitive search like ORC 1.5
-            for (int i = 0; i < current.getFieldNames().size(); i++) {
-              if (current.getFieldNames().get(i).equalsIgnoreCase(first)) {
-                posn = i;
-                break;
-              }
-            }
-          }
-          if (posn == -1) {
-            throw new IllegalArgumentException("Field " + first +
-                " not found in " + current.toString());
-          }
-          current = current.getChildren().get(posn);
+          posn = isSchemaEvolutionCaseAware
+              ? current.getFieldNames().indexOf(first)
+              : findCaseInsensitive(current.getFieldNames(), first);
           break;
         }
         case LIST:
           if (first.equals("_elem")) {
-            current = current.getChildren().get(0);
+            posn = 0;
           } else {
-            throw new IllegalArgumentException("Field " + first +
-                "not found in " + current.toString());
+            posn = -1;
           }
           break;
         case MAP:
           if (first.equals("_key")) {
-            current = current.getChildren().get(0);
+            posn = 0;
           } else if (first.equals("_value")) {
-            current = current.getChildren().get(1);
+            posn = 1;
           } else {
-            throw new IllegalArgumentException("Field " + first +
-                "not found in " + current.toString());
+            posn = -1;
           }
           break;
         case UNION: {
           try {
-            int posn = Integer.parseInt(first);
+            posn = Integer.parseInt(first);
             if (posn < 0 || posn >= current.getChildren().size()) {
               throw new NumberFormatException("off end of union");
             }
-            current = current.getChildren().get(posn);
           } catch (NumberFormatException e) {
             throw new IllegalArgumentException("Field " + first +
                 "not found in " + current.toString(), e);
@@ -313,11 +392,73 @@ public class ParserUtils {
           break;
         }
         default:
-          throw new IllegalArgumentException("Field " + first +
-              "not found in " + current.toString());
+          posn = -1;
+      }
+      if (posn < 0) {
+        throw new IllegalArgumentException("Field " + first +
+                                           " not found in " + current.toString());
+      }
+      current = current.getChildren().get(posn);
+      visitor.visit(current, posn);
+    }
+  }
+
+  static class ColumnFinder implements TypeVisitor {
+    private ColumnVector[] top;
+    private ColumnVector current = null;
+    private final ColumnVector[] result;
+    private int resultIdx = 0;
+
+    ColumnFinder(TypeDescription schema, VectorizedRowBatch batch, int levels) {
+      top = batch.cols;
+      if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+        result = new ColumnVector[levels];
+      } else {
+        result = new ColumnVector[levels + 1];
+        current = top[0];
+        addResult(current);
       }
     }
-    return current;
+
+    private void addResult(ColumnVector vector) {
+      result[resultIdx] = vector;
+      resultIdx += 1;
+    }
+
+    @Override
+    public void visit(TypeDescription type, int posn) {
+      if (current == null) {
+        current = top[posn];
+        top = null;
+      } else {
+        current = navigate(current, posn);
+      }
+      addResult(current);
+    }
+
+    private ColumnVector navigate(ColumnVector parent, int posn) {
+      if (parent instanceof ListColumnVector) {
+        return ((ListColumnVector) parent).child;
+      } else if (parent instanceof StructColumnVector) {
+        return ((StructColumnVector) parent).fields[posn];
+      } else if (parent instanceof UnionColumnVector) {
+        return ((UnionColumnVector) parent).fields[posn];
+      } else if (parent instanceof MapColumnVector) {
+        MapColumnVector m = (MapColumnVector) parent;
+        return posn == 0 ? m.keys : m.values;
+      }
+      throw new IllegalArgumentException("Unknown complex column vector " + parent.getClass());
+    }
+  }
+
+  public static ColumnVector[] findColumnVectors(TypeDescription schema,
+                                                 StringPosition source,
+                                                 boolean isCaseSensitive,
+                                                 VectorizedRowBatch batch) {
+    List<String> names = ParserUtils.splitName(source);
+    ColumnFinder result = new ColumnFinder(schema, batch, names.size());
+    findColumn(removeAcid(schema), names, isCaseSensitive, result);
+    return result.result;
   }
 
   public static List<TypeDescription> findSubtypeList(TypeDescription schema,
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 6d27508..bb1ff9a 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.OrcProto;
+import org.apache.orc.OrcFilterContext;
 import org.apache.orc.impl.reader.ReaderEncryption;
 import org.apache.orc.impl.reader.StripePlanner;
 import org.apache.orc.impl.reader.tree.BatchReader;
@@ -67,7 +68,7 @@ public class TreeReaderFactory {
 
     Set<Integer> getColumnFilterIds();
 
-    Consumer<VectorizedRowBatch> getColumnFilterCallback();
+    Consumer<OrcFilterContext> getColumnFilterCallback();
 
     boolean isSkipCorrupt();
 
@@ -94,7 +95,7 @@ public class TreeReaderFactory {
     private boolean useProlepticGregorian;
     private boolean fileUsedProlepticGregorian;
     private Set<Integer> filterColumnIds = Collections.emptySet();
-    Consumer<VectorizedRowBatch> filterCallback;
+    Consumer<OrcFilterContext> filterCallback;
 
     public ReaderContext setSchemaEvolution(SchemaEvolution evolution) {
       this.evolution = evolution;
@@ -106,7 +107,7 @@ public class TreeReaderFactory {
       return this;
     }
 
-    public ReaderContext setFilterCallback(Set<Integer> filterColumnsList, Consumer<VectorizedRowBatch> filterCallback) {
+    public ReaderContext setFilterCallback(Set<Integer> filterColumnsList, Consumer<OrcFilterContext> filterCallback) {
       this.filterColumnIds = filterColumnsList;
       this.filterCallback = filterCallback;
       return this;
@@ -150,7 +151,7 @@ public class TreeReaderFactory {
     }
 
     @Override
-    public Consumer<VectorizedRowBatch> getColumnFilterCallback() {
+    public Consumer<OrcFilterContext> getColumnFilterCallback() {
       return filterCallback;
     }
 
diff --git a/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java b/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
index e6a2db8..6153c5d 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
@@ -19,6 +19,7 @@ package org.apache.orc.impl.reader.tree;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.impl.OrcFilterContextImpl;
 import org.apache.orc.impl.TreeReaderFactory;
 
 import java.io.IOException;
@@ -27,10 +28,12 @@ import java.util.Set;
 public class StructBatchReader extends BatchReader {
   // The reader context including row-filtering details
   private final TreeReaderFactory.Context context;
+  private final OrcFilterContextImpl filterContext;
 
   public StructBatchReader(TreeReaderFactory.StructTreeReader rowReader, TreeReaderFactory.Context context) {
     super(rowReader);
     this.context = context;
+    this.filterContext = new OrcFilterContextImpl(context.getSchemaEvolution().getReaderSchema());
   }
 
   private void readBatchColumn(VectorizedRowBatch batch, TypeReader[] children, int batchSize, int index)
@@ -62,7 +65,7 @@ public class StructBatchReader extends BatchReader {
 
     // Apply filter callback to reduce number of # rows selected for decoding in the next TreeReaders
     if (!earlyExpandCols.isEmpty() && this.context.getColumnFilterCallback() != null) {
-      this.context.getColumnFilterCallback().accept(batch);
+      this.context.getColumnFilterCallback().accept(filterContext.setBatch(batch));
     }
 
     // Read the remaining columns applying row-level filtering
diff --git a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
new file mode 100644
index 0000000..8631202
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestOrcFilterContext {
+  private final TypeDescription schema = TypeDescription.createStruct()
+    .addField("f1", TypeDescription.createLong())
+    .addField("f2", TypeDescription.createString())
+    .addField("f3",
+              TypeDescription.createStruct()
+                .addField("a", TypeDescription.createInt())
+                .addField("b", TypeDescription.createLong())
+                .addField("c",
+                          TypeDescription.createMap(TypeDescription.createInt(),
+                                                    TypeDescription.createDate())))
+    .addField("f4",
+              TypeDescription.createList(TypeDescription.createStruct()
+                                           .addField("a", TypeDescription.createChar())
+                                           .addField("b", TypeDescription.createBoolean())))
+    .addField("f5",
+              TypeDescription.createMap(TypeDescription.createInt(),
+                                        TypeDescription.createDate()))
+    .addField("f6",
+              TypeDescription.createUnion()
+                .addUnionChild(TypeDescription.createInt())
+                .addUnionChild(TypeDescription.createStruct()
+                                 .addField("a", TypeDescription.createDate())
+                                 .addField("b",
+                                           TypeDescription.createList(TypeDescription.createChar()))
+                )
+    );
+  private final OrcFilterContext filterContext = new OrcFilterContextImpl(schema)
+    .setBatch(schema.createRowBatch());
+
+  @Before
+  public void setup() {
+    filterContext.reset();
+  }
+
+  @Test
+  public void testTopLevelElementaryType() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f1");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(LongColumnVector.class));
+  }
+
+  @Test
+  public void testTopLevelCompositeType() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(StructColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f4");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(ListColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f5");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(MapColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f6");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(UnionColumnVector.class));
+  }
+
+  @Test
+  public void testNestedType() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3.a");
+    assertEquals(2, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(StructColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(LongColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f3.c");
+    assertEquals(2, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(StructColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(MapColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f6.1.b");
+    assertEquals(3, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(UnionColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(StructColumnVector.class));
+    assertThat(vectorBranch[2], instanceOf(ListColumnVector.class));
+  }
+
+  @Test
+  public void testTopLevelVector() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3");
+    vectorBranch[0].noNulls = true;
+    assertTrue(OrcFilterContext.noNulls(vectorBranch));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 0));
+
+    vectorBranch[0].noNulls = false;
+    vectorBranch[0].isNull[0] = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+  }
+
+  @Test
+  public void testNestedVector() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = true;
+    vectorBranch[1].noNulls = true;
+    assertTrue(OrcFilterContext.noNulls(vectorBranch));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 2));
+
+    vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = false;
+    vectorBranch[0].isNull[0] = true;
+    vectorBranch[1].noNulls = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 2));
+
+    vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = true;
+    vectorBranch[1].noNulls = false;
+    vectorBranch[1].isNull[2] = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
+
+    vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = false;
+    vectorBranch[0].isNull[0] = true;
+    vectorBranch[1].noNulls = false;
+    vectorBranch[1].isNull[2] = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
+  }
+
+  @Test
+  public void testTopLevelList() {
+    TypeDescription topListSchema = TypeDescription.createList(
+      TypeDescription.createStruct()
+        .addField("a", TypeDescription.createChar())
+        .addField("b", TypeDescription
+          .createBoolean()));
+    OrcFilterContext fc = new OrcFilterContextImpl(topListSchema)
+      .setBatch(topListSchema.createRowBatch());
+    ColumnVector[] vectorBranch = fc.findColumnVector("_elem");
+    assertEquals(2, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(ListColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(StructColumnVector.class));
+  }
+
+  @Test
+  public void testUnsupportedIsNullUse() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f4._elem.a");
+    assertEquals(3, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(ListColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(StructColumnVector.class));
+    assertThat(vectorBranch[2], instanceOf(BytesColumnVector.class));
+
+    assertTrue(OrcFilterContext.noNulls(vectorBranch));
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+                                                      () -> OrcFilterContext.isNull(vectorBranch,
+                                                                                    0));
+    assertThat(exception.getMessage(), containsString("ListColumnVector"));
+    assertThat(exception.getMessage(), containsString("List and Map vectors are not supported"));
+  }
+
+  @Test
+  public void testRepeatingVector() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = true;
+    vectorBranch[0].isRepeating = true;
+    vectorBranch[1].noNulls = true;
+    assertTrue(OrcFilterContext.noNulls(vectorBranch));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 2));
+
+    vectorBranch[0].noNulls = false;
+    vectorBranch[0].isRepeating = true;
+    vectorBranch[0].isNull[0] = true;
+    vectorBranch[1].noNulls = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 0));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 1));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
+  }
+}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
index 10d5594..2052bea 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.impl.OrcFilterContextImpl;
 import org.apache.orc.impl.RecordReaderImpl;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,6 +45,7 @@ import java.text.Format;
 import java.text.SimpleDateFormat;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Types that are skipped at row-level include: Decimal, Decimal64, Double, Float, Char, VarChar, String, Boolean, Timestamp
@@ -77,52 +80,54 @@ public class TestRowFilteringSkip {
   }
 
   // Filter all rows except: 924 and 940
-  public static void intAnyRowFilter(VectorizedRowBatch batch) {
+  public static void intAnyRowFilter(OrcFilterContext batch) {
     // Dummy Filter implementation passing just one Batch row
     int newSize = 2;
-    batch.selected[0] = batch.size-100;
-    batch.selected[1] = 940;
-    batch.selectedInUse = true;
-    batch.size = newSize;
+    batch.getSelected()[0] = batch.getSelectedSize()-100;
+    batch.getSelected()[1] = 940;
+    batch.setSelectedInUse(true);
+    batch.setSelectedSize(newSize);
   }
 
   // Filter all rows except the first one
-  public static void intFirstRowFilter(VectorizedRowBatch batch) {
+  public static void intFirstRowFilter(OrcFilterContext batch) {
     int newSize = 0;
-    for (int row = 0; row <batch.size; ++row) {
+    for (int row = 0; row <batch.getSelectedSize(); ++row) {
       if (row == 0) {
-        batch.selected[newSize++] = row;
+        batch.getSelected()[newSize++] = row;
       }
     }
-    batch.selectedInUse = true;
-    batch.size = newSize;
+    batch.setSelectedInUse(true);
+    batch.setSelectedSize(newSize);
   }
 
   // Filter out rows in a round-robbin fashion starting with a pass
-  public static void intRoundRobbinRowFilter(VectorizedRowBatch batch) {
+  public static void intRoundRobbinRowFilter(OrcFilterContext batch) {
     int newSize = 0;
-    for (int row = 0; row < batch.size; ++row) {
+    int[] selected = batch.getSelected();
+    for (int row = 0; row < batch.getSelectedSize(); ++row) {
       if ((row % 2) == 0) {
-        batch.selected[newSize++] = row;
+        selected[newSize++] = row;
       }
     }
-    batch.selectedInUse = true;
-    batch.size = newSize;
+    batch.setSelectedInUse(true);
+    batch.setSelected(selected);
+    batch.setSelectedSize(newSize);
   }
 
   static int rowCount = 0;
-  public static void intCustomValueFilter(VectorizedRowBatch batch) {
-    LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+  public static void intCustomValueFilter(OrcFilterContext batch) {
+    LongColumnVector col1 = (LongColumnVector) ((OrcFilterContextImpl) batch).getCols()[0];
     int newSize = 0;
-    for (int row = 0; row <batch.size; ++row) {
+    for (int row = 0; row <batch.getSelectedSize(); ++row) {
       long val = col1.vector[row];
       if ((val == 2) || (val == 5) || (val == 13) || (val == 29) || (val == 70)) {
-        batch.selected[newSize++] = row;
+        batch.getSelected()[newSize++] = row;
       }
       rowCount++;
     }
-    batch.selectedInUse = true;
-    batch.size = newSize;
+    batch.setSelectedInUse(true);
+    batch.setSelectedSize(newSize);
   }
 
   @Test
@@ -1293,15 +1298,24 @@ public class TestRowFilteringSkip {
     }
   }
 
-  private static void notNullFilterMissing(VectorizedRowBatch batch) {
+  private static void notNullFilterMissing(OrcFilterContext batch) {
     int selIdx = 0;
-    for (int i = 0; i < batch.size; i++) {
-      if (!batch.cols[2].isNull[i]) {
-        batch.selected[selIdx++] = i;
+    ColumnVector cv = ((OrcFilterContextImpl) batch).getCols()[2];
+    if (cv.isRepeating) {
+      if (!cv.isNull[0]) {
+        for (int i = 0; i < batch.getSelectedSize(); i++) {
+          batch.getSelected()[selIdx++] = i;
+        }
+      }
+    } else {
+      for (int i = 0; i < batch.getSelectedSize(); i++) {
+        if (!((OrcFilterContextImpl) batch).getCols()[2].isNull[i]) {
+          batch.getSelected()[selIdx++] = i;
+        }
       }
     }
-    batch.selectedInUse = true;
-    batch.size = selIdx;
+    batch.setSelectedInUse(true);
+    batch.setSelectedSize(selIdx);
   }
 
   @Test
@@ -1325,25 +1339,36 @@ public class TestRowFilteringSkip {
       int noNullCnt = 0;
       while (rows.nextBatch(batch)) {
         Assert.assertTrue(batch.selectedInUse);
-        Assert.assertTrue(batch.selected != null);
+        Assert.assertNotNull(batch.selected);
         // Rows are filtered so it should never be 1024
         Assert.assertTrue(batch.size != ColumnBatchRows);
-        assertEquals( true, col1.noNulls);
+        assertTrue(col1.noNulls);
         for (int r = 0; r < ColumnBatchRows; ++r) {
-          if (col1.vector[r] != 100)
-            noNullCnt ++;
+          if (col1.vector[r] != 100) noNullCnt ++;
+        }
+        // We should always select 1 row as the file is spaced as such. We could get 0 in case all
+        // the rows are filtered out.
+        if (batch.size == 0) {
+          continue;
         }
+        Assert.assertEquals(1, batch.size);
+        long val = col1.vector[batch.selected[0]] ;
+        // Check that we have read the valid value
+        Assert.assertTrue((val == 2) || (val == 5) || (val == 13) || (val == 29) || (val == 70));
+        if (val == 2) {
+          Assert.assertEquals(0, col5.getTime(batch.selected[0]));
+        } else {
+          Assert.assertNotEquals(0, col5.getTime(batch.selected[0]));
+        }
+
+        // Check that unselected is not populated
+        Assert.assertEquals(0, batch.selected[1]);
       }
 
       // Total rows of the file should be 25k
       Assert.assertEquals(25000, rowCount);
       // Make sure that our filter worked ( 5 rows with userId != 100)
       Assert.assertEquals(5, noNullCnt);
-      Assert.assertEquals(false, col5.isRepeating);
-      Assert.assertEquals(544, batch.selected[0]);
-      Assert.assertEquals(0, batch.selected[1]);
-      Assert.assertTrue(col5.getTime(0) == 0);
-      Assert.assertTrue(col5.getTime(544) != 0);
     }
   }
 }
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/impl/TestOrcFilterContextImpl.java b/java/core/src/test/org/apache/orc/impl/TestOrcFilterContextImpl.java
new file mode 100644
index 0000000..acf14ce
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestOrcFilterContextImpl.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class TestOrcFilterContextImpl {
+
+  private final TypeDescription schema = TypeDescription.createStruct()
+    .addField("f1", TypeDescription.createLong())
+    .addField("f2", TypeDescription.createStruct()
+      .addField("f2a", TypeDescription.createLong())
+      .addField("f2b", TypeDescription.createString()))
+    .addField("f3", TypeDescription.createString());
+
+  @Rule
+  public final ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testSuccessfulRetrieval() {
+    VectorizedRowBatch b = createBatch();
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    fc.setBatch(b);
+
+    validateF1Vector(fc.findColumnVector("f1"), 1);
+    validateF2Vector(fc.findColumnVector("f2"));
+    validateF2AVector(fc.findColumnVector("f2.f2a"));
+    validateF2BVector(fc.findColumnVector("f2.f2b"));
+    validateF3Vector(fc.findColumnVector("f3"));
+  }
+
+  @Test
+  public void testSuccessfulRetrievalWithBatchChange() {
+    VectorizedRowBatch b1 = createBatch();
+    VectorizedRowBatch b2 = createBatch();
+    ((LongColumnVector) b2.cols[0]).vector[0] = 100;
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    fc.setBatch(b1);
+    validateF1Vector(fc.findColumnVector("f1"), 1);
+    // Change the batch
+    fc.setBatch(b2);
+    validateF1Vector(fc.findColumnVector("f1"), 100);
+  }
+
+  @Test
+  public void testMissingFieldTopLevel() {
+    VectorizedRowBatch b = createBatch();
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    fc.setBatch(b);
+
+    // Missing field at top level
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+                                                      () -> fc.findColumnVector("f4"));
+    assertThat(exception.getMessage(), containsString("Field f4 not found in"));
+  }
+
+  @Test
+  public void testMissingFieldNestedLevel() {
+    VectorizedRowBatch b = createBatch();
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    fc.setBatch(b);
+
+    // Missing field at top level
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+                                                      () -> fc.findColumnVector("f2.c"));
+    assertThat(exception.getMessage(),
+               containsString("Field c not found in struct<f2a:bigint,f2b:string>"));
+  }
+
+  @Test
+  public void testPropagations() {
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    assertNull(fc.getBatch());
+    fc.setBatch(schema.createRowBatch());
+    assertNotNull(fc.getBatch());
+    assertFalse(fc.isSelectedInUse());
+
+    // Set selections
+    fc.setSelectedInUse(true);
+    fc.getSelected()[0] = 5;
+    fc.setSelectedSize(1);
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(1, fc.getSelectedSize());
+    assertEquals(fc.getBatch().getMaxSize(), fc.getSelected().length);
+    assertArrayEquals(new int[] {5}, Arrays.copyOf(fc.getSelected(), fc.getSelectedSize()));
+    assertTrue(fc.validateSelected());
+    fc.setSelectedSize(2);
+    assertFalse(fc.validateSelected());
+
+    // Use a new selected vector
+    fc.setSelected(new int[fc.getBatch().getMaxSize()]);
+    assertArrayEquals(new int[] {0, 0}, Arrays.copyOf(fc.getSelected(), fc.getSelectedSize()));
+
+    // Increase the size of the vector
+    fc.reset();
+    assertFalse(fc.isSelectedInUse());
+    int currSize = fc.getBatch().getMaxSize();
+    assertEquals(currSize, fc.getSelected().length);
+    fc.updateSelected(currSize + 1);
+    assertEquals(currSize + 1, fc.getSelected().length);
+
+    // Set the filter context
+    fc.setFilterContext(true, new int[3], 1);
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(3, fc.getBatch().getMaxSize());
+    assertEquals(1, fc.getSelectedSize());
+  }
+
+  private VectorizedRowBatch createBatch() {
+    VectorizedRowBatch b = schema.createRowBatch();
+    LongColumnVector v1 = (LongColumnVector) b.cols[0];
+    StructColumnVector v2 = (StructColumnVector) b.cols[1];
+    LongColumnVector v2a = (LongColumnVector) v2.fields[0];
+    BytesColumnVector v2b = (BytesColumnVector) v2.fields[1];
+    BytesColumnVector v3 = (BytesColumnVector) b.cols[2];
+
+    v1.vector[0] = 1;
+    v2a.vector[0] = 2;
+    v2b.setVal(0, "3".getBytes(StandardCharsets.UTF_8));
+    v3.setVal(0, "4".getBytes(StandardCharsets.UTF_8));
+    return b;
+  }
+
+  private void validateF1Vector(ColumnVector[] v, long headValue) {
+    assertEquals(1, v.length);
+    validateF1Vector(v[0], headValue);
+  }
+
+  private void validateF1Vector(ColumnVector v, long headValue) {
+    LongColumnVector l = (LongColumnVector) v;
+    assertEquals(headValue, l.vector[0]);
+  }
+
+  private void validateF2Vector(ColumnVector[] v) {
+    assertEquals(1, v.length);
+    validateF2Vector(v[0]);
+  }
+
+  private void validateF2Vector(ColumnVector v) {
+    StructColumnVector s = (StructColumnVector) v;
+    validateF2AVector(s.fields[0]);
+    validateF2BVector(s.fields[1]);
+  }
+
+  private void validateF2AVector(ColumnVector[] v) {
+    assertEquals(2, v.length);
+    validateF2Vector(v[0]);
+    validateF2AVector(v[1]);
+  }
+
+  private void validateF2AVector(ColumnVector v) {
+    LongColumnVector l = (LongColumnVector) v;
+    assertEquals(2, l.vector[0]);
+  }
+
+  private void validateF2BVector(ColumnVector[] v) {
+    assertEquals(2, v.length);
+    validateF2Vector(v[0]);
+    validateF2BVector(v[1]);
+  }
+
+  private void validateF2BVector(ColumnVector v) {
+    BytesColumnVector b = (BytesColumnVector) v;
+    assertEquals("3", b.toString(0));
+  }
+
+  private void validateF3Vector(ColumnVector[] v) {
+    assertEquals(1, v.length);
+    BytesColumnVector b = (BytesColumnVector) v[0];
+    assertEquals("4", b.toString(0));
+  }
+}
\ No newline at end of file