Posted to commits@orc.apache.org by om...@apache.org on 2021/08/10 23:27:43 UTC

[orc] branch branch-1.7 updated: ORC-743: Added conversion of SArg into filters to take advantage of the LazyIO introduced by ORC-742. * Created Vector filters for leaf, And, Or, Batch * Created specific filters for each sarg operator and type * Test code for data type and operator filters

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new f100c3a  ORC-743: Added conversion of SArg into filters to take advantage of the LazyIO introduced by ORC-742. * Created Vector filters for leaf, And, Or, Batch * Created specific filters for each sarg operator and type * Test code for data type and operator filters
f100c3a is described below

commit f100c3aae060aafce7259548e6b9f213a26e9190
Author: Pavan Lanka <pl...@apple.com>
AuthorDate: Tue Jun 8 12:15:52 2021 -0700

    ORC-743: Added conversion of SArg into filters to take advantage of the LazyIO
    introduced by ORC-742.
    * Created Vector filters for leaf, And, Or, Batch
    * Created specific filters for each sarg operator and type
    * Test code for data type and operator filters
    
    Fixes #716
    
    Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
 java/core/src/java/org/apache/orc/OrcConf.java     |   9 +-
 java/core/src/java/org/apache/orc/Reader.java      |  24 ++
 .../java/org/apache/orc/filter/BatchFilter.java    |  38 ++
 .../java/org/apache/orc/impl/RecordReaderImpl.java |  35 +-
 .../org/apache/orc/impl/TreeReaderFactory.java     |  10 +-
 .../java/org/apache/orc/impl/filter/AndFilter.java |  48 +++
 .../apache/orc/impl/filter/BatchFilterFactory.java | 140 +++++++
 .../org/apache/orc/impl/filter/FilterFactory.java  | 142 +++++++
 .../apache/orc/impl/filter/IsNotNullFilter.java    |  60 +++
 .../org/apache/orc/impl/filter/IsNullFilter.java   |  60 +++
 .../org/apache/orc/impl/filter/LeafFilter.java     |  84 ++++
 .../java/org/apache/orc/impl/filter/OrFilter.java  |  49 +++
 .../java/org/apache/orc/impl/filter/Selected.java  | 139 +++++++
 .../org/apache/orc/impl/filter/VectorFilter.java   |  42 ++
 .../orc/impl/filter/leaf/DecimalFilters.java       | 109 +++++
 .../apache/orc/impl/filter/leaf/FloatFilters.java  | 108 +++++
 .../orc/impl/filter/leaf/LeafFilterFactory.java    | 275 ++++++++++++
 .../apache/orc/impl/filter/leaf/LongFilters.java   | 108 +++++
 .../apache/orc/impl/filter/leaf/StringFilters.java | 123 ++++++
 .../orc/impl/filter/leaf/TimestampFilters.java     | 110 +++++
 .../java/org/apache/orc/util/CuckooSetBytes.java   | 462 +++++++++++++++++++++
 .../apache/orc/TestRowFilteringComplexTypes.java   |   1 +
 .../org/apache/orc/TestRowFilteringIOSkip.java     | 254 ++++++++++-
 .../org/apache/orc/TestRowFilteringNoSkip.java     |   1 +
 .../test/org/apache/orc/TestRowFilteringSkip.java  |   1 +
 .../org/apache/orc/impl/filter/ATestFilter.java    | 167 ++++++++
 .../org/apache/orc/impl/filter/FilterUtils.java    |  36 ++
 .../apache/orc/impl/filter/IsNullFilterTest.java   | 131 ++++++
 .../org/apache/orc/impl/filter/TestAndFilter.java  |  89 ++++
 .../org/apache/orc/impl/filter/TestConvFilter.java | 231 +++++++++++
 .../org/apache/orc/impl/filter/TestNotFilter.java  | 141 +++++++
 .../org/apache/orc/impl/filter/TestOrFilter.java   |  63 +++
 .../org/apache/orc/impl/filter/TestSelected.java   | 183 ++++++++
 .../orc/impl/filter/leaf/ATestLeafFilter.java      | 131 ++++++
 .../orc/impl/filter/leaf/TestDecimalFilters.java   | 182 ++++++++
 .../apache/orc/impl/filter/leaf/TestEquals.java    |  70 ++++
 .../apache/orc/impl/filter/leaf/TestFilters.java   | 232 +++++++++++
 .../orc/impl/filter/leaf/TestFloatFilters.java     | 183 ++++++++
 .../orc/impl/filter/leaf/TestLongFilters.java      | 183 ++++++++
 .../orc/impl/filter/leaf/TestStringFilters.java    | 183 ++++++++
 .../orc/impl/filter/leaf/TestTimestampFilters.java | 183 ++++++++
 .../org/apache/orc/util/CuckooSetBytesTest.java    | 127 ++++++
 java/mapreduce/pom.xml                             |   4 +
 .../java/org/apache/orc/mapred/OrcInputFormat.java |   6 +-
 .../apache/orc/mapred/OrcMapredRecordReader.java   |   5 +-
 .../org/apache/orc/mapreduce/OrcInputFormat.java   |  11 +-
 .../orc/mapreduce/OrcMapreduceRecordReader.java    |   5 +-
 .../org/apache/orc/mapred/TestMapRedFiltering.java | 152 +++++++
 .../org/apache/orc/mapreduce/FilterTestUtil.java   | 157 +++++++
 .../orc/mapreduce/TestMapReduceFiltering.java      | 164 ++++++++
 50 files changed, 5430 insertions(+), 21 deletions(-)
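
For context, here is a minimal sketch of how a reading application might opt into the new
SArg-driven row filtering. It uses the Reader.Options methods added in this commit
(allowSARGToFilter, useSelected) together with the pre-existing searchArgument option; the
file path and column name are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;

    Configuration conf = new Configuration();
    Reader reader = OrcFile.createReader(new Path("f1.orc"), OrcFile.readerOptions(conf));
    // Hypothetical predicate: x < 100
    SearchArgument sArg = SearchArgumentFactory.newBuilder()
        .startAnd().lessThan("x", PredicateLeaf.Type.LONG, 100L).end().build();
    Reader.Options options = reader.options()
        .searchArgument(sArg, new String[] {"x"})
        .allowSARGToFilter(true)   // new in this commit: let the SArg become a row filter
        .useSelected(true);        // new in this commit: caller honors batch.selected
    try (RecordReader rows = reader.rows(options)) {
      VectorizedRowBatch batch = reader.getSchema().createRowBatch();
      while (rows.nextBatch(batch)) {
        // with useSelected(true), iterate via batch.selected when batch.selectedInUse
      }
    }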

diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index 391ef5f..a2c1ccb 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -160,11 +160,18 @@ public enum OrcConf {
   ROWS_BETWEEN_CHECKS("orc.rows.between.memory.checks", "orc.rows.between.memory.checks", 5000,
     "How often should MemoryManager check the memory sizes? Measured in rows\n" +
       "added to all of the writers.  Valid range is [1,10000] and is primarily meant for" +
-      "n\testing.  Setting this too low may negatively affect performance."),
+      "testing.  Setting this too low may negatively affect performance."),
   OVERWRITE_OUTPUT_FILE("orc.overwrite.output.file", "orc.overwrite.output.file", false,
     "A boolean flag to enable overwriting of the output file if it already exists.\n"),
   IS_SCHEMA_EVOLUTION_CASE_SENSITIVE("orc.schema.evolution.case.sensitive", "orc.schema.evolution.case.sensitive", true,
           "A boolean flag to determine if the comparision of field names in schema evolution is case sensitive .\n"),
+  ALLOW_SARG_TO_FILTER("orc.sarg.to.filter", "orc.sarg.to.filter", false,
+                       "A boolean flag to determine if a SArg is allowed to become a filter"),
+  READER_USE_SELECTED("orc.filter.use.selected", "orc.filter.use.selected", false,
+                        "A boolean flag to determine if the selected vector is supported by\n"
+                        + "the reading application. If false, the output of the ORC reader must have the filter\n"
+                        + "reapplied to avoid using unset values in the unselected rows.\n"
+                        + "If unsure please leave this as false."),
   WRITE_VARIABLE_LENGTH_BLOCKS("orc.write.variable.length.blocks", null, false,
       "A boolean flag as to whether the ORC writer should write variable length\n"
       + "HDFS blocks."),
diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 7f88bb7..51c57a2 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -197,6 +197,8 @@ public interface Reader extends Closeable {
     private boolean isSchemaEvolutionCaseAware =
         (boolean) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.getDefaultValue();
     private boolean includeAcidColumns = true;
+    private boolean allowSARGToFilter = false;
+    private boolean useSelected = false;
 
     public Options() {
       // PASS
@@ -210,6 +212,8 @@ public interface Reader extends Closeable {
       positionalEvolutionLevel = OrcConf.FORCE_POSITIONAL_EVOLUTION_LEVEL.getInt(conf);
       isSchemaEvolutionCaseAware =
           OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.getBoolean(conf);
+      allowSARGToFilter = OrcConf.ALLOW_SARG_TO_FILTER.getBoolean(conf);
+      useSelected = OrcConf.READER_USE_SELECTED.getBoolean(conf);
     }
 
     /**
@@ -282,6 +286,15 @@ public interface Reader extends Closeable {
       return this;
     }
 
+    public Options allowSARGToFilter(boolean allowSARGToFilter) {
+      this.allowSARGToFilter = allowSARGToFilter;
+      return this;
+    }
+
+    public boolean isAllowSARGToFilter() {
+      return allowSARGToFilter;
+    }
+
     /**
      * Set whether to use zero copy from HDFS.
      * @param value the new zero copy flag
@@ -471,6 +484,8 @@ public interface Reader extends Closeable {
         schema.printToBuffer(buffer);
       }
       buffer.append(", includeAcidColumns: ").append(includeAcidColumns);
+      buffer.append(", allowSARGToFilter: ").append(allowSARGToFilter);
+      buffer.append(", useSelected: ").append(useSelected);
       buffer.append("}");
       return buffer.toString();
     }
@@ -479,6 +494,15 @@ public interface Reader extends Closeable {
       return tolerateMissingSchema != null ? tolerateMissingSchema :
           (Boolean) OrcConf.TOLERATE_MISSING_SCHEMA.getDefaultValue();
     }
+
+    public boolean useSelected() {
+      return useSelected;
+    }
+
+    public Options useSelected(boolean newValue) {
+      this.useSelected = newValue;
+      return this;
+    }
   }
 
   /**
diff --git a/java/core/src/java/org/apache/orc/filter/BatchFilter.java b/java/core/src/java/org/apache/orc/filter/BatchFilter.java
new file mode 100644
index 0000000..513f56c
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/filter/BatchFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.filter;
+
+import org.apache.orc.OrcFilterContext;
+
+import java.util.function.Consumer;
+
+/**
+ * Defines a batch filter that can operate on a
+ * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch} and filter rows by using the
+ * selected vector to determine the eligible rows.
+ */
+public interface BatchFilter extends Consumer<OrcFilterContext> {
+
+  /**
+   * Identifies the filter column names. These columns will be read before the filter is applied.
+   *
+   * @return Names of the filter columns
+   */
+  String[] getColumnNames();
+}
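
As an illustration of the interface, a hypothetical BatchFilter that keeps rows where a long
column "x" holds an even value; the column name and predicate are invented, and null or
repeating vectors are ignored for brevity.

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.orc.OrcFilterContext;
    import org.apache.orc.filter.BatchFilter;

    public class EvenValueFilter implements BatchFilter {
      @Override
      public void accept(OrcFilterContext fc) {
        LongColumnVector x = (LongColumnVector) fc.findColumnVector("x")[0];
        int[] selected = fc.getSelected();
        int newSize = 0;
        for (int i = 0; i < fc.getSelectedSize(); i++) {
          // honor a pre-existing selection if one is in use
          int row = fc.isSelectedInUse() ? selected[i] : i;
          if (x.vector[row] % 2 == 0) {
            selected[newSize++] = row;
          }
        }
        fc.setSelectedSize(newSize);
        fc.setSelectedInUse(true);
      }

      @Override
      public String[] getColumnNames() {
        return new String[] {"x"};
      }
    }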
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index ed8fa83..1d58548 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -45,6 +45,7 @@ import org.apache.orc.DoubleColumnStatistics;
 import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
 import org.apache.orc.OrcProto;
 import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
@@ -52,6 +53,8 @@ import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TimestampColumnStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.filter.BatchFilter;
+import org.apache.orc.impl.filter.FilterFactory;
 import org.apache.orc.impl.reader.ReaderEncryption;
 import org.apache.orc.impl.reader.StripePlanner;
 import org.apache.orc.impl.reader.tree.BatchReader;
@@ -71,6 +74,7 @@ import java.util.List;
 import java.util.SortedSet;
 import java.util.TimeZone;
 import java.util.TreeSet;
+import java.util.function.Consumer;
 
 public class RecordReaderImpl implements RecordReader {
   static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
@@ -104,6 +108,7 @@ public class RecordReaderImpl implements RecordReader {
   private final TypeReader.ReadPhase startReadPhase;
   // identifies that follow columns bytes must be read
   private boolean needsFollowColumnsRead;
+  private final boolean noSelectedVector;
 
   /**
    * Given a list of column names, find the given column and return the index.
@@ -204,6 +209,9 @@ public class RecordReaderImpl implements RecordReader {
             "file schema:   " + fileReader.getSchema());
       }
     }
+
+    this.noSelectedVector = !options.useSelected();
+    LOG.info("noSelectedVector={}", this.noSelectedVector);
     this.schema = evolution.getReaderSchema();
     this.path = fileReader.path;
     this.rowIndexStride = fileReader.rowIndexStride;
@@ -271,10 +279,22 @@ public class RecordReaderImpl implements RecordReader {
       skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
     }
 
+    String[] filterCols = null;
+    Consumer<OrcFilterContext> filterCallBack = null;
+    BatchFilter filter = FilterFactory.createBatchFilter(options,
+                                                         evolution.getReaderBaseSchema(),
+                                                         fileReader.getFileVersion(),
+                                                         false);
+    if (filter != null) {
+      // If a filter is determined then use this
+      filterCallBack = filter;
+      filterCols = filter.getColumnNames();
+    }
+
     // Map columnNames to ColumnIds
     SortedSet<Integer> filterColIds = new TreeSet<>();
-    if (options.getPreFilterColumnNames() != null) {
-      for (String colName : options.getPreFilterColumnNames()) {
+    if (filterCols != null) {
+      for (String colName : filterCols) {
         TypeDescription expandCol = findColumnType(evolution, colName);
         // If the column is not present in the file then this can be ignored from read.
         if (expandCol == null || expandCol.getId() == -1) {
@@ -300,7 +320,7 @@ public class RecordReaderImpl implements RecordReader {
     TreeReaderFactory.ReaderContext readerContext =
         new TreeReaderFactory.ReaderContext()
           .setSchemaEvolution(evolution)
-          .setFilterCallback(filterColIds, options.getFilterCallback())
+          .setFilterCallback(filterColIds, filterCallBack)
           .skipCorrupt(skipCorrupt)
           .fileFormat(fileReader.getFileVersion())
           .useUTCTimestamp(fileReader.useUTCTimestamp)
@@ -1313,6 +1333,15 @@ public class RecordReaderImpl implements RecordReader {
         advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
         // batch.size can be modified by filter so only batchSize can tell if we actually read rows
       } while (batchSize != 0 && batch.size == 0);
+
+      if (noSelectedVector) {
+        // If the selected vector is not supported, leave the size as the read size. The
+        // non-filter columns might still be read selectively, but the filter applied after the
+        // reader is expected to eliminate the rows that fail the predicate conditions
+        batch.size = batchSize;
+        batch.selectedInUse = false;
+      }
+
       return batchSize != 0;
     } catch (IOException e) {
       // Rethrow exception with file name in log message
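
To make the selected-vector contract concrete: when the application sets useSelected(true),
it must honor batch.selected; otherwise the reader restores batch.size as above and the
filter must be reapplied downstream. A hedged sketch of a consuming loop:

    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    while (rows.nextBatch(batch)) {
      if (batch.selectedInUse) {
        for (int i = 0; i < batch.size; i++) {
          int row = batch.selected[i];
          // process only the selected row
        }
      } else {
        for (int row = 0; row < batch.size; row++) {
          // process every row; without a selected vector, rows failing the
          // predicate may still be present and hold unset values
        }
      }
    }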
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index eeff360..b80f2fc 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -302,7 +302,7 @@ public class TreeReaderFactory {
      * Seek to the given position.
      *
      * @param index the indexes loaded from the file
-     * @param readPhase
+     * @param readPhase the current readPhase
      * @throws IOException
      */
     public void seek(PositionProvider[] index, ReadPhase readPhase) throws IOException {
@@ -2941,8 +2941,7 @@ public class TreeReaderFactory {
       case DATE:
         return new DateTreeReader(fileType.getId(), context);
       case DECIMAL:
-        if (version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
-            fileType.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION){
+        if (isDecimalAsLong(version, fileType.getPrecision())){
           return new Decimal64TreeReader(fileType.getId(), fileType.getPrecision(),
               fileType.getScale(), context);
         }
@@ -2962,6 +2961,11 @@ public class TreeReaderFactory {
     }
   }
 
+  public static boolean isDecimalAsLong(OrcFile.Version version, int precision) {
+    return version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
+    precision <= TypeDescription.MAX_DECIMAL64_PRECISION;
+  }
+
   public static BatchReader createRootReader(TypeDescription readerType, Context context)
           throws IOException {
     TypeReader reader = createTreeReader(readerType, context);
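
For reference, the extracted helper means a DECIMAL column is read through the long-backed
Decimal64 reader only for UNSTABLE_PRE_2_0 files whose precision fits in
TypeDescription.MAX_DECIMAL64_PRECISION (18). A quick illustration:

    isDecimalAsLong(OrcFile.Version.UNSTABLE_PRE_2_0, 10); // true:  decimal(10,x) fits in a long
    isDecimalAsLong(OrcFile.Version.UNSTABLE_PRE_2_0, 38); // false: precision exceeds 18
    isDecimalAsLong(OrcFile.Version.CURRENT, 10);          // false: only UNSTABLE_PRE_2_0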
diff --git a/java/core/src/java/org/apache/orc/impl/filter/AndFilter.java b/java/core/src/java/org/apache/orc/impl/filter/AndFilter.java
new file mode 100644
index 0000000..0266862
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/AndFilter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.orc.OrcFilterContext;
+
+public class AndFilter implements VectorFilter {
+  public final VectorFilter[] filters;
+  private final Selected andBound = new Selected();
+  private final Selected andOut = new Selected();
+
+  public AndFilter(VectorFilter[] filters) {
+    this.filters = filters;
+  }
+
+  @Override
+  public void filter(OrcFilterContext fc,
+                     Selected bound,
+                     Selected selOut) {
+    // Each filter restricts the current selection. Make a copy of the current selection that will
+    // be used for the next filter and finally output to selOut
+    andBound.set(bound);
+    andOut.ensureSize(bound.selSize);
+    for (VectorFilter f : filters) {
+      andOut.clear();
+      f.filter(fc, andBound, andOut);
+      // Make the current selection the bound for the next filter in AND
+      andBound.set(andOut);
+    }
+    selOut.set(andOut);
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/filter/BatchFilterFactory.java b/java/core/src/java/org/apache/orc/impl/filter/BatchFilterFactory.java
new file mode 100644
index 0000000..da2afee
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/BatchFilterFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.filter.BatchFilter;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * Provides an abstraction layer between the VectorFilters and the
+ * Consumer&lt;OrcFilterContext&gt;.
+ */
+class BatchFilterFactory {
+  static BatchFilter create(List<BatchFilter> filters) {
+    if (filters.isEmpty()) {
+      return null;
+    } else if (filters.size() == 1) {
+      return filters.get(0);
+    } else {
+      return new AndBatchFilterImpl(filters.toArray(new BatchFilter[0]));
+    }
+  }
+
+  static BatchFilter create(Consumer<OrcFilterContext> filter,
+                                   String[] colNames) {
+    return filter instanceof BatchFilter ? (BatchFilter) filter
+        : new WrappedFilterImpl(filter, colNames);
+  }
+
+  static BatchFilter create(VectorFilter filter, String[] colNames) {
+    return new BatchFilterImpl(filter, colNames);
+  }
+
+  /**
+   * Used to wrap a VectorFilter for application by the BatchReader
+   */
+  private static class BatchFilterImpl implements BatchFilter {
+    final VectorFilter filter;
+    private final String[] colNames;
+    private final Selected bound = new Selected();
+    private final Selected selOut = new Selected();
+
+    private BatchFilterImpl(VectorFilter filter, String[] colNames) {
+      this.filter = filter;
+      this.colNames = colNames;
+    }
+
+    @Override
+    public void accept(OrcFilterContext fc) {
+      // Define the bound to be the batch size
+      bound.initialize(fc);
+      // selOut is set to the selectedVector
+      selOut.sel = fc.getSelected();
+      selOut.selSize = 0;
+      filter.filter(fc, bound, selOut);
+
+      if (selOut.selSize < fc.getSelectedSize()) {
+        fc.setSelectedSize(selOut.selSize);
+        fc.setSelectedInUse(true);
+      } else if (selOut.selSize > fc.getSelectedSize()) {
+        throw new RuntimeException(
+          String.format("Unexpected state: Filtered size %s > input size %s",
+                        selOut.selSize, fc.getSelectedSize()));
+      }
+    }
+
+    @Override
+    public String[] getColumnNames() {
+      return colNames;
+    }
+  }
+
+  private static class AndBatchFilterImpl implements BatchFilter {
+    private final BatchFilter[] filters;
+    private final String[] colNames;
+
+    AndBatchFilterImpl(BatchFilter... filters) {
+      this.filters = filters;
+      Set<String> names = new HashSet<>();
+      for (BatchFilter filter : this.filters) {
+        names.addAll(Arrays.asList(filter.getColumnNames()));
+      }
+      this.colNames = names.toArray(new String[0]);
+    }
+
+    @Override
+    public void accept(OrcFilterContext fc) {
+      for (int i = 0; fc.getSelectedSize() > 0 && i < filters.length; i++) {
+        filters[i].accept(fc);
+      }
+    }
+
+    @Override
+    public String[] getColumnNames() {
+      return colNames;
+    }
+
+  }
+
+  private static class WrappedFilterImpl implements BatchFilter {
+    private final Consumer<OrcFilterContext> filter;
+    private final String[] colNames;
+
+    private WrappedFilterImpl(Consumer<OrcFilterContext> filter, String[] colNames) {
+      this.filter = filter;
+      this.colNames = colNames;
+    }
+
+    @Override
+    public String[] getColumnNames() {
+      return colNames;
+    }
+
+    @Override
+    public void accept(OrcFilterContext filterContext) {
+      filter.accept(filterContext);
+    }
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/filter/FilterFactory.java b/java/core/src/java/org/apache/orc/impl/filter/FilterFactory.java
new file mode 100644
index 0000000..7fbf991
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/FilterFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.filter.BatchFilter;
+import org.apache.orc.impl.filter.leaf.LeafFilterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class FilterFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(FilterFactory.class);
+
+  /**
+   * Create a BatchFilter. This considers both the input filter and the SearchArgument filter.
+   * If both are available, they are combined by AND.
+   *
+   * @param opts       for reading the file
+   * @param readSchema that should be used
+   * @param version    provides the ORC file version
+   * @param normalize  identifies if the SArg should be normalized or not
+   * @return BatchFilter that represents the SearchArgument or null
+   */
+  public static BatchFilter createBatchFilter(Reader.Options opts,
+                                              TypeDescription readSchema,
+                                              OrcFile.Version version,
+                                              boolean normalize) {
+    List<BatchFilter> filters = new ArrayList<>(2);
+
+    // 1. Process SArgFilter
+    if (opts.isAllowSARGToFilter() && opts.getSearchArgument() != null) {
+      SearchArgument sArg = opts.getSearchArgument();
+      Set<String> colNames = new HashSet<>();
+      try {
+        ExpressionTree exprTree = normalize ? sArg.getExpression() : sArg.getCompactExpression();
+        LOG.debug("normalize={}, using expressionTree={}", normalize, exprTree);
+        filters.add(BatchFilterFactory.create(createSArgFilter(exprTree,
+                                                               colNames,
+                                                               sArg.getLeaves(),
+                                                               readSchema,
+                                                               version),
+                                              colNames.toArray(new String[0])));
+      } catch (UnSupportedSArgException e) {
+        LOG.warn("SArg: {} is not supported\n{}", sArg, e.getMessage());
+      }
+    }
+
+    // 2. Process input filter
+    if (opts.getFilterCallback() != null) {
+      filters.add(BatchFilterFactory.create(opts.getFilterCallback(),
+                                            opts.getPreFilterColumnNames()));
+    }
+    return BatchFilterFactory.create(filters);
+  }
+
+  public static VectorFilter createSArgFilter(ExpressionTree expr,
+                                              Set<String> colIds,
+                                              List<PredicateLeaf> leaves,
+                                              TypeDescription readSchema,
+                                              OrcFile.Version version)
+    throws UnSupportedSArgException {
+    VectorFilter result;
+    switch (expr.getOperator()) {
+      case OR:
+        VectorFilter[] orFilters = new VectorFilter[expr.getChildren().size()];
+        for (int i = 0; i < expr.getChildren().size(); i++) {
+          orFilters[i] = createSArgFilter(expr.getChildren().get(i),
+                                          colIds,
+                                          leaves,
+                                          readSchema,
+                                          version);
+        }
+        result = new OrFilter(orFilters);
+        break;
+      case AND:
+        VectorFilter[] andFilters = new VectorFilter[expr.getChildren().size()];
+        for (int i = 0; i < expr.getChildren().size(); i++) {
+          andFilters[i] = createSArgFilter(expr.getChildren().get(i),
+                                           colIds,
+                                           leaves,
+                                           readSchema,
+                                           version);
+        }
+        result = new AndFilter(andFilters);
+        break;
+      case NOT:
+        // NOT is expected to be pushed down so that it applies only to leaf filters
+        ExpressionTree leaf = expr.getChildren().get(0);
+        assert leaf.getOperator() == ExpressionTree.Operator.LEAF;
+        result = LeafFilterFactory.createLeafVectorFilter(leaves.get(leaf.getLeaf()),
+                                                          colIds,
+                                                          readSchema,
+                                                          version,
+                                                          true);
+        break;
+      case LEAF:
+        result = LeafFilterFactory.createLeafVectorFilter(leaves.get(expr.getLeaf()),
+                                                          colIds,
+                                                          readSchema,
+                                                          version,
+                                                          false);
+        break;
+      default:
+        throw new UnSupportedSArgException(String.format("SArg expression: %s is not supported",
+                                                         expr));
+    }
+    return result;
+  }
+
+  public static class UnSupportedSArgException extends Exception {
+
+    public UnSupportedSArgException(String message) {
+      super(message);
+    }
+  }
+}
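
Tying the factory together, a hedged sketch of the conversion path (the Reader.Options opts
is assumed to carry a SearchArgument with allowSARGToFilter enabled, as in the earlier
example):

    BatchFilter filter = FilterFactory.createBatchFilter(opts,
                                                         reader.getSchema(),
                                                         reader.getFileVersion(),
                                                         false);
    if (filter != null) {
      // RecordReaderImpl reads filter.getColumnNames() first and then invokes the
      // filter on an OrcFilterContext that wraps the current batch
      String[] cols = filter.getColumnNames();
    }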
diff --git a/java/core/src/java/org/apache/orc/impl/filter/IsNotNullFilter.java b/java/core/src/java/org/apache/orc/impl/filter/IsNotNullFilter.java
new file mode 100644
index 0000000..d9828cc
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/IsNotNullFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.OrcFilterContext;
+
+public class IsNotNullFilter implements VectorFilter {
+
+  private final String colName;
+
+  public IsNotNullFilter(String colName) {
+    this.colName = colName;
+  }
+
+  @Override
+  public void filter(OrcFilterContext fc,
+                     Selected bound,
+                     Selected selOut) {
+    ColumnVector[] branch = fc.findColumnVector(colName);
+    ColumnVector v = branch[branch.length - 1];
+    boolean noNulls = OrcFilterContext.noNulls(branch);
+
+    if (noNulls || (v.isRepeating && !OrcFilterContext.isNull(branch, 0))) {
+      // If there are no nulls then, irrespective of the repeating status, select all the
+      // values
+      selOut.selectAll(bound);
+    } else if (!v.isRepeating) {
+      int currSize = 0;
+      int rowIdx;
+      // This branch has at least one null, so a repeating vector would repeat a null and
+      // select nothing; only the non-repeating case needs a row-by-row null check.
+      for (int i = 0; i < bound.selSize; i++) {
+        rowIdx = bound.sel[i];
+
+        // Select if the value is not null
+        if (!OrcFilterContext.isNull(branch, rowIdx)) {
+          selOut.sel[currSize++] = rowIdx;
+        }
+      }
+      selOut.selSize = currSize;
+    }
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/filter/IsNullFilter.java b/java/core/src/java/org/apache/orc/impl/filter/IsNullFilter.java
new file mode 100644
index 0000000..8189ddf
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/IsNullFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.OrcFilterContext;
+
+public class IsNullFilter implements VectorFilter {
+
+  private final String colName;
+
+  public IsNullFilter(String colName) {
+    this.colName = colName;
+  }
+
+  @Override
+  public void filter(OrcFilterContext fc,
+                     Selected bound,
+                     Selected selOut) {
+    ColumnVector[] branch = fc.findColumnVector(colName);
+    ColumnVector v = branch[branch.length - 1];
+    boolean noNulls = OrcFilterContext.noNulls(branch);
+
+    // If the vector has no nulls then no rows are selected and there is nothing to do
+    if (!noNulls) {
+      if (v.isRepeating && OrcFilterContext.isNull(branch, 0)) {
+        // If the repeating value is null then select all rows.
+        selOut.selectAll(bound);
+      } else {
+        int currSize = 0;
+        int rowIdx;
+        for (int i = 0; i < bound.selSize; i++) {
+          // Identify the rowIdx from the selected vector
+          rowIdx = bound.sel[i];
+
+          if (OrcFilterContext.isNull(branch, rowIdx)) {
+            selOut.sel[currSize++] = rowIdx;
+          }
+        }
+        selOut.selSize = currSize;
+      }
+    }
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/filter/LeafFilter.java b/java/core/src/java/org/apache/orc/impl/filter/LeafFilter.java
new file mode 100644
index 0000000..fc59e54
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/LeafFilter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.OrcFilterContext;
+
+public abstract class LeafFilter implements VectorFilter {
+  public String getColName() {
+    return colName;
+  }
+
+  private final String colName;
+  private final boolean negated;
+
+  protected LeafFilter(String colName, boolean negated) {
+    this.colName = colName;
+    this.negated = negated;
+  }
+
+  @Override
+  public void filter(OrcFilterContext fc,
+                     Selected bound,
+                     Selected selOut) {
+    ColumnVector[] branch = fc.findColumnVector(colName);
+    ColumnVector v = branch[branch.length - 1];
+    boolean noNulls = OrcFilterContext.noNulls(branch);
+    int currSize = 0;
+    int rowIdx;
+
+    if (v.isRepeating) {
+      if (!OrcFilterContext.isNull(branch, 0) && allowWithNegation(v, 0)) {
+        // If the repeating value is allowed then select all rows within the bound
+        for (int i = 0; i < bound.selSize; i++) {
+          rowIdx = bound.sel[i];
+          selOut.sel[currSize++] = rowIdx;
+        }
+      }
+    } else if (noNulls) {
+      for (int i = 0; i < bound.selSize; i++) {
+        rowIdx = bound.sel[i];
+
+        // Check the value
+        if (allowWithNegation(v, rowIdx)) {
+          selOut.sel[currSize++] = rowIdx;
+        }
+      }
+    } else {
+      for (int i = 0; i < bound.selSize; i++) {
+        rowIdx = bound.sel[i];
+
+        // Check the value only if not null
+        if (!OrcFilterContext.isNull(branch, rowIdx) &&
+            allowWithNegation(v, rowIdx)) {
+          selOut.sel[currSize++] = rowIdx;
+        }
+      }
+    }
+
+    selOut.selSize = currSize;
+  }
+
+  private boolean allowWithNegation(ColumnVector v, int rowIdx) {
+    return allow(v, rowIdx) != negated;
+  }
+
+  protected abstract boolean allow(ColumnVector v, int rowIdx);
+}
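
To illustrate the contract, a hypothetical leaf filter over a long column: subclasses only
implement allow(), while null handling and negation are inherited from LeafFilter.

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.orc.impl.filter.LeafFilter;

    class LongGreaterThanZero extends LeafFilter {
      LongGreaterThanZero(String colName, boolean negated) {
        super(colName, negated);
      }

      @Override
      protected boolean allow(ColumnVector v, int rowIdx) {
        // invoked only for non-null rows; negation is applied by LeafFilter
        return ((LongColumnVector) v).vector[rowIdx] > 0;
      }
    }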
diff --git a/java/core/src/java/org/apache/orc/impl/filter/OrFilter.java b/java/core/src/java/org/apache/orc/impl/filter/OrFilter.java
new file mode 100644
index 0000000..5b91f61
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/OrFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.orc.OrcFilterContext;
+
+public class OrFilter implements VectorFilter {
+
+  public final VectorFilter[] filters;
+  private final Selected orOut = new Selected();
+  private final Selected orBound = new Selected();
+
+  public OrFilter(VectorFilter[] filters) {
+    this.filters = filters;
+  }
+
+  @Override
+  public void filter(OrcFilterContext fc,
+                     Selected bound,
+                     Selected selOut) {
+    orOut.ensureSize(bound.selSize);
+    orBound.set(bound);
+    for (VectorFilter f : filters) {
+      // For OR we accumulate into the existing output, so pass an empty out each iteration
+      orOut.clear();
+      f.filter(fc, orBound, orOut);
+      // During an OR operation the selection cannot shrink; merge the current selection into selOut
+      selOut.unionDisjoint(orOut);
+      // Remove these from the bound as they don't need any further evaluation
+      orBound.minus(orOut);
+    }
+  }
+}
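
A short worked trace of the OR evaluation, assuming two child filters over a bound of rows
{0, 1, 2, 3}:

    // filter A selects {1, 3}: selOut = {} + {1, 3} = {1, 3}, orBound shrinks to {0, 2}
    // filter B selects {0}:    selOut = {1, 3} + {0} = {0, 1, 3}, orBound shrinks to {2}
    // rows already selected are removed from orBound, so B never re-evaluates rows 1 and 3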
diff --git a/java/core/src/java/org/apache/orc/impl/filter/Selected.java b/java/core/src/java/org/apache/orc/impl/filter/Selected.java
new file mode 100644
index 0000000..e237d7f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/Selected.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.orc.OrcFilterContext;
+
+/**
+ * Wrapper class for the selected vector that centralizes the convenience functions
+ */
+public class Selected {
+  // Sorted array of row indices
+  int[] sel;
+  int selSize;
+
+  Selected(int[] sel) {
+    this.sel = sel;
+    this.selSize = 0;
+  }
+
+  Selected() {
+    this(new int[1024]);
+  }
+
+  void clear() {
+    this.selSize = 0;
+  }
+
+  void selectAll(Selected src) {
+    System.arraycopy(src.sel, 0, this.sel, 0, src.selSize);
+    this.selSize = src.selSize;
+  }
+
+  /**
+   * Initialize the selected vector from the supplied filter context
+   *
+   * @param fc Input filterContext
+   */
+  void initialize(OrcFilterContext fc) {
+    ensureSize(fc.getSelectedSize());
+    selSize = fc.getSelectedSize();
+
+    if (fc.isSelectedInUse()) {
+      System.arraycopy(fc.getSelected(), 0, sel, 0, selSize);
+    } else {
+      for (int i = 0; i < selSize; i++) {
+        sel[i] = i;
+      }
+    }
+  }
+
+  /**
+   * Only adjusts the size; the state is not preserved. If required, the state is handled
+   * before this method is called.
+   *
+   * @param size Desired size
+   */
+  void ensureSize(int size) {
+    if (size > sel.length) {
+      sel = new int[size];
+      selSize = 0;
+    }
+  }
+
+  void set(Selected inBound) {
+    ensureSize(inBound.selSize);
+    System.arraycopy(inBound.sel, 0, sel, 0, inBound.selSize);
+    selSize = inBound.selSize;
+  }
+
+  /**
+   * Expects the elements of src to be disjoint with respect to this; this is not validated.
+   *
+   * @param src The disjoint selection indices that should be merged into this.
+   */
+  void unionDisjoint(Selected src) {
+    // merge from the back to avoid the need for an intermediate store
+    int writeIdx = src.selSize + this.selSize - 1;
+    int srcIdx = src.selSize - 1;
+    int thisIdx = this.selSize - 1;
+
+    while (thisIdx >= 0 || srcIdx >= 0) {
+      if (srcIdx < 0 || (thisIdx >= 0 && src.sel[srcIdx] < this.sel[thisIdx])) {
+        // src is exhausted or this is larger
+        this.sel[writeIdx--] = this.sel[thisIdx--];
+      } else {
+        this.sel[writeIdx--] = src.sel[srcIdx--];
+      }
+    }
+    this.selSize += src.selSize;
+  }
+
+  /**
+   * Remove the elements of src from this.
+   *
+   * @param src The selection indices that should be removed from the current selection.
+   */
+  void minus(Selected src) {
+    int writeidx = 0;
+    int evalIdx = 0;
+    int srcIdx = 0;
+    while (srcIdx < src.selSize && evalIdx < this.selSize) {
+      if (this.sel[evalIdx] < src.sel[srcIdx]) {
+        // Evaluation is smaller so retain this
+        this.sel[writeidx] = this.sel[evalIdx];
+        evalIdx += 1;
+        writeidx += 1;
+      } else if (this.sel[evalIdx] > src.sel[srcIdx]) {
+        // Evaluation is larger; cannot decide yet, so advance src
+        srcIdx += 1;
+      } else {
+        // Equal should be ignored so move both evalIdx and srcIdx forward
+        evalIdx += 1;
+        srcIdx += 1;
+      }
+    }
+    if (evalIdx < this.selSize) {
+      System.arraycopy(this.sel, evalIdx, this.sel, writeidx, this.selSize - evalIdx);
+      writeidx += this.selSize - evalIdx;
+    }
+    this.selSize = writeidx;
+  }
+}
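
Since Selected and its fields are package-private, a worked example in place of runnable
code; both merge operations keep the indices sorted:

    // unionDisjoint: this.sel = {2, 5, 9}, src.sel = {3, 7}
    //   merging from the back yields this.sel = {2, 3, 5, 7, 9}, selSize = 5
    // minus: this.sel = {2, 3, 5, 7, 9}, src.sel = {3, 9}
    //   removing matches in one pass yields this.sel = {2, 5, 7}, selSize = 3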
diff --git a/java/core/src/java/org/apache/orc/impl/filter/VectorFilter.java b/java/core/src/java/org/apache/orc/impl/filter/VectorFilter.java
new file mode 100644
index 0000000..cbf188c
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/VectorFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.orc.OrcFilterContext;
+
+/**
+ * A filter that operates on the supplied
+ * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch} and updates the selections.
+ *
+ * This interface is the basis of both leaf filters, such as Equals and In, and logical
+ * filters, such as And, Or and Not.
+ */
+public interface VectorFilter {
+
+  /**
+   * Filter the vectorized row batch that is wrapped into the FilterContext.
+   * @param fc     The filter context that wraps the VectorizedRowBatch
+   * @param bound  The bound of the scan; the filter is expected to operate only on the bound
+   *               and change the selection status of the rows scoped by it. The filter is
+   *               expected to leave the bound unchanged.
+   * @param selOut The filter should update the selOut for the elements scoped by bound. The selOut
+   *               should be sorted in ascending order
+   */
+  void filter(OrcFilterContext fc, Selected bound, Selected selOut);
+}
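
A hypothetical filter honoring the bound/selOut contract: it reads only the rows listed in
bound and writes a sorted selection into selOut. It is placed in org.apache.orc.impl.filter
because the Selected fields are package-private.

    package org.apache.orc.impl.filter;

    import org.apache.orc.OrcFilterContext;

    class EvenRowFilter implements VectorFilter {
      @Override
      public void filter(OrcFilterContext fc, Selected bound, Selected selOut) {
        int size = 0;
        for (int i = 0; i < bound.selSize; i++) {
          int rowIdx = bound.sel[i];
          if (rowIdx % 2 == 0) {
            selOut.sel[size++] = rowIdx; // bound is sorted, so selOut stays sorted
          }
        }
        selOut.selSize = size;
      }
    }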
diff --git a/java/core/src/java/org/apache/orc/impl/filter/leaf/DecimalFilters.java b/java/core/src/java/org/apache/orc/impl/filter/leaf/DecimalFilters.java
new file mode 100644
index 0000000..3f05a0f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/leaf/DecimalFilters.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.impl.filter.LeafFilter;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+class DecimalFilters {
+  private DecimalFilters() {
+  }
+
+  static class DecimalBetween extends LeafFilter {
+    private final HiveDecimalWritable low;
+    private final HiveDecimalWritable high;
+
+    DecimalBetween(String colName, Object low, Object high, boolean negated) {
+      super(colName, negated);
+      this.low = (HiveDecimalWritable) low;
+      this.high = (HiveDecimalWritable) high;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((DecimalColumnVector) v).vector[rowIdx].compareTo(low) >= 0
+             && ((DecimalColumnVector) v).vector[rowIdx].compareTo(high) <= 0;
+    }
+  }
+
+  static class DecimalEquals extends LeafFilter {
+    private final HiveDecimalWritable aValue;
+
+    DecimalEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (HiveDecimalWritable) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((DecimalColumnVector) v).vector[rowIdx].compareTo(aValue) == 0;
+    }
+  }
+
+  static class DecimalIn extends LeafFilter {
+    private final Set<HiveDecimalWritable> inValues;
+
+    DecimalIn(String colName, List<Object> values, boolean negated) {
+      super(colName, negated);
+      inValues = new HashSet<>(values.size());
+      for (Object value : values) {
+        inValues.add((HiveDecimalWritable) value);
+      }
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return inValues.contains(((DecimalColumnVector) v).vector[rowIdx]);
+    }
+  }
+
+  static class DecimalLessThan extends LeafFilter {
+    private final HiveDecimalWritable aValue;
+
+    DecimalLessThan(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (HiveDecimalWritable) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((DecimalColumnVector) v).vector[rowIdx].compareTo(aValue) < 0;
+    }
+  }
+
+  static class DecimalLessThanEquals extends LeafFilter {
+    private final HiveDecimalWritable aValue;
+
+    DecimalLessThanEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (HiveDecimalWritable) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((DecimalColumnVector) v).vector[rowIdx].compareTo(aValue) <= 0;
+    }
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/filter/leaf/FloatFilters.java b/java/core/src/java/org/apache/orc/impl/filter/leaf/FloatFilters.java
new file mode 100644
index 0000000..41bcdcd
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/leaf/FloatFilters.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.impl.filter.LeafFilter;
+
+import java.util.Arrays;
+import java.util.List;
+
+class FloatFilters {
+  private FloatFilters() {
+  }
+
+  static class FloatBetween extends LeafFilter {
+    private final double low;
+    private final double high;
+
+    FloatBetween(String colName, Object low, Object high, boolean negated) {
+      super(colName, negated);
+      this.low = (double) low;
+      this.high = (double) high;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((DoubleColumnVector) v).vector[rowIdx] >= low
+             && ((DoubleColumnVector) v).vector[rowIdx] <= high;
+    }
+  }
+
+  static class FloatEquals extends LeafFilter {
+    private final double aValue;
+
+    FloatEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (double) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((DoubleColumnVector) v).vector[rowIdx] == aValue;
+    }
+  }
+
+  static class FloatIn extends LeafFilter {
+    private final double[] inValues;
+
+    FloatIn(String colName, List<Object> values, boolean negated) {
+      super(colName, negated);
+      inValues = new double[values.size()];
+      for (int i = 0; i < values.size(); i++) {
+        inValues[i] = (double) values.get(i);
+      }
+      Arrays.sort(inValues);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return Arrays.binarySearch(inValues, ((DoubleColumnVector) v).vector[rowIdx]) >= 0;
+    }
+  }
+
+  static class FloatLessThan extends LeafFilter {
+    private final double aValue;
+
+    FloatLessThan(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (double) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((DoubleColumnVector) v).vector[rowIdx] < aValue;
+    }
+  }
+
+  static class FloatLessThanEquals extends LeafFilter {
+    private final double aValue;
+
+    FloatLessThanEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (double) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((DoubleColumnVector) v).vector[rowIdx] <= aValue;
+    }
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/filter/leaf/LeafFilterFactory.java b/java/core/src/java/org/apache/orc/impl/filter/leaf/LeafFilterFactory.java
new file mode 100644
index 0000000..14f73d5
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/leaf/LeafFilterFactory.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.filter.VectorFilter;
+import org.apache.orc.impl.filter.FilterFactory;
+import org.apache.orc.impl.filter.IsNotNullFilter;
+import org.apache.orc.impl.filter.IsNullFilter;
+import org.apache.orc.impl.filter.LeafFilter;
+
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.Set;
+
+import static org.apache.orc.impl.TreeReaderFactory.isDecimalAsLong;
+
+public class LeafFilterFactory {
+  private LeafFilterFactory() {}
+
+  private static LeafFilter createEqualsFilter(String colName,
+                                               PredicateLeaf.Type type,
+                                               Object literal,
+                                               TypeDescription colType,
+                                               OrcFile.Version version,
+                                               boolean negated) {
+    switch (type) {
+      case BOOLEAN:
+        return new LongFilters.LongEquals(colName, (boolean) literal ? 1L : 0L, negated);
+      case DATE:
+        return new LongFilters.LongEquals(colName,
+                                          ((Date) literal).toLocalDate().toEpochDay(), negated);
+      case DECIMAL:
+        HiveDecimalWritable d = (HiveDecimalWritable) literal;
+        assert d.scale() <= colType.getScale();
+        if (isDecimalAsLong(version, colType.getPrecision())) {
+          return new LongFilters.LongEquals(colName, d.serialize64(colType.getScale()), negated);
+        } else {
+          return new DecimalFilters.DecimalEquals(colName, d, negated);
+        }
+      case FLOAT:
+        return new FloatFilters.FloatEquals(colName, literal, negated);
+      case LONG:
+        return new LongFilters.LongEquals(colName, literal, negated);
+      case STRING:
+        return new StringFilters.StringEquals(colName, literal, negated);
+      case TIMESTAMP:
+        return new TimestampFilters.TimestampEquals(colName, literal, negated);
+      default:
+        throw new IllegalArgumentException(String.format("Equals does not support type: %s", type));
+    }
+  }
+
+  private static LeafFilter createLessThanFilter(String colName,
+                                                 PredicateLeaf.Type type,
+                                                 Object literal,
+                                                 TypeDescription colType,
+                                                 OrcFile.Version version,
+                                                 boolean negated) {
+    switch (type) {
+      case BOOLEAN:
+        return new LongFilters.LongLessThan(colName, (boolean) literal ? 1L : 0L, negated);
+      case DATE:
+        return new LongFilters.LongLessThan(colName,
+                                            ((Date) literal).toLocalDate().toEpochDay(), negated);
+      case DECIMAL:
+        HiveDecimalWritable d = (HiveDecimalWritable) literal;
+        assert d.scale() <= colType.getScale();
+        if (isDecimalAsLong(version, colType.getPrecision())) {
+          return new LongFilters.LongLessThan(colName, d.serialize64(colType.getScale()),
+                                              negated);
+        } else {
+          return new DecimalFilters.DecimalLessThan(colName, d, negated);
+        }
+      case FLOAT:
+        return new FloatFilters.FloatLessThan(colName, literal, negated);
+      case LONG:
+        return new LongFilters.LongLessThan(colName, literal, negated);
+      case STRING:
+        return new StringFilters.StringLessThan(colName, literal, negated);
+      case TIMESTAMP:
+        return new TimestampFilters.TimestampLessThan(colName, literal, negated);
+      default:
+        throw new IllegalArgumentException(String.format("LessThan does not support type: %s", type));
+    }
+  }
+
+  private static LeafFilter createLessThanEqualsFilter(String colName,
+                                                       PredicateLeaf.Type type,
+                                                       Object literal,
+                                                       TypeDescription colType,
+                                                       OrcFile.Version version,
+                                                       boolean negated) {
+    switch (type) {
+      case BOOLEAN:
+        return new LongFilters.LongLessThanEquals(colName, (boolean) literal ? 1L : 0L,
+                                                  negated);
+      case DATE:
+        return new LongFilters.LongLessThanEquals(colName,
+                                                  ((Date) literal).toLocalDate().toEpochDay(), negated);
+      case DECIMAL:
+        HiveDecimalWritable d = (HiveDecimalWritable) literal;
+        assert d.scale() <= colType.getScale();
+        if (isDecimalAsLong(version, colType.getPrecision())) {
+          return new LongFilters.LongLessThanEquals(colName,
+                                                    d.serialize64(colType.getScale()), negated);
+        } else {
+          return new DecimalFilters.DecimalLessThanEquals(colName, d, negated);
+        }
+      case FLOAT:
+        return new FloatFilters.FloatLessThanEquals(colName, literal, negated);
+      case LONG:
+        return new LongFilters.LongLessThanEquals(colName, literal, negated);
+      case STRING:
+        return new StringFilters.StringLessThanEquals(colName, literal, negated);
+      case TIMESTAMP:
+        return new TimestampFilters.TimestampLessThanEquals(colName, literal, negated);
+      default:
+        throw new IllegalArgumentException(String.format("LessThanEquals does not support type: %s", type));
+    }
+  }
+
+  private static LeafFilter createBetweenFilter(String colName,
+                                                PredicateLeaf.Type type,
+                                                Object low,
+                                                Object high,
+                                                TypeDescription colType,
+                                                OrcFile.Version version,
+                                                boolean negated) {
+    switch (type) {
+      case BOOLEAN:
+        return new LongFilters.LongBetween(colName, (boolean) low ? 1L : 0L,
+                                           (boolean) high ? 1L : 0L, negated);
+      case DATE:
+        return new LongFilters.LongBetween(colName, ((Date) low).toLocalDate().toEpochDay(),
+                                           ((Date) high).toLocalDate().toEpochDay(), negated);
+      case DECIMAL:
+        HiveDecimalWritable dLow = (HiveDecimalWritable) low;
+        HiveDecimalWritable dHigh = (HiveDecimalWritable) high;
+        assert dLow.scale() <= colType.getScale() && dHigh.scale() <= colType.getScale();
+        if (isDecimalAsLong(version, colType.getPrecision())) {
+          return new LongFilters.LongBetween(colName, dLow.serialize64(colType.getScale()),
+                                             dHigh.serialize64(colType.getScale()), negated);
+        } else {
+          return new DecimalFilters.DecimalBetween(colName, dLow, dHigh, negated);
+        }
+      case FLOAT:
+        return new FloatFilters.FloatBetween(colName, low, high, negated);
+      case LONG:
+        return new LongFilters.LongBetween(colName, low, high, negated);
+      case STRING:
+        return new StringFilters.StringBetween(colName, low, high, negated);
+      case TIMESTAMP:
+        return new TimestampFilters.TimestampBetween(colName, low, high, negated);
+      default:
+        throw new IllegalArgumentException(String.format("Between does not support type: %s", type));
+    }
+  }
+
+  private static LeafFilter createInFilter(String colName,
+                                           PredicateLeaf.Type type,
+                                           List<Object> inList,
+                                           TypeDescription colType,
+                                           OrcFile.Version version,
+                                           boolean negated) {
+    switch (type) {
+      case BOOLEAN:
+        return new LongFilters.LongIn(colName,
+                                      inList.stream().map((Object v) -> (boolean) v ? 1L : 0L)
+                .collect(Collectors.toList()), negated);
+      case DATE:
+        return new LongFilters.LongIn(colName,
+                                      inList.stream()
+                .map((Object v) -> ((Date) v).toLocalDate().toEpochDay())
+                .collect(Collectors.toList()), negated);
+      case DECIMAL:
+        if (isDecimalAsLong(version, colType.getPrecision())) {
+          List<Object> values = new ArrayList<>(inList.size());
+          for (Object o : inList) {
+            HiveDecimalWritable v = (HiveDecimalWritable) o;
+            assert v.scale() <= colType.getScale();
+            values.add(v.serialize64(colType.getScale()));
+          }
+          return new LongFilters.LongIn(colName, values, negated);
+        } else {
+          return new DecimalFilters.DecimalIn(colName, inList, negated);
+        }
+      case FLOAT:
+        return new FloatFilters.FloatIn(colName, inList, negated);
+      case LONG:
+        return new LongFilters.LongIn(colName, inList, negated);
+      case STRING:
+        return new StringFilters.StringIn(colName, inList, negated);
+      case TIMESTAMP:
+        return new TimestampFilters.TimestampIn(colName, inList, negated);
+      default:
+        throw new IllegalArgumentException(String.format("In does not support type: %s", type));
+    }
+  }
+
+  public static VectorFilter createLeafVectorFilter(PredicateLeaf leaf,
+                                                    Set<String> colIds,
+                                                    TypeDescription readSchema,
+                                                    OrcFile.Version version,
+                                                    boolean negated)
+      throws FilterFactory.UnSupportedSArgException {
+    colIds.add(leaf.getColumnName());
+    TypeDescription colType = readSchema.findSubtype(leaf.getColumnName());
+
+    switch (leaf.getOperator()) {
+      case IN:
+        return createInFilter(leaf.getColumnName(),
+            leaf.getType(),
+            leaf.getLiteralList(),
+            colType,
+            version,
+            negated);
+      case EQUALS:
+        return createEqualsFilter(leaf.getColumnName(),
+            leaf.getType(),
+            leaf.getLiteral(),
+            colType,
+            version,
+            negated);
+      case LESS_THAN:
+        return createLessThanFilter(leaf.getColumnName(),
+            leaf.getType(),
+            leaf.getLiteral(),
+            colType,
+            version,
+            negated);
+      case LESS_THAN_EQUALS:
+        return createLessThanEqualsFilter(leaf.getColumnName(),
+            leaf.getType(),
+            leaf.getLiteral(),
+            colType,
+            version,
+            negated);
+      case BETWEEN:
+        return createBetweenFilter(leaf.getColumnName(),
+            leaf.getType(),
+            leaf.getLiteralList().get(0),
+            leaf.getLiteralList().get(1),
+            colType,
+            version,
+            negated);
+      case IS_NULL:
+        return negated ? new IsNotNullFilter(leaf.getColumnName()) :
+            new IsNullFilter(leaf.getColumnName());
+      default:
+        throw new FilterFactory.UnSupportedSArgException(String.format("Predicate: %s is not supported", leaf));
+    }
+  }
+}
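
For illustration, a minimal sketch of driving the factory above end to end. It
is not part of this patch; the struct schema and the column name "x" are
hypothetical, and only the APIs added here plus the stock Hive SearchArgument
builder are assumed:

    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.impl.filter.FilterFactory;
    import org.apache.orc.impl.filter.VectorFilter;
    import org.apache.orc.impl.filter.leaf.LeafFilterFactory;

    import java.util.HashSet;
    import java.util.Set;

    class LeafFilterSketch {
      static VectorFilter buildFilter() throws FilterFactory.UnSupportedSArgException {
        TypeDescription readSchema = TypeDescription.fromString("struct<x:bigint>");
        SearchArgument sarg = SearchArgumentFactory.newBuilder()
            .lessThan("x", PredicateLeaf.Type.LONG, 10L)
            .build();
        PredicateLeaf leaf = sarg.getLeaves().get(0);
        Set<String> colIds = new HashSet<>();  // createLeafVectorFilter adds "x"
        // For a LONG leaf with LESS_THAN this resolves to LongFilters.LongLessThan.
        return LeafFilterFactory.createLeafVectorFilter(
            leaf, colIds, readSchema, OrcFile.Version.CURRENT, false);
      }
    }
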
diff --git a/java/core/src/java/org/apache/orc/impl/filter/leaf/LongFilters.java b/java/core/src/java/org/apache/orc/impl/filter/leaf/LongFilters.java
new file mode 100644
index 0000000..5fe09cf
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/leaf/LongFilters.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.orc.impl.filter.LeafFilter;
+
+import java.util.Arrays;
+import java.util.List;
+
+class LongFilters {
+  private LongFilters() {
+  }
+
+  static class LongBetween extends LeafFilter {
+    private final long low;
+    private final long high;
+
+    LongBetween(String colName, Object low, Object high, boolean negated) {
+      super(colName, negated);
+      this.low = (long) low;
+      this.high = (long) high;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((LongColumnVector) v).vector[rowIdx] >= low
+             && ((LongColumnVector) v).vector[rowIdx] <= high;
+    }
+  }
+
+  static class LongEquals extends LeafFilter {
+    private final long aValue;
+
+    LongEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (long) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((LongColumnVector) v).vector[rowIdx] == aValue;
+    }
+  }
+
+  static class LongIn extends LeafFilter {
+    private final long[] inValues;
+
+    LongIn(String colName, List<Object> values, boolean negated) {
+      super(colName, negated);
+      inValues = new long[values.size()];
+      for (int i = 0; i < values.size(); i++) {
+        inValues[i] = (long) values.get(i);
+      }
+      Arrays.sort(inValues);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return Arrays.binarySearch(inValues, ((LongColumnVector) v).vector[rowIdx]) >= 0;
+    }
+  }
+
+  static class LongLessThan extends LeafFilter {
+    private final long aValue;
+
+    LongLessThan(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (long) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((LongColumnVector) v).vector[rowIdx] < aValue;
+    }
+  }
+
+  static class LongLessThanEquals extends LeafFilter {
+    private final long aValue;
+
+    LongLessThanEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (long) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((LongColumnVector) v).vector[rowIdx] <= aValue;
+    }
+  }
+}
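
A side note on the DECIMAL branches in LeafFilterFactory above: when
isDecimalAsLong holds, the literal is rescaled into a plain long via
serialize64 and one of these LongFilters performs the comparison. A minimal
sketch of that conversion, with an illustrative value and scale:

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;

    class DecimalAsLongSketch {
      public static void main(String[] args) {
        HiveDecimalWritable d = new HiveDecimalWritable(HiveDecimal.create("12.34"));
        // At a column scale of 2, the scaled-long encoding compared by
        // LongFilters is 1234.
        System.out.println(d.serialize64(2));
      }
    }
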
diff --git a/java/core/src/java/org/apache/orc/impl/filter/leaf/StringFilters.java b/java/core/src/java/org/apache/orc/impl/filter/leaf/StringFilters.java
new file mode 100644
index 0000000..8948abe
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/leaf/StringFilters.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.orc.impl.filter.LeafFilter;
+import org.apache.orc.util.CuckooSetBytes;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+class StringFilters {
+  private StringFilters() {
+  }
+
+  static class StringBetween extends LeafFilter {
+    private final byte[] low;
+    private final byte[] high;
+
+    StringBetween(String colName, Object low, Object high, boolean negated) {
+      super(colName, negated);
+      this.low = ((String) low).getBytes(StandardCharsets.UTF_8);
+      this.high = ((String) high).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      BytesColumnVector bv = (BytesColumnVector) v;
+      return StringExpr.compare(bv.vector[rowIdx], bv.start[rowIdx], bv.length[rowIdx],
+                                low, 0, low.length) >= 0
+             && StringExpr.compare(bv.vector[rowIdx], bv.start[rowIdx], bv.length[rowIdx],
+                                   high, 0, high.length) <= 0;
+    }
+  }
+
+  static class StringEquals extends LeafFilter {
+    private final byte[] aValue;
+
+    StringEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = ((String) aValue).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      BytesColumnVector bv = (BytesColumnVector) v;
+      return StringExpr.equal(aValue, 0, aValue.length,
+                              bv.vector[rowIdx], bv.start[rowIdx], bv.length[rowIdx]);
+    }
+  }
+
+  static class StringIn extends LeafFilter {
+    // The set containing the IN list, optimized for fast membership tests
+    // on the byte slices held by a BytesColumnVector.
+    private final CuckooSetBytes inSet;
+
+    StringIn(String colName, List<Object> values, boolean negated) {
+      super(colName, negated);
+      final byte[][] inValues = new byte[values.size()][];
+      for (int i = 0; i < values.size(); i++) {
+        inValues[i] = ((String) values.get(i)).getBytes(StandardCharsets.UTF_8);
+      }
+      inSet = new CuckooSetBytes(inValues.length);
+      inSet.load(inValues);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      BytesColumnVector bv = (BytesColumnVector) v;
+      return inSet.lookup(bv.vector[rowIdx], bv.start[rowIdx], bv.length[rowIdx]);
+    }
+  }
+
+  static class StringLessThan extends LeafFilter {
+    private final byte[] aValue;
+
+    StringLessThan(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = ((String) aValue).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      BytesColumnVector bv = (BytesColumnVector) v;
+      return StringExpr.compare(bv.vector[rowIdx], bv.start[rowIdx], bv.length[rowIdx],
+                                aValue, 0, aValue.length) < 0;
+    }
+  }
+
+  static class StringLessThanEquals extends LeafFilter {
+    private final byte[] aValue;
+
+    StringLessThanEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = ((String) aValue).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      BytesColumnVector bv = (BytesColumnVector) v;
+      return StringExpr.compare(bv.vector[rowIdx], bv.start[rowIdx], bv.length[rowIdx],
+                                aValue, 0, aValue.length) <= 0;
+    }
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/filter/leaf/TimestampFilters.java b/java/core/src/java/org/apache/orc/impl/filter/leaf/TimestampFilters.java
new file mode 100644
index 0000000..c5b34c4
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/filter/leaf/TimestampFilters.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.impl.filter.LeafFilter;
+
+import java.sql.Timestamp;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+class TimestampFilters {
+  private TimestampFilters() {
+  }
+
+  static class TimestampBetween extends LeafFilter {
+    private final Timestamp low;
+    private final Timestamp high;
+
+    TimestampBetween(String colName, Object low, Object high, boolean negated) {
+      super(colName, negated);
+      this.low = (Timestamp) low;
+      this.high = (Timestamp) high;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((TimestampColumnVector) v).compareTo(rowIdx, low) >= 0
+             && ((TimestampColumnVector) v).compareTo(rowIdx, high) <= 0;
+    }
+  }
+
+  static class TimestampEquals extends LeafFilter {
+    private final Timestamp aValue;
+
+    TimestampEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (Timestamp) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((TimestampColumnVector) v).compareTo(rowIdx, aValue) == 0;
+    }
+  }
+
+  static class TimestampIn extends LeafFilter {
+    private final Set<Timestamp> inValues;
+
+    TimestampIn(String colName, List<Object> values, boolean negated) {
+      super(colName, negated);
+      inValues = new HashSet<>(values.size());
+      for (Object value : values) {
+        inValues.add((Timestamp) value);
+      }
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return inValues.contains(((TimestampColumnVector) v).asScratchTimestamp(rowIdx));
+    }
+  }
+
+  static class TimestampLessThan extends LeafFilter {
+    private final Timestamp aValue;
+
+    TimestampLessThan(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (Timestamp) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((TimestampColumnVector) v).compareTo(rowIdx, aValue) < 0;
+    }
+  }
+
+  static class TimestampLessThanEquals extends LeafFilter {
+    private final Timestamp aValue;
+
+    TimestampLessThanEquals(String colName, Object aValue, boolean negated) {
+      super(colName, negated);
+      this.aValue = (Timestamp) aValue;
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((TimestampColumnVector) v).compareTo(rowIdx, aValue) <= 0;
+    }
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/util/CuckooSetBytes.java b/java/core/src/java/org/apache/orc/util/CuckooSetBytes.java
new file mode 100644
index 0000000..b60956d
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/CuckooSetBytes.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+
+import java.util.Random;
+
+/**
+ * A high-performance set implementation used to support fast set membership testing,
+ * using Cuckoo hashing. This is used to support fast tests of the form
+ *
+ *       column IN ( <list-of-values> )
+ *
+ * For details on the algorithm, see R. Pagh and F. F. Rodler, "Cuckoo Hashing,"
+ * Elsevier Science preprint, Dec. 2003. http://www.itu.dk/people/pagh/papers/cuckoo-jour.pdf.
+ *
+ * Copied from the Apache Hive project's CuckooSetBytes for convenience
+ */
+public class CuckooSetBytes {
+  private byte[][] t1;
+  private byte[][] t2;
+  private byte[][] prev1 = null; // used for rehashing to get last set of values
+  private byte[][] prev2 = null; // " "
+  private int n; // current array size
+  private static final double PADDING_FACTOR = 1.0/0.40; // have minimum 40% fill factor
+  private int salt = 0;
+  private final Random gen = new Random(676983475);
+  private int rehashCount = 0;
+  private static final long INT_MASK  = 0x00000000ffffffffL;
+  private static final long BYTE_MASK = 0x00000000000000ffL;
+  // some prime numbers spaced about at powers of 2 in magnitude
+  static final int[] primes = {7, 13, 17, 23, 31, 53, 67, 89, 127, 269, 571, 1019, 2089,
+    4507, 8263, 16361, 32327, 65437, 131111, 258887, 525961, 999983, 2158909, 4074073,
+    8321801, 15485863, 32452867, 67867967, 122949829, 256203221, 553105253, 982451653,
+    1645333507, 2147483647};
+
+  /**
+   * Allocate a new set to hold expectedSize values. Re-allocation to expand
+   * the set is not implemented, so the expected size must be at least the
+   * size of the set to be inserted.
+   * @param expectedSize At least the size of the set of values that will be inserted.
+   */
+  public CuckooSetBytes(int expectedSize) {
+
+    // Choose array size. We have two hash tables to hold entries, so the sum
+    // of the two should have a bit more than twice as much space as the
+    // minimum required.
+    n = (int) (expectedSize * PADDING_FACTOR / 2.0);
+
+    // Use a prime table size to reduce dependence on the quality of the hash function.
+    for (int i = 0; i != primes.length; i++) {
+      if (n <= primes[i]) {
+        n = primes[i];
+        break;
+      }
+    }
+
+    t1 = new byte[n][];
+    t2 = new byte[n][];
+    updateHashSalt();
+  }
+
+  /**
+   * Return true if and only if the len-byte value in byte array b beginning
+   * at start is present in the set.
+   */
+  public boolean lookup(byte[] b, int start, int len) {
+
+    return entryEqual(t1, h1(b, start, len), b, start, len)
+        || entryEqual(t2, h2(b, start, len), b, start, len);
+  }
+
+  private static boolean entryEqual(byte[][] t, int hash, byte[] b, int start, int len) {
+    return t[hash] != null && StringExpr.equal(t[hash], 0, t[hash].length, b, start, len);
+  }
+
+  public void insert(byte[] x) {
+    byte[] temp;
+    if (lookup(x, 0, x.length)) {
+      return;
+    }
+
+    // Try to insert up to n times. Rehash if that fails.
+    for(int i = 0; i != n; i++) {
+      int hash1 = h1(x, 0, x.length);
+      if (t1[hash1] == null) {
+        t1[hash1] = x;
+        return;
+      }
+
+      // swap x and t1[h1(x)]
+      temp = t1[hash1];
+      t1[hash1] = x;
+      x = temp;
+
+      int hash2 = h2(x, 0, x.length);
+      if (t2[hash2] == null) {
+        t2[hash2] = x;
+        return;
+      }
+
+      // swap x and t2[h2(x)]
+      temp = t2[hash2];
+      t2[hash2] = x;
+      x = temp;
+    }
+    rehash();
+    insert(x);
+  }
+
+  /**
+   * Insert all values in the input array into the set.
+   */
+  public void load(byte[][] a) {
+    for (byte[] x : a) {
+      insert(x);
+    }
+  }
+
+  /**
+   * Try to insert with up to n values "poked out". Return the last value poked out.
+   * If the returned value is non-null, we assume there was a cycle.
+   * Don't try to insert the same value twice. This is for use in rehash only,
+   * so you won't see the same value twice.
+   */
+  private byte[] tryInsert(byte[] x) {
+    byte[] temp;
+
+    for(int i = 0; i != n; i++) {
+      int hash1 = h1(x, 0, x.length);
+      if (t1[hash1] == null) {
+        t1[hash1] = x;
+        return null;
+      }
+
+      // swap x and t1[h1(x)]
+      temp = t1[hash1];
+      t1[hash1] = x;
+      x = temp;
+
+      int hash2 = h2(x, 0, x.length);
+      if (t2[hash2] == null) {
+        t2[hash2] = x;
+        return null;
+      }
+
+      // swap x and t2[h2(x)]
+      temp = t2[hash2];
+      t2[hash2] = x;
+      x = temp;
+      if (x == null) {
+        break;
+      }
+    }
+    return x;
+  }
+
+  /**
+   * first hash function
+   */
+  private int h1(byte[] b, int start, int len) {
+
+    // AND hash with mask to 0 out sign bit to make sure it's positive.
+    // Then we know taking the result mod n is in the range (0..n-1).
+    return (hash(b, start, len, 0) & 0x7FFFFFFF) % n;
+  }
+
+  /**
+   * second hash function
+   */
+  private int h2(byte[] b, int start, int len) {
+
+    // AND hash with mask to 0 out sign bit to make sure it's positive.
+    // Then we know taking the result mod n is in the range (0..n-1).
+    // Include salt as argument so this hash function can be varied
+    // if we need to rehash.
+    return (hash(b, start, len, salt) & 0x7FFFFFFF) % n;
+  }
+
+  /**
+   * In case of rehash, hash function h2 is changed by updating the
+   * salt value passed in to the function hash().
+   */
+  private void updateHashSalt() {
+    salt = gen.nextInt(0x7FFFFFFF);
+  }
+
+  private void rehash() {
+    rehashCount++;
+    if (rehashCount > 20) {
+      throw new RuntimeException("Too many rehashes");
+    }
+    updateHashSalt();
+
+    // Save original values
+    if (prev1 == null) {
+      prev1 = t1;
+      prev2 = t2;
+    }
+    t1 = new byte[n][];
+    t2 = new byte[n][];
+    for (byte[] v  : prev1) {
+      if (v != null) {
+        byte[] x = tryInsert(v);
+        if (x != null) {
+          rehash();
+          return;
+        }
+      }
+    }
+    for (byte[] v  : prev2) {
+      if (v != null) {
+        byte[] x = tryInsert(v);
+        if (x != null) {
+          rehash();
+          return;
+        }
+      }
+    }
+
+    // We succeeded in adding all the values, so
+    // clear the previous values recorded.
+    prev1 = null;
+    prev2 = null;
+  }
+
+  /**
+   * This is adapted from the org.apache.hadoop.util.hash.JenkinsHash package.
+   * The interface needed to be modified to suit the use here, by adding
+   * a start offset parameter to the hash function.
+   *
+   * In the future, folding this back into the original Hadoop package should
+   * be considered. This code could then import that package and use it.
+   * The original comments from the source are below.
+   *
+   * taken from  hashlittle() -- hash a variable-length key into a 32-bit value
+   *
+   * @param key the key (the unaligned variable-length array of bytes)
+   * @param nbytes number of bytes to include in hash
+   * @param initval can be any integer value
+   * @return a 32-bit value.  Every bit of the key affects every bit of the
+   * return value.  Two keys differing by one or two bits will have totally
+   * different hash values.
+   *
+   * <p>The best hash table sizes are powers of 2.  There is no need to do mod
+   * a prime (mod is sooo slow!).  If you need less than 32 bits, use a bitmask.
+   * For example, if you need only 10 bits, do
+   * <code>h = (h & hashmask(10));</code>
+   * In which case, the hash table should have hashsize(10) elements.
+   *
+   * <p>If you are hashing n strings byte[][] k, do it like this:
+   * for (int i = 0, h = 0; i < n; ++i) h = hash( k[i], h);
+   *
+   * <p>By Bob Jenkins, 2006.  bob_jenkins@burtleburtle.net.  You may use this
+   * code any way you wish, private, educational, or commercial.  It's free.
+   *
+   * <p>Use for hash table lookup, or anything where one collision in 2^^32 is
+   * acceptable.  Do NOT use for cryptographic purposes.
+  */
+  @SuppressWarnings("fallthrough")
+  private int hash(byte[] key, int start, int nbytes, int initval) {
+    int length = nbytes;
+    long a, b, c;       // We use longs because we don't have unsigned ints
+    a = b = c = (0x00000000deadbeefL + length + initval) & INT_MASK;
+    int offset = start;
+    for (; length > 12; offset += 12, length -= 12) {
+      a = (a + (key[offset] & BYTE_MASK)) & INT_MASK;
+      a = (a + (((key[offset + 1]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+      a = (a + (((key[offset + 2]  & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+      a = (a + (((key[offset + 3]  & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+      b = (b + (key[offset + 4]    & BYTE_MASK)) & INT_MASK;
+      b = (b + (((key[offset + 5]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+      b = (b + (((key[offset + 6]  & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+      b = (b + (((key[offset + 7]  & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+      c = (c + (key[offset + 8]    & BYTE_MASK)) & INT_MASK;
+      c = (c + (((key[offset + 9]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+      c = (c + (((key[offset + 10] & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+      c = (c + (((key[offset + 11] & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+
+      /*
+       * mix -- mix 3 32-bit values reversibly.
+       * This is reversible, so any information in (a,b,c) before mix() is
+       * still in (a,b,c) after mix().
+       *
+       * If four pairs of (a,b,c) inputs are run through mix(), or through
+       * mix() in reverse, there are at least 32 bits of the output that
+       * are sometimes the same for one pair and different for another pair.
+       *
+       * This was tested for:
+       * - pairs that differed by one bit, by two bits, in any combination
+       *   of top bits of (a,b,c), or in any combination of bottom bits of
+       *   (a,b,c).
+       * - "differ" is defined as +, -, ^, or ~^.  For + and -, I transformed
+       *   the output delta to a Gray code (a^(a>>1)) so a string of 1's (as
+       *    is commonly produced by subtraction) look like a single 1-bit
+       *    difference.
+       * - the base values were pseudorandom, all zero but one bit set, or
+       *   all zero plus a counter that starts at zero.
+       *
+       * Some k values for my "a-=c; a^=rot(c,k); c+=b;" arrangement that
+       * satisfy this are
+       *     4  6  8 16 19  4
+       *     9 15  3 18 27 15
+       *    14  9  3  7 17  3
+       * Well, "9 15 3 18 27 15" didn't quite get 32 bits diffing for
+       * "differ" defined as + with a one-bit base and a two-bit delta.  I
+       * used http://burtleburtle.net/bob/hash/avalanche.html to choose
+       * the operations, constants, and arrangements of the variables.
+       *
+       * This does not achieve avalanche.  There are input bits of (a,b,c)
+       * that fail to affect some output bits of (a,b,c), especially of a.
+       * The most thoroughly mixed value is c, but it doesn't really even
+       * achieve avalanche in c.
+       *
+       * This allows some parallelism.  Read-after-writes are good at doubling
+       * the number of bits affected, so the goal of mixing pulls in the
+       * opposite direction as the goal of parallelism.  I did what I could.
+       * Rotates seem to cost as much as shifts on every machine I could lay
+       * my hands on, and rotates are much kinder to the top and bottom bits,
+       * so I used rotates.
+       *
+       * #define mix(a,b,c) \
+       * { \
+       *   a -= c;  a ^= rot(c, 4);  c += b; \
+       *   b -= a;  b ^= rot(a, 6);  a += c; \
+       *   c -= b;  c ^= rot(b, 8);  b += a; \
+       *   a -= c;  a ^= rot(c,16);  c += b; \
+       *   b -= a;  b ^= rot(a,19);  a += c; \
+       *   c -= b;  c ^= rot(b, 4);  b += a; \
+       * }
+       *
+       * mix(a,b,c);
+       */
+      a = (a - c) & INT_MASK;
+      a ^= rot(c, 4);
+      c = (c + b) & INT_MASK;
+      b = (b - a) & INT_MASK;
+      b ^= rot(a, 6);
+      a = (a + c) & INT_MASK;
+      c = (c - b) & INT_MASK;
+      c ^= rot(b, 8);
+      b = (b + a) & INT_MASK;
+      a = (a - c) & INT_MASK;
+      a ^= rot(c,16);
+      c = (c + b) & INT_MASK;
+      b = (b - a) & INT_MASK;
+      b ^= rot(a,19);
+      a = (a + c) & INT_MASK;
+      c = (c - b) & INT_MASK;
+      c ^= rot(b, 4);
+      b = (b + a) & INT_MASK;
+    }
+
+    //-------------------------------- last block: affect all 32 bits of (c)
+    switch (length) {                   // all the case statements fall through
+    case 12:
+      c = (c + (((key[offset + 11] & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+    case 11:
+      c = (c + (((key[offset + 10] & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+    case 10:
+      c = (c + (((key[offset + 9]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+    case  9:
+      c = (c + (key[offset + 8]    & BYTE_MASK)) & INT_MASK;
+    case  8:
+      b = (b + (((key[offset + 7]  & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+    case  7:
+      b = (b + (((key[offset + 6]  & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+    case  6:
+      b = (b + (((key[offset + 5]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+    case  5:
+      b = (b + (key[offset + 4]    & BYTE_MASK)) & INT_MASK;
+    case  4:
+      a = (a + (((key[offset + 3]  & BYTE_MASK) << 24) & INT_MASK)) & INT_MASK;
+    case  3:
+      a = (a + (((key[offset + 2]  & BYTE_MASK) << 16) & INT_MASK)) & INT_MASK;
+    case  2:
+      a = (a + (((key[offset + 1]  & BYTE_MASK) <<  8) & INT_MASK)) & INT_MASK;
+    case  1:
+      a = (a + (key[offset] & BYTE_MASK)) & INT_MASK;
+      break;
+    case  0:
+      return (int)(c & INT_MASK);
+    }
+    /*
+     * final -- final mixing of 3 32-bit values (a,b,c) into c
+     *
+     * Pairs of (a,b,c) values differing in only a few bits will usually
+     * produce values of c that look totally different.  This was tested for
+     * - pairs that differed by one bit, by two bits, in any combination
+     *   of top bits of (a,b,c), or in any combination of bottom bits of
+     *   (a,b,c).
+     *
+     * - "differ" is defined as +, -, ^, or ~^.  For + and -, I transformed
+     *   the output delta to a Gray code (a^(a>>1)) so a string of 1's (as
+     *   is commonly produced by subtraction) look like a single 1-bit
+     *   difference.
+     *
+     * - the base values were pseudorandom, all zero but one bit set, or
+     *   all zero plus a counter that starts at zero.
+     *
+     * These constants passed:
+     *   14 11 25 16 4 14 24
+     *   12 14 25 16 4 14 24
+     * and these came close:
+     *    4  8 15 26 3 22 24
+     *   10  8 15 26 3 22 24
+     *   11  8 15 26 3 22 24
+     *
+     * #define final(a,b,c) \
+     * {
+     *   c ^= b; c -= rot(b,14); \
+     *   a ^= c; a -= rot(c,11); \
+     *   b ^= a; b -= rot(a,25); \
+     *   c ^= b; c -= rot(b,16); \
+     *   a ^= c; a -= rot(c,4);  \
+     *   b ^= a; b -= rot(a,14); \
+     *   c ^= b; c -= rot(b,24); \
+     * }
+     *
+     */
+    c ^= b;
+    c = (c - rot(b,14)) & INT_MASK;
+    a ^= c;
+    a = (a - rot(c,11)) & INT_MASK;
+    b ^= a;
+    b = (b - rot(a,25)) & INT_MASK;
+    c ^= b;
+    c = (c - rot(b,16)) & INT_MASK;
+    a ^= c;
+    a = (a - rot(c,4))  & INT_MASK;
+    b ^= a;
+    b = (b - rot(a,14)) & INT_MASK;
+    c ^= b;
+    c = (c - rot(b,24)) & INT_MASK;
+
+    return (int)(c & INT_MASK);
+  }
+
+  private static long rot(long val, int pos) {
+    return ((Integer.rotateLeft(
+        (int)(val & INT_MASK), pos)) & INT_MASK);
+  }
+}
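
For reference, a minimal usage sketch of the set as the StringIn filter above
employs it; the literals are illustrative:

    import org.apache.orc.util.CuckooSetBytes;

    import java.nio.charset.StandardCharsets;

    class CuckooSetSketch {
      public static void main(String[] args) {
        byte[][] inValues = {
            "apple".getBytes(StandardCharsets.UTF_8),
            "orange".getBytes(StandardCharsets.UTF_8)
        };
        // Size the set up front; it does not re-allocate to grow on insert.
        CuckooSetBytes set = new CuckooSetBytes(inValues.length);
        set.load(inValues);

        byte[] row = "xapplex".getBytes(StandardCharsets.UTF_8);
        // Lookups take (buffer, start, length) slices, so the bytes held by a
        // BytesColumnVector can be tested without per-row copies.
        System.out.println(set.lookup(row, 1, 5));  // the "apple" slice -> true
      }
    }
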
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java
index 0bcdb17..604d431 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypes.java
@@ -49,6 +49,7 @@ public class TestRowFilteringComplexTypes {
     @BeforeEach
     public void openFileSystem(TestInfo testInfo) throws Exception {
         conf = new Configuration();
+        OrcConf.READER_USE_SELECTED.setBoolean(conf, true);
         fs = FileSystem.getLocal(conf);
         testFilePath = new Path(workDir,
             "TestRowFilteringComplexTypes." + testInfo.getTestMethod().get().getName() + ".orc");
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
index b310ee8..3b13632 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
@@ -146,6 +146,79 @@ public class TestRowFilteringIOSkip {
   }
 
   @Test
+  public void readSingleRowWithFilter() throws IOException {
+    int cnt = 100;
+    Random r = new Random(cnt);
+    long ridx;
+
+    while (cnt > 0) {
+      ridx = r.nextInt((int) RowCount);
+      readSingleRowWithFilter(ridx);
+      cnt--;
+    }
+  }
+
+  private void readSingleRowWithFilter(long idx) throws IOException {
+    Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .in("ridx", PredicateLeaf.Type.LONG, idx)
+      .build();
+    Reader.Options options = r.options()
+      .searchArgument(sarg, new String[] {"ridx"})
+      .useSelected(true)
+      .allowSARGToFilter(true);
+    VectorizedRowBatch b = schema.createRowBatch();
+    long rowCount = 0;
+    try (RecordReader rr = r.rows(options)) {
+      assertTrue(rr.nextBatch(b));
+      validateBatch(b, idx);
+      rowCount += b.size;
+      assertFalse(rr.nextBatch(b));
+    }
+    assertEquals(1, rowCount);
+  }
+
+  @Test
+  public void readWithoutSelectedSupport() throws IOException {
+    // When the selected vector is not supported, the reader returns more rows than just the filtered ones.
+    Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
+    long rowIdx = 12345;
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .in("ridx", PredicateLeaf.Type.LONG, rowIdx)
+      .build();
+    Reader.Options options = r.options()
+      .searchArgument(sarg, new String[] {"ridx"})
+      .useSelected(false)
+      .allowSARGToFilter(true);
+    VectorizedRowBatch b = schema.createRowBatch();
+    long rowCount = 0;
+    HiveDecimalWritable d = new HiveDecimalWritable();
+    readStart();
+    try (RecordReader rr = r.rows(options)) {
+      while (rr.nextBatch(b)) {
+        rowCount += b.size;
+        for (int i = 0; i < b.size; i++) {
+          if (i == b.selected[0]) {
+            // All the values are expected to match only for the selected row
+            long expValue = ((LongColumnVector) b.cols[0]).vector[i];
+            d.setFromLongAndScale(expValue, scale);
+            assertEquals(d, ((DecimalColumnVector) b.cols[1]).vector[i]);
+            assertEquals(expValue, ((LongColumnVector) b.cols[2]).vector[i]);
+            BytesColumnVector sv = (BytesColumnVector) b.cols[3];
+            assertEquals(String.valueOf(expValue),
+                                sv.toString(i));
+            assertEquals(rowIdx, ((LongColumnVector) b.cols[4]).vector[i]);
+          }
+        }
+      }
+    }
+    double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
+    assertTrue(rowCount > 0 && rowCount <= b.getMaxSize(),
+               String.format("RowCount: %s should be between 1 and 1024", rowCount));
+    assertTrue(p <= 3, String.format("Read p: %s should be less than 3", p));
+  }
+
+  @Test
   public void readWithSArg() throws IOException {
     readStart();
     Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
@@ -153,6 +226,7 @@ public class TestRowFilteringIOSkip {
       .in("f1", PredicateLeaf.Type.LONG, 0L)
       .build();
     Reader.Options options = r.options()
+      .useSelected(true)
       .searchArgument(sarg, new String[] {"f1"});
     VectorizedRowBatch b = schema.createRowBatch();
     long rowCount;
@@ -164,6 +238,50 @@ public class TestRowFilteringIOSkip {
     assertTrue(p >= 100);
   }
 
+  @Test
+  public void readWithSArgAsFilter() throws IOException {
+    readStart();
+    Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG, 0L)
+      .build();
+    Reader.Options options = r.options()
+      .searchArgument(sarg, new String[] {"f1"})
+      .useSelected(true)
+      .allowSARGToFilter(true);
+    VectorizedRowBatch b = schema.createRowBatch();
+    long rowCount;
+    try (RecordReader rr = r.rows(options)) {
+      rowCount = validateFilteredRecordReader(rr, b);
+    }
+    double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
+    assertEquals(0, rowCount);
+    assertTrue(p < 30);
+  }
+
+  @Test
+  public void readWithInvalidSArgAs() throws IOException {
+    readStart();
+    Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .isNull("f1", PredicateLeaf.Type.LONG)
+      .end()
+      .build();
+    Reader.Options options = r.options()
+      .searchArgument(sarg, new String[] {"f1"})
+      .useSelected(true)
+      .allowSARGToFilter(true);
+    VectorizedRowBatch b = schema.createRowBatch();
+    long rowCount;
+    try (RecordReader rr = r.rows(options)) {
+      rowCount = validateFilteredRecordReader(rr, b);
+    }
+    double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
+    assertEquals(RowCount, rowCount);
+    assertTrue(p > 100);
+  }
+
   private long validateFilteredRecordReader(RecordReader rr, VectorizedRowBatch b)
     throws IOException {
     long rowCount = 0;
@@ -198,12 +316,19 @@ public class TestRowFilteringIOSkip {
   }
 
   @Test
-  public void filterAllRows() throws IOException {
+  public void filterAllRowsWithFilter() throws IOException {
     readStart();
     Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
+    filterAllRows(r,
+                  r.options()
+                    .useSelected(true)
+                    .setRowFilter(FilterColumns,
+                                  new InFilter(new HashSet<>(0), 0)));
+  }
+
+  private void filterAllRows(Reader r, Reader.Options options) throws IOException {
     VectorizedRowBatch b = schema.createRowBatch();
-    Reader.Options options = r.options()
-      .setRowFilter(FilterColumns, new InFilter(new HashSet<>(0), 0));
     long rowCount = 0;
     try (RecordReader rr = r.rows(options)) {
       while (rr.nextBatch(b)) {
@@ -226,7 +351,7 @@ public class TestRowFilteringIOSkip {
     Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
     VectorizedRowBatch b = schema.createRowBatch();
     long rowCount;
-    try (RecordReader rr = r.rows()) {
+    try (RecordReader rr = r.rows(r.options().useSelected(true))) {
       rowCount = validateFilteredRecordReader(rr, b);
     }
     double p = readPercentage(readEnd(), fs.getFileStatus(filePath).getLen());
@@ -250,6 +375,7 @@ public class TestRowFilteringIOSkip {
     VectorizedRowBatch b = schema.createRowBatch();
     long rowCount;
     try (RecordReader rr = r.rows(r.options()
+                                    .useSelected(true)
                                     .setRowFilter(FilterColumns, new AllowAllFilter()))) {
       rowCount = validateFilteredRecordReader(rr, b);
     }
@@ -264,12 +390,15 @@ public class TestRowFilteringIOSkip {
     Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
     VectorizedRowBatch b = schema.createRowBatch();
     Reader.Options options = r.options()
+      .useSelected(true)
       .setRowFilter(FilterColumns, new AlternateFilter());
     long rowCount;
     try (RecordReader rr = r.rows(options)) {
       rowCount = validateFilteredRecordReader(rr, b);
     }
-    readEnd();
+    FileSystem.Statistics stats = readEnd();
+    double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
+    assertTrue(readPercentage > 100);
     assertTrue(RowCount > rowCount);
   }
 
@@ -279,6 +408,7 @@ public class TestRowFilteringIOSkip {
     Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
     VectorizedRowBatch b = schema.createRowBatch();
     Reader.Options options = r.options()
+      .useSelected(true)
       .setRowFilter(FilterColumns, new AlternateFilter());
     long seekRow;
     try (RecordReader rr = r.rows(options)) {
@@ -323,6 +453,120 @@ public class TestRowFilteringIOSkip {
     assertTrue(readPercentage > 130);
   }
 
+  @Test
+  public void readFewRGWithSArg() throws IOException {
+    readStart();
+    Reader r = OrcFile.createReader(filePath,
+                                    OrcFile.readerOptions(conf).filesystem(fs));
+    VectorizedRowBatch b = schema.createRowBatch();
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .in("ridx", PredicateLeaf.Type.LONG, 0L, 1000000L, 2000000L, 3000000L)
+      .build();
+    Reader.Options options = r.options()
+      .useSelected(true)
+      .searchArgument(sarg, new String[] {"ridx"});
+
+    long rowCount;
+    try (RecordReader rr = r.rows(options)) {
+      rowCount = validateFilteredRecordReader(rr, b);
+    }
+    assertEquals(8192 * 4, rowCount);
+    FileSystem.Statistics stats = readEnd();
+    double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
+    assertTrue(readPercentage < 10);
+  }
+
+  @Test
+  public void readFewRGWithSArgAndFilter() throws IOException {
+    readStart();
+    Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
+    VectorizedRowBatch b = schema.createRowBatch();
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .in("ridx", PredicateLeaf.Type.LONG, 0L, 1000000L, 2000000L, 3000000L)
+      .build();
+    Reader.Options options = r.options()
+      .searchArgument(sarg, new String[] {"ridx"})
+      .useSelected(true)
+      .allowSARGToFilter(true);
+
+    long rowCount;
+    try (RecordReader rr = r.rows(options)) {
+      rowCount = validateFilteredRecordReader(rr, b);
+    }
+    assertEquals(4, rowCount);
+    FileSystem.Statistics stats = readEnd();
+    double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
+    assertTrue(readPercentage < 10);
+  }
+
+  @Test
+  public void schemaEvolutionMissingFilterColumn() throws IOException {
+    Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription readSchema = schema
+      .clone()
+      .addField("missing", TypeDescription.createLong());
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .isNull("missing", PredicateLeaf.Type.LONG)
+      .end()
+      .build();
+    Reader.Options options = r.options()
+      .schema(readSchema)
+      .searchArgument(sarg, new String[] {"missing"})
+      .useSelected(true)
+      .allowSARGToFilter(true);
+    VectorizedRowBatch b = readSchema.createRowBatch();
+    long rowCount = 0;
+    try (RecordReader rr = r.rows(options)) {
+      assertFalse(rr.nextBatch(b));
+    }
+    assertEquals(0, rowCount);
+  }
+
+  @Test
+  public void schemaEvolutionLong2StringColumn() throws IOException {
+    Reader r = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs));
+    // Change ridx column from long to string and swap the positions of ridx and f4 columns
+    TypeDescription readSchema = TypeDescription.createStruct()
+      .addField("f1", TypeDescription.createLong())
+      .addField("f2", TypeDescription.createDecimal().withPrecision(20).withScale(6))
+      .addField("f3", TypeDescription.createLong())
+      .addField("ridx", TypeDescription.createString())
+      .addField("f4", TypeDescription.createString());
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .in("ridx", PredicateLeaf.Type.STRING, "1")
+      .build();
+    Reader.Options options = r.options()
+      .schema(readSchema)
+      .searchArgument(sarg, new String[] {"ridx"})
+      .useSelected(true)
+      .allowSARGToFilter(true);
+    VectorizedRowBatch b = readSchema.createRowBatch();
+    long rowCount = 0;
+    try (RecordReader rr = r.rows(options)) {
+      assertTrue(rr.nextBatch(b));
+      assertEquals(1, b.size);
+      rowCount += b.size;
+      HiveDecimalWritable d = new HiveDecimalWritable();
+      int rowIdx = 1;
+      long expValue = ((LongColumnVector) b.cols[0]).vector[rowIdx];
+      d.setFromLongAndScale(expValue, scale);
+      assertEquals(d, ((DecimalColumnVector) b.cols[1]).vector[rowIdx]);
+      assertEquals(expValue, ((LongColumnVector) b.cols[2]).vector[rowIdx]);
+      // The columns ridx and f4 are swapped, which is reflected in the updated index value
+      BytesColumnVector sv = (BytesColumnVector) b.cols[4];
+      assertEquals(String.valueOf(expValue),
+                          sv.toString(rowIdx));
+      sv = (BytesColumnVector) b.cols[3];
+      assertEquals(String.valueOf(rowIdx),
+                          sv.toString(rowIdx));
+
+      assertFalse(rr.nextBatch(b));
+    }
+
+    assertEquals(1, rowCount);
+  }
+
   private void seekToRow(RecordReader rr, VectorizedRowBatch b, long row) throws IOException {
     rr.seekToRow(row);
     assertTrue(rr.nextBatch(b));
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java
index a845fdc..626944c 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringNoSkip.java
@@ -50,6 +50,7 @@ public class TestRowFilteringNoSkip {
   @BeforeEach
   public void openFileSystem(TestInfo testInfo) throws Exception {
     conf = new Configuration();
+    OrcConf.READER_USE_SELECTED.setBoolean(conf, true);
     fs = FileSystem.getLocal(conf);
     testFilePath = new Path(workDir, "TestRowFilteringNoSkip." +
         testInfo.getTestMethod().get().getName() + ".orc");
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
index 309d743..2742a24 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
@@ -61,6 +61,7 @@ public class TestRowFilteringSkip {
   @BeforeEach
   public void openFileSystem(TestInfo testInfo) throws Exception {
     conf = new Configuration();
+    OrcConf.READER_USE_SELECTED.setBoolean(conf, true);
     fs = FileSystem.getLocal(conf);
     testFilePath = new Path(workDir, "TestRowFilteringSkip." +
         testInfo.getTestMethod().get().getName() + ".orc");
diff --git a/java/core/src/test/org/apache/orc/impl/filter/ATestFilter.java b/java/core/src/test/org/apache/orc/impl/filter/ATestFilter.java
new file mode 100644
index 0000000..551930d
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/ATestFilter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcFilterContextImpl;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ATestFilter {
+  protected final TypeDescription schema = TypeDescription.createStruct()
+    .addField("f1", TypeDescription.createLong())
+    .addField("f2", TypeDescription.createString())
+    .addField("f3", TypeDescription.createDecimal().withPrecision(38).withScale(2))
+    .addField("f4", TypeDescription.createDouble())
+    .addField("f5", TypeDescription.createTimestamp());
+  protected final OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+
+  protected final VectorizedRowBatch batch = schema.createRowBatch();
+
+  protected void setBatch(Long[] f1Values, String[] f2Values) {
+    setBatch(f1Values, f2Values, null, null, null);
+  }
+
+  protected void setBatch(Long[] f1Values,
+                          String[] f2Values,
+                          HiveDecimalWritable[] f3Values,
+                          Double[] f4Values,
+                          Timestamp[] f5Values) {
+
+    batch.reset();
+    for (int i = 0; i < f1Values.length; i++) {
+      setLong(f1Values[i], (LongColumnVector) batch.cols[0], i);
+      setString(f2Values[i], (BytesColumnVector) batch.cols[1], i);
+      if (f3Values != null) {
+        setDecimal(f3Values[i], (DecimalColumnVector) batch.cols[2], i);
+      }
+      if (f4Values != null) {
+        setDouble(f4Values[i], (DoubleColumnVector) batch.cols[3], i);
+      }
+      if (f5Values != null) {
+        setTimestamp(f5Values[i], (TimestampColumnVector) batch.cols[4], i);
+      }
+    }
+
+    batch.size = f1Values.length;
+    fc.setBatch(batch);
+  }
+
+  private void setTimestamp(Timestamp value, TimestampColumnVector v, int idx) {
+    if (value == null) {
+      v.noNulls = false;
+      v.isNull[idx] = true;
+    } else {
+      v.isNull[idx] = false;
+      v.getScratchTimestamp().setTime(value.getTime());
+      v.getScratchTimestamp().setNanos(value.getNanos());
+      v.setFromScratchTimestamp(idx);
+    }
+  }
+
+  private void setDouble(Double value, DoubleColumnVector v, int idx) {
+    if (value == null) {
+      v.noNulls = false;
+      v.isNull[idx] = true;
+    } else {
+      v.isNull[idx] = false;
+      v.vector[idx] = value;
+    }
+  }
+
+  private void setDecimal(HiveDecimalWritable value, DecimalColumnVector v, int idx) {
+    if (value == null) {
+      v.noNulls = false;
+      v.isNull[idx] = true;
+    } else {
+      v.isNull[idx] = false;
+      v.vector[idx] = value;
+    }
+  }
+
+  private void setString(String value, BytesColumnVector v, int idx) {
+    if (value == null) {
+      v.noNulls = false;
+      v.isNull[idx] = true;
+    } else {
+      v.isNull[idx] = false;
+      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+      v.vector[idx] = bytes;
+      v.start[idx] = 0;
+      v.length[idx] = bytes.length;
+    }
+  }
+
+  private void setLong(Long value, LongColumnVector v, int idx) {
+    if (value == null) {
+      v.noNulls = false;
+      v.isNull[idx] = true;
+    } else {
+      v.isNull[idx] = false;
+      v.vector[idx] = value;
+    }
+  }
+
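+  // Validation helpers: validateSelected asserts an explicit row selection,
+  // validateAllSelected asserts that filtering left the batch untouched
+  // (the selected vector is not in use), and validateNoneSelected asserts
+  // that every row was eliminated.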
+  protected void validateSelected(int... v) {
+    validateSelected(fc, v);
+  }
+
+  static void validateSelected(OrcFilterContext fc, int... v) {
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(v.length, fc.getSelectedSize());
+    assertArrayEquals(v, Arrays.copyOf(fc.getSelected(), v.length));
+  }
+
+  protected void validateAllSelected(int size) {
+    validateAllSelected(fc, size);
+  }
+
+  static void validateAllSelected(OrcFilterContext fc, int size) {
+    assertFalse(fc.isSelectedInUse());
+    assertEquals(size, fc.getSelectedSize());
+  }
+
+  protected void validateNoneSelected() {
+    validateNoneSelected(fc);
+  }
+
+  static void validateNoneSelected(OrcFilterContext fc) {
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(0, fc.getSelectedSize());
+  }
+
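+  // Wraps a raw VectorFilter into a BatchFilter (no column names are needed
+  // for these tests) and applies it to the test's filter context.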
+  protected void filter(VectorFilter filter) {
+    BatchFilterFactory.create(filter, null).accept(fc);
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/FilterUtils.java b/java/core/src/test/org/apache/orc/impl/filter/FilterUtils.java
new file mode 100644
index 0000000..36ebdcc
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/FilterUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.filter.BatchFilter;
+import org.apache.orc.impl.filter.leaf.TestFilters;
+
+public class FilterUtils {
+
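+  // Test convenience: builds a BatchFilter with the UNSTABLE_PRE_2_0 file
+  // version and normalization disabled; returns null when the SArg cannot
+  // be converted into a filter.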
+  public static BatchFilter createVectorFilter(SearchArgument sArg,
+                                               TypeDescription readSchema) {
+    return TestFilters.createBatchFilter(sArg,
+                                         readSchema,
+                                         OrcFile.Version.UNSTABLE_PRE_2_0,
+                                         false);
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/IsNullFilterTest.java b/java/core/src/test/org/apache/orc/impl/filter/IsNullFilterTest.java
new file mode 100644
index 0000000..515e5a3
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/IsNullFilterTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class IsNullFilterTest extends ATestFilter {
+  @Test
+  public void nullFilterTest() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .isNull("f1", PredicateLeaf.Type.LONG)
+      .isNull("f2", PredicateLeaf.Type.STRING)
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, null},
+             new String[] {"a", "b", "c", null, "e", "f"});
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(2, 3, 5);
+  }
+
+  @Test
+  public void repeatedNullFilterTest() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .equals("f2", PredicateLeaf.Type.STRING, "c")
+      .isNull("f1", PredicateLeaf.Type.LONG)
+      .end()
+      .build();
+
+    setBatch(new Long[] {null, null, null, null, null, null},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    batch.cols[0].isRepeating = true;
+    batch.cols[0].noNulls = false;
+    batch.cols[1].isRepeating = false;
+    batch.cols[1].noNulls = false;
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateAllSelected(6);
+  }
+
+  @Test
+  public void notNullFilterTest() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .startOr()
+      .isNull("f1", PredicateLeaf.Type.LONG)
+      .isNull("f2", PredicateLeaf.Type.STRING)
+      .end()
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, null},
+             new String[] {"a", "b", "c", null, "e", "f"});
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(0, 1, 4);
+  }
+
+  @Test
+  public void repeatedNotNullFilterTest() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .equals("f2", PredicateLeaf.Type.STRING, "c")
+      .startNot()
+      .isNull("f1", PredicateLeaf.Type.LONG)
+      .end()
+      .end()
+      .build();
+
+    setBatch(new Long[] {null, null, null, null, null, null},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    batch.cols[0].isRepeating = true;
+    batch.cols[0].noNulls = false;
+    batch.cols[1].isRepeating = false;
+    batch.cols[1].noNulls = true;
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(2);
+  }
+
+  @Test
+  public void repeatedNotNullFilterNoNullsTest() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .equals("f2", PredicateLeaf.Type.STRING, "c")
+      .startNot()
+      .isNull("f1", PredicateLeaf.Type.LONG)
+      .end()
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 1L, 1L, 1L, 1L, 1L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    batch.cols[0].isRepeating = true;
+    batch.cols[0].noNulls = true;
+    batch.cols[1].isRepeating = false;
+    batch.cols[1].noNulls = true;
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateAllSelected(6);
+  }
+}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/impl/filter/TestAndFilter.java b/java/core/src/test/org/apache/orc/impl/filter/TestAndFilter.java
new file mode 100644
index 0000000..485524b
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/TestAndFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.impl.filter.leaf.TestFilters;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestAndFilter extends ATestFilter {
+
+  @Test
+  public void testAndSelectsNothing() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    SearchArgument s = SearchArgumentFactory.newBuilder()
+      .startAnd()
+      .equals("f1", PredicateLeaf.Type.LONG, 3L)
+      .equals("f1", PredicateLeaf.Type.LONG, 4L)
+      .end()
+      .build();
+    Consumer<OrcFilterContext> f = TestFilters.createBatchFilter(s,
+                                                                 schema,
+                                                                 OrcFile.Version.CURRENT);
+    assertFalse(fc.isSelectedInUse());
+    f.accept(fc);
+
+    validateNoneSelected();
+  }
+
+  @Test
+  public void testANDConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .startAnd()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+      .in("f2", PredicateLeaf.Type.STRING, "a", "b", "c")
+      .end()
+      .build();
+
+    Set<String> colIds = new HashSet<>();
+    VectorFilter f = FilterFactory.createSArgFilter(sarg.getCompactExpression(),
+                                                    colIds,
+                                                    sarg.getLeaves(),
+                                                    schema,
+                                                    OrcFile.Version.CURRENT);
+    assertNotNull(f);
+    assertTrue(f instanceof AndFilter);
+    assertEquals(2, ((AndFilter) f).filters.length);
+    assertEquals(2, colIds.size());
+    assertTrue(colIds.contains("f1"));
+    assertTrue(colIds.contains("f2"));
+
+    // Set up the data so that the AND condition selects no rows
+    setBatch(
+      new Long[] {1L, 0L, 2L, 4L, 3L},
+      new String[] {"z", "a", "y", "b", "x"});
+    fc.setBatch(batch);
+
+    filter(f);
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(0, fc.getSelectedSize());
+  }
+
+}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/impl/filter/TestConvFilter.java b/java/core/src/test/org/apache/orc/impl/filter/TestConvFilter.java
new file mode 100644
index 0000000..cf3b3d4
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/TestConvFilter.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+public class TestConvFilter {
+  private final int scale = 4;
+  private final TypeDescription schema = TypeDescription.createStruct()
+    .addField("f1", TypeDescription.createBoolean())
+    .addField("f2", TypeDescription.createDate())
+    .addField("f3", TypeDescription.createDecimal().withPrecision(18).withScale(scale));
+
+  private final OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+  private final VectorizedRowBatch batch = schema.createRowBatchV2();
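+  // createRowBatchV2 encodes small-precision decimals as decimal64 and dates
+  // as days since the epoch (both in LongColumnVector), so these tests cover
+  // converting SArg literals into those vector encodings.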
+
+  @BeforeEach
+  public void setup() throws ParseException {
+    setBatch();
+  }
+
+  @Test
+  public void testBooleanEquals() {
+    // Equals
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f1", PredicateLeaf.Type.BOOLEAN, true)
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 0, 3, 4);
+  }
+
+  @Test
+  public void testBooleanIn() {
+    // Equals
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f1", PredicateLeaf.Type.BOOLEAN, false)
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 1, 2);
+  }
+
+  @Test
+  public void testBooleanBetween() {
+    // Between
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .between("f1", PredicateLeaf.Type.BOOLEAN, false, true)
+      .end()
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 0, 1, 2, 3, 4);
+  }
+
+  @Test
+  public void testDateEquals() throws ParseException {
+    // Equals
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f2", PredicateLeaf.Type.DATE, date("2000-01-01"))
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 1);
+  }
+
+  @Test
+  public void testDateIn() throws ParseException {
+    // In
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f2", PredicateLeaf.Type.DATE, date("2000-01-01"), date("2100-06-07"))
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 1, 4);
+  }
+
+  @Test
+  public void testDateBetween() throws ParseException {
+    // Between
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .between("f2", PredicateLeaf.Type.DATE, date("2000-01-01"), date("2100-06-07"))
+      .end()
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 1, 2, 3, 4);
+  }
+
+  @Test
+  public void testDecimalEquals() {
+    // Equals
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f3", PredicateLeaf.Type.DECIMAL, decimal(12345678))
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 2);
+  }
+
+  @Test
+  public void testDecimalIn() {
+    // In
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f3", PredicateLeaf.Type.DECIMAL, decimal(0), decimal(Long.MAX_VALUE / 18))
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 1, 3);
+  }
+
+  @Test
+  public void testDecimalBetween() {
+    // Between
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .between("f3", PredicateLeaf.Type.DECIMAL, decimal(0), decimal(Long.MAX_VALUE / 18))
+      .end()
+      .build();
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+    ATestFilter.validateSelected(fc, 1, 2, 3);
+  }
+
+  protected void setBatch(Boolean[] f1Values, Date[] f2Values, HiveDecimalWritable[] f3Values) {
+    batch.reset();
+    for (int i = 0; i < f1Values.length; i++) {
+      setBoolean(f1Values[i], (LongColumnVector) batch.cols[0], i);
+      setDate(f2Values[i], (LongColumnVector) batch.cols[1], i);
+      setDecimal(f3Values[i], (LongColumnVector) batch.cols[2], i);
+    }
+
+    batch.size = f1Values.length;
+    fc.setBatch(batch);
+  }
+
+  private void setDecimal(HiveDecimalWritable value, LongColumnVector v, int idx) {
+    if (value == null) {
+      v.noNulls = false;
+      v.isNull[idx] = true;
+    } else {
+      assert (HiveDecimalWritable.isPrecisionDecimal64(value.precision())
+              && value.scale() <= scale);
+      v.isNull[idx] = false;
+      v.vector[idx] = value.serialize64(scale);
+    }
+  }
+
+  private void setBoolean(Boolean value, LongColumnVector v, int idx) {
+    if (value == null) {
+      v.noNulls = false;
+      v.isNull[idx] = true;
+    } else {
+      v.isNull[idx] = false;
+      v.vector[idx] = value ? 1 : 0;
+    }
+  }
+
+  private void setDate(Date value, LongColumnVector v, int idx) {
+    if (value == null) {
+      v.noNulls = false;
+      v.isNull[idx] = true;
+    } else {
+      v.isNull[idx] = false;
+      v.vector[idx] = value.toLocalDate().toEpochDay();
+    }
+  }
+
+  private final SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
+
+  private Date date(String value) throws ParseException {
+    return new Date(fmt.parse(value).getTime());
+  }
+
+  private HiveDecimalWritable decimal(long lValue) {
+    return new HiveDecimalWritable(HiveDecimal.create(lValue, scale));
+  }
+
+  private void setBatch() throws ParseException {
+    setBatch(new Boolean[] {true, false, false, true, true, null},
+             new Date[] {
+               date("1900-01-01"),
+               date("2000-01-01"),
+               date("2000-01-02"),
+               date("2019-12-31"),
+               date("2100-06-07"),
+               null
+             },
+             new HiveDecimalWritable[] {
+               new HiveDecimalWritable(HiveDecimal.create(Long.MIN_VALUE / 9, scale)),
+               new HiveDecimalWritable(HiveDecimal.create(0, scale)),
+               new HiveDecimalWritable(HiveDecimal.create(12345678, scale)),
+               new HiveDecimalWritable(HiveDecimal.create(Long.MAX_VALUE / 18, scale)),
+               new HiveDecimalWritable(HiveDecimal.create(Long.MAX_VALUE / 9, scale)),
+               null
+             });
+
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/TestNotFilter.java b/java/core/src/test/org/apache/orc/impl/filter/TestNotFilter.java
new file mode 100644
index 0000000..236f75a
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/TestNotFilter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.impl.filter.leaf.TestFilters;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestNotFilter extends ATestFilter {
+
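+  // An "unbounded" NOT is evaluated against the full batch; the "bounded"
+  // cases nest the NOT inside an AND, where it only needs to unselect rows
+  // that the sibling predicate has already selected.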
+  @Test
+  public void testUnboundedNot() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .in("f1", PredicateLeaf.Type.LONG, 3L, 5L)
+      .end()
+      .build();
+    Consumer<OrcFilterContext> f = TestFilters.createBatchFilter(sArg, schema);
+    f.accept(fc.setBatch(batch));
+
+    validateSelected(0, 1, 3, 5);
+  }
+
+  @Test
+  public void testEmptyUnbounded() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .in("f1", PredicateLeaf.Type.LONG, 7L, 8L)
+      .end()
+      .build();
+    Consumer<OrcFilterContext> f = TestFilters.createBatchFilter(sArg, schema);
+    f.accept(fc.setBatch(batch));
+
+    assertEquals(6, fc.getSelectedSize());
+    assertArrayEquals(new int[] {0, 1, 2, 3, 4, 5},
+                             Arrays.copyOf(fc.getSelected(), fc.getSelectedSize()));
+  }
+
+  @Test
+  public void testBounded() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startAnd()
+      .in("f2", PredicateLeaf.Type.STRING, "b", "c")
+      .startNot()
+      .in("f1", PredicateLeaf.Type.LONG, 2L, 8L)
+      .end()
+      .end()
+      .build();
+    Consumer<OrcFilterContext> f = TestFilters.createBatchFilter(sArg, schema);
+    f.accept(fc.setBatch(batch));
+
+    validateSelected(2);
+  }
+
+  @Test
+  public void testEmptyBounded() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startAnd()
+      .in("f2", PredicateLeaf.Type.STRING, "b", "c")
+      .startNot()
+      .in("f1", PredicateLeaf.Type.LONG, 7L, 8L)
+      .end()
+      .end()
+      .build();
+    Consumer<OrcFilterContext> f = TestFilters.createBatchFilter(sArg, schema);
+    f.accept(fc.setBatch(batch));
+
+    validateSelected(1, 2);
+  }
+
+  @Test
+  public void testNotAndPushDown() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .startAnd()
+      .equals("f1", PredicateLeaf.Type.LONG, 3L)
+      .equals("f2", PredicateLeaf.Type.STRING, "c")
+      .end()
+      .end()
+      .build();
+
+    Consumer<OrcFilterContext> f = TestFilters.createBatchFilter(sArg, schema);
+    f.accept(fc.setBatch(batch));
+
+    validateSelected(0, 1, 3, 4, 5);
+  }
+
+  @Test
+  public void testNotOrPushDown() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .startOr()
+      .equals("f1", PredicateLeaf.Type.LONG, 3L)
+      .equals("f2", PredicateLeaf.Type.STRING, "d")
+      .end()
+      .end()
+      .build();
+
+    Consumer<OrcFilterContext> f = TestFilters.createBatchFilter(sArg, schema);
+    f.accept(fc.setBatch(batch));
+
+    validateSelected(0, 1, 4, 5);
+  }
+}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/impl/filter/TestOrFilter.java b/java/core/src/test/org/apache/orc/impl/filter/TestOrFilter.java
new file mode 100644
index 0000000..430d57c
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/TestOrFilter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFile;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestOrFilter extends ATestFilter {
+  @Test
+  public void testORConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+      .in("f2", PredicateLeaf.Type.STRING, "a", "b", "c")
+      .end()
+      .build();
+
+    Set<String> colIds = new HashSet<>();
+    VectorFilter f = FilterFactory.createSArgFilter(sarg.getCompactExpression(),
+                                                    colIds,
+                                                    sarg.getLeaves(),
+                                                    schema,
+                                                    OrcFile.Version.CURRENT);
+    assertNotNull(f);
+    assertTrue(f instanceof OrFilter);
+    assertEquals(2, ((OrFilter) f).filters.length);
+    assertEquals(2, colIds.size());
+    assertTrue(colIds.contains("f1"));
+    assertTrue(colIds.contains("f2"));
+
+    // Set up the data so that the OR condition selects every row
+    setBatch(
+      new Long[] {1L, 0L, 2L, 4L, 3L},
+      new String[] {"z", "a", "y", "b", "x"});
+    fc.setBatch(batch);
+    filter(f);
+    validateAllSelected(5);
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/TestSelected.java b/java/core/src/test/org/apache/orc/impl/filter/TestSelected.java
new file mode 100644
index 0000000..3572f52
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/TestSelected.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestSelected {
+  private final Selected src = new Selected();
+  private final Selected tgt = new Selected();
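+  // Selected tracks a sorted selection vector (sel) and its logical size
+  // (selSize). unionDisjoint merges a disjoint selection into this one,
+  // while minus removes the given selection's rows from this one.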
+
+  @Test
+  public void testUnionBothEmpty() {
+    // Both are empty
+    src.sel = new int[10];
+    tgt.sel = new int[10];
+    tgt.unionDisjoint(src);
+    assertArrayEquals(new int[10], tgt.sel);
+    assertEquals(0, tgt.selSize);
+  }
+
+  @Test
+  public void testUnionTgtEmpty() {
+    // tgt has no selection
+    src.sel = new int[] {1, 3, 7, 0, 0};
+    src.selSize = 3;
+    tgt.sel = new int[5];
+    tgt.selSize = 0;
+    tgt.unionDisjoint(src);
+    assertEquals(src.selSize, tgt.selSize);
+    assertArrayEquals(src.sel, tgt.sel);
+  }
+
+  @Test
+  public void testUnionSrcEmpty() {
+    // src has no selection
+    src.sel = new int[5];
+    src.selSize = 0;
+    tgt.sel = new int[] {1, 3, 7, 0, 0};
+    tgt.selSize = 3;
+    tgt.unionDisjoint(src);
+    validate(tgt, 1, 3, 7);
+  }
+
+  @Test
+  public void testUnionCurrSmallerThanAdd() {
+    // all tgt entries precede the single src entry
+    src.sel = new int[] {7, 0, 0, 0, 0};
+    src.selSize = 1;
+    tgt.sel = new int[] {1, 3, 0, 0, 0};
+    tgt.selSize = 2;
+    tgt.unionDisjoint(src);
+    validate(tgt, 1, 3, 7);
+  }
+
+  @Test
+  public void testUnionAddSmallerThanCurr() {
+    // src entries straddle the single tgt entry
+    src.sel = new int[] {1, 7, 0, 0, 0};
+    src.selSize = 2;
+    tgt.sel = new int[] {3, 0, 0, 0, 0};
+    tgt.selSize = 1;
+    tgt.unionDisjoint(src);
+    validate(tgt, 1, 3, 7);
+  }
+
+  @Test
+  public void testUnionNoChange() {
+    // src is empty, so tgt is unchanged
+    src.sel = new int[] {0, 0, 0, 0, 0};
+    src.selSize = 0;
+    tgt.sel = new int[] {1, 3, 7, 0, 0};
+    tgt.selSize = 3;
+    tgt.unionDisjoint(src);
+    validate(tgt, 1, 3, 7);
+  }
+
+  @Test
+  public void testUnionNewEnclosed() {
+    // tgt entries fall inside the src range
+    src.sel = new int[] {1, 7, 0, 0, 0};
+    src.selSize = 2;
+    tgt.sel = new int[] {3, 4, 0, 0, 0};
+    tgt.selSize = 2;
+    tgt.unionDisjoint(src);
+    validate(tgt, 1, 3, 4, 7);
+  }
+
+  @Test
+  public void testUnionPrevEnclosed() {
+    // src entries fall inside the tgt range
+    src.sel = new int[] {3, 4, 0, 0, 0};
+    src.selSize = 2;
+    tgt.sel = new int[] {1, 7, 0, 0, 0};
+    tgt.selSize = 2;
+    tgt.unionDisjoint(src);
+    validate(tgt, 1, 3, 4, 7);
+  }
+
+  @Test
+  public void testMinus() {
+    src.sel = new int[] {3, 4, 0, 0, 0};
+    src.selSize = 2;
+    tgt.sel = new int[] {1, 7, 0, 0, 0};
+    tgt.selSize = 2;
+    tgt.minus(src);
+    validate(tgt, 1, 7);
+  }
+
+  @Test
+  public void testMinusAllElements() {
+    src.sel = new int[] {1, 7, 0, 0, 0};
+    src.selSize = 2;
+    tgt.sel = new int[] {1, 7, 0, 0, 0};
+    tgt.selSize = 2;
+    tgt.minus(src);
+    assertEquals(0, tgt.selSize);
+  }
+
+  @Test
+  public void testMinusInterleavedElements() {
+    src.sel = new int[] {1, 5, 9, 0, 0};
+    src.selSize = 3;
+    tgt.sel = new int[] {1, 3, 5, 7, 9};
+    tgt.selSize = 5;
+    tgt.minus(src);
+    validate(tgt, 3, 7);
+  }
+
+  @Test
+  public void testMinusEmpty() {
+    src.sel = new int[] {1, 5, 9, 0, 0};
+    src.selSize = 0;
+    tgt.sel = new int[] {1, 3, 5, 7, 9};
+    tgt.selSize = 5;
+    tgt.minus(src);
+    validate(tgt, 1, 3, 5, 7, 9);
+  }
+
+  @Test
+  public void testMinusSrcLarger() {
+    src.sel = new int[] {10, 50, 90, 0, 0};
+    src.selSize = 3;
+    tgt.sel = new int[] {1, 3, 5, 7, 9};
+    tgt.selSize = 5;
+    tgt.minus(src);
+    validate(tgt, 1, 3, 5, 7, 9);
+  }
+
+  @Test
+  public void testMinusSrcSmaller() {
+    tgt.sel = new int[] {10, 50, 90, 0, 0};
+    tgt.selSize = 3;
+    src.sel = new int[] {1, 3, 5, 7, 9};
+    src.selSize = 5;
+    tgt.minus(src);
+    validate(tgt, 10, 50, 90);
+  }
+
+  private void validate(Selected tgt, int... expected) {
+    assertArrayEquals(expected, Arrays.copyOf(tgt.sel, tgt.selSize));
+  }
+}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/ATestLeafFilter.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/ATestLeafFilter.java
new file mode 100644
index 0000000..093fcad
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/ATestLeafFilter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.impl.filter.ATestFilter;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.sql.Timestamp;
+import java.util.stream.IntStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class ATestLeafFilter extends ATestFilter {
+  static final int lowIdx = 2;
+  static final int highIdx = 4;
+  static final int size = 6;
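+  // Each column holds six ascending values with a trailing null; predicate
+  // literals are drawn from lowIdx and highIdx so every operator test can
+  // derive its expected selection positionally.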
+
+  @BeforeEach
+  public void setup() {
+    HiveDecimalWritable[] decValues = new HiveDecimalWritable[] {
+      new HiveDecimalWritable(Long.MIN_VALUE + "100.01"),
+      new HiveDecimalWritable(0),
+      new HiveDecimalWritable(Long.MAX_VALUE + "100.01"),
+      new HiveDecimalWritable(Long.MAX_VALUE + "101.00"),
+      new HiveDecimalWritable(Long.MAX_VALUE + "101.01"),
+      null};
+    Timestamp[] tsValues = new Timestamp[] {
+      createTimestamp(-100000, 55),
+      createTimestamp(0, 0),
+      createTimestamp(0, 1),
+      createTimestamp(0, 2),
+      createTimestamp(123456, 1),
+      null
+    };
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, null},
+             new String[] {"a", "b", "c", "d", "e", null},
+             decValues,
+             new Double[] {1.01, 2.0, 2.1, 3.55, 4.0, null},
+             tsValues);
+  }
+
+  private Timestamp createTimestamp(long time, int nano) {
+    Timestamp result = new Timestamp(0);
+    result.setTime(time);
+    result.setNanos(nano);
+    return result;
+  }
+
+  protected Object getPredicateValue(PredicateLeaf.Type type, int idx) {
+    switch (type) {
+      case LONG:
+        return ((LongColumnVector) batch.cols[0]).vector[idx];
+      case STRING:
+        BytesColumnVector bv = (BytesColumnVector) batch.cols[1];
+        return new String(bv.vector[idx], bv.start[idx], bv.length[idx], UTF_8);
+      case DECIMAL:
+        return ((DecimalColumnVector) batch.cols[2]).vector[idx];
+      case FLOAT:
+        return ((DoubleColumnVector) batch.cols[3]).vector[idx];
+      case TIMESTAMP:
+        TimestampColumnVector tv = (TimestampColumnVector) batch.cols[4];
+        Timestamp value = new Timestamp(0);
+        value.setTime(tv.time[idx]);
+        value.setNanos(tv.nanos[idx]);
+        return value;
+      default:
+        throw new IllegalArgumentException(String.format("Type: %s is unsupported", type));
+    }
+  }
+
+  protected void validateSelected(PredicateLeaf.Operator op, boolean not) {
+    // Except for IS_NULL, restrict the range to size - 1 as the last element is null
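+    // The not ^ condition flips the expected selection, letting one table
+    // drive both the positive and the negated variant of each operator.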
+    switch (op) {
+      case EQUALS:
+        validateSelected(IntStream.range(0, size - 1)
+                           .filter(i -> not ^ (i == lowIdx))
+                           .toArray());
+        break;
+      case LESS_THAN:
+        validateSelected(IntStream.range(0, size - 1)
+                           .filter(i -> not ^ (i < lowIdx))
+                           .toArray());
+        break;
+      case LESS_THAN_EQUALS:
+        validateSelected(IntStream.range(0, size - 1)
+                           .filter(i -> not ^ (i <= lowIdx))
+                           .toArray());
+        break;
+      case IN:
+        validateSelected(IntStream.range(0, size - 1)
+                           .filter(i -> not ^ (i == lowIdx || i == highIdx))
+                           .toArray());
+        break;
+      case BETWEEN:
+        validateSelected(IntStream.range(0, size - 1)
+                           .filter(i -> not ^ (i >= lowIdx && i <= highIdx))
+                           .toArray());
+        break;
+      case IS_NULL:
+        validateSelected(IntStream.range(0, size)
+                           .filter(i -> not ^ (i == 5))
+                           .toArray());
+        break;
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestDecimalFilters.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestDecimalFilters.java
new file mode 100644
index 0000000..b9c7354
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestDecimalFilters.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.impl.filter.FilterUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestDecimalFilters extends ATestLeafFilter {
+
+  @Test
+  public void testEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f3", PredicateLeaf.Type.DECIMAL, getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx))
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, false);
+  }
+
+  @Test
+  public void testNotEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .equals("f3", PredicateLeaf.Type.DECIMAL, getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx))
+      .end()
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, true);
+  }
+
+  @Test
+  public void testLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThan("f3", PredicateLeaf.Type.DECIMAL, getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx))
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, false);
+  }
+
+  @Test
+  public void testNotLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThan("f3", PredicateLeaf.Type.DECIMAL, getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx))
+      .end()
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, true);
+  }
+
+  @Test
+  public void testLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThanEquals("f3", PredicateLeaf.Type.DECIMAL, getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx))
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, false);
+  }
+
+  @Test
+  public void testNotLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThanEquals("f3", PredicateLeaf.Type.DECIMAL, getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx))
+      .end()
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, true);
+  }
+
+  @Test
+  public void testBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .between("f3", PredicateLeaf.Type.DECIMAL,
+          getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.DECIMAL, highIdx))
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, false);
+  }
+
+  @Test
+  public void testNotBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .between("f3", PredicateLeaf.Type.DECIMAL,
+          getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.DECIMAL, highIdx))
+      .end()
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, true);
+  }
+
+  @Test
+  public void testIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f3", PredicateLeaf.Type.DECIMAL,
+          getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.DECIMAL, highIdx))
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, false);
+  }
+
+  @Test
+  public void testNotIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .in("f3", PredicateLeaf.Type.DECIMAL,
+          getPredicateValue(PredicateLeaf.Type.DECIMAL, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.DECIMAL, highIdx))
+      .end()
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, true);
+  }
+
+  @Test
+  public void testIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .isNull("f3", PredicateLeaf.Type.DECIMAL)
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, false);
+  }
+
+  @Test
+  public void testNotIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .isNull("f3", PredicateLeaf.Type.DECIMAL)
+      .end()
+      .build();
+    Assertions.assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, true);
+  }
+
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestEquals.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestEquals.java
new file mode 100644
index 0000000..7ab22e5
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestEquals.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.orc.impl.filter.VectorFilter;
+import org.apache.orc.impl.filter.ATestFilter;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestEquals extends ATestFilter {
+
+  @Test
+  public void testFoundMatching() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    VectorFilter f = new LongFilters.LongEquals("f1", 3L, false);
+    assertFalse(fc.isSelectedInUse());
+    filter(f);
+
+    validateSelected(2);
+  }
+
+  @Test
+  public void testNothingFound() {
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, null},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    VectorFilter f = new LongFilters.LongEquals("f1", 8L, false);
+    assertFalse(fc.isSelectedInUse());
+    filter(f);
+
+    validateNoneSelected();
+  }
+
+  @Test
+  public void testRepeatingVector() {
+    setBatch(new Long[] {1L, null, null, null, null, null},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    fc.getCols()[0].isRepeating = true;
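+    // With isRepeating set, row 0's value 1L stands for every row, so the
+    // equals filter selects the entire batch.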
+    VectorFilter f = new LongFilters.LongEquals("f1", 1L, false);
+    filter(f);
+    validateAllSelected(6);
+  }
+
+  @Test
+  public void testRepeatingNull() {
+    setBatch(new Long[] {null, null, null, null, null, null},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    fc.getCols()[0].isRepeating = true;
+    VectorFilter f = new LongFilters.LongEquals("f1", 1L, false);
+    filter(f);
+    validateNoneSelected();
+  }
+}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFilters.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFilters.java
new file mode 100644
index 0000000..a934c13
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFilters.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.filter.BatchFilter;
+import org.apache.orc.impl.filter.FilterFactory;
+import org.apache.orc.impl.filter.VectorFilter;
+import org.apache.orc.impl.filter.ATestFilter;
+import org.apache.orc.impl.filter.AndFilter;
+import org.apache.orc.impl.filter.FilterUtils;
+import org.apache.orc.impl.filter.OrFilter;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class TestFilters extends ATestFilter {
+
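+  // Overloads that default the file version to UNSTABLE_PRE_2_0 and disable
+  // SArg normalization when the caller does not specify them.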
+  public static BatchFilter createBatchFilter(SearchArgument sArg,
+                                              TypeDescription readSchema) {
+    return createBatchFilter(sArg, readSchema, OrcFile.Version.UNSTABLE_PRE_2_0);
+  }
+
+  public static BatchFilter createBatchFilter(SearchArgument sArg,
+                                              TypeDescription readSchema,
+                                              OrcFile.Version version) {
+    return createBatchFilter(sArg, readSchema, version, false);
+  }
+
+  public static BatchFilter createBatchFilter(SearchArgument sArg,
+                                              TypeDescription readSchema,
+                                              OrcFile.Version version,
+                                              boolean normalize) {
+    Reader.Options options = new Reader.Options().allowSARGToFilter(true);
+    options.searchArgument(sArg, new String[0]);
+    return FilterFactory.createBatchFilter(options, readSchema, version, normalize);
+  }
+
+  @Test
+  public void testAndOfOr() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startAnd()
+      .startOr()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 6L)
+      .in("f1", PredicateLeaf.Type.LONG, 3L, 4L)
+      .end()
+      .startOr()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 6L)
+      .in("f2", PredicateLeaf.Type.STRING, "c", "e")
+      .end()
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+
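+    // (f1 IN (1, 6) OR f1 IN (3, 4)) AND (f1 IN (1, 6) OR f2 IN ('c', 'e'))
+    // matches rows 0 (1, "a"), 2 (3, "c") and 5 (6, "f").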
+    BatchFilter filter = FilterUtils.createVectorFilter(sArg, schema);
+    filter.accept(fc);
+    assertArrayEquals(new String[] {"f1", "f2"}, filter.getColumnNames());
+    validateSelected(0, 2, 5);
+  }
+
+  @Test
+  public void testOrOfAnd() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .startAnd()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 6L)
+      .in("f2", PredicateLeaf.Type.STRING, "a", "c")
+      .end()
+      .startAnd()
+      .in("f1", PredicateLeaf.Type.LONG, 3L, 4L)
+      .in("f2", PredicateLeaf.Type.STRING, "c", "e")
+      .end()
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc.setBatch(batch));
+    validateSelected(0, 2);
+  }
+
+  @Test
+  public void testOrOfAndNative() {
+    VectorFilter f = new OrFilter(
+      new VectorFilter[] {
+        new AndFilter(new VectorFilter[] {
+          new LongFilters.LongIn("f1",
+                                 Arrays.asList(1L, 6L), false),
+          new StringFilters.StringIn("f2",
+                                     Arrays.asList("a", "c"), false)
+        }),
+        new AndFilter(new VectorFilter[] {
+          new LongFilters.LongIn("f1",
+                                 Arrays.asList(3L, 4L), false),
+          new StringFilters.StringIn("f2",
+                                     Arrays.asList("c", "e"), false)
+        })
+      }
+    );
+
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+
+    filter(f);
+    assertEquals(2, fc.getSelectedSize());
+    assertArrayEquals(new int[] {0, 2},
+                      Arrays.copyOf(fc.getSelected(), fc.getSelectedSize()));
+  }
+
+  @Test
+  public void testAndNotNot() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startAnd()
+      .startNot()
+      .in("f1", PredicateLeaf.Type.LONG, 7L)
+      .end()
+      .startNot()
+      .isNull("f2", PredicateLeaf.Type.STRING)
+      .end()
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 2L, 3L, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+
+    Consumer<OrcFilterContext> filter = createBatchFilter(sArg, schema);
+    filter.accept(fc.setBatch(batch));
+    assertEquals(6, fc.getSelectedSize());
+    assertArrayEquals(new int[] {0, 1, 2, 3, 4, 5},
+                      Arrays.copyOf(fc.getSelected(), fc.getSelectedSize()));
+  }
+
+  @Test
+  public void testUnSupportedSArg() {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .nullSafeEquals("f1", PredicateLeaf.Type.LONG, 0L)
+      .build();
+
+    assertNull(FilterUtils.createVectorFilter(sarg, schema));
+  }
+
+  @Test
+  public void testRepeatedProtected() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .in("f2", PredicateLeaf.Type.STRING, "a", "d")
+      .lessThan("f1", PredicateLeaf.Type.LONG, 6L)
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 1L, 1L, 1L, 1L, 1L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    batch.cols[0].isRepeating = true;
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc.setBatch(batch));
+    validateAllSelected(6);
+  }
+
+  @Test
+  public void testNullProtected() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .in("f2", PredicateLeaf.Type.STRING, "a", "d")
+      .lessThan("f1", PredicateLeaf.Type.LONG, 4L)
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc.setBatch(batch));
+    validateSelected(0, 1, 3);
+  }
+
+  @Test
+  public void testUnsupportedNotLeaf() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .nullSafeEquals("f1", PredicateLeaf.Type.LONG, 2L)
+      .end()
+      .build();
+
+    assertNull(FilterUtils.createVectorFilter(sArg, schema));
+  }
+
+  @Test
+  public void testAndOrAnd() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startAnd()
+      .startOr()
+      .lessThan("f1", PredicateLeaf.Type.LONG, 3L)
+      .startAnd()
+      .equals("f2", PredicateLeaf.Type.STRING, "a")
+      .equals("f1", PredicateLeaf.Type.LONG, 5L)
+      .end()
+      .end()
+      .in("f2", PredicateLeaf.Type.STRING, "a", "c")
+      .end()
+      .build();
+
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, 6L},
+             new String[] {"a", "b", "c", "d", "e", "f"});
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc.setBatch(batch));
+    validateSelected(0);
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFloatFilters.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFloatFilters.java
new file mode 100644
index 0000000..77cf23e
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFloatFilters.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.impl.filter.FilterUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestFloatFilters extends ATestLeafFilter {
+
+  @Test
+  public void testEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f4", PredicateLeaf.Type.FLOAT, getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, false);
+  }
+
+  @Test
+  public void testNotEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .equals("f4", PredicateLeaf.Type.FLOAT, getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, true);
+  }
+
+  @Test
+  public void testLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThan("f4", PredicateLeaf.Type.FLOAT, getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, false);
+  }
+
+  @Test
+  public void testNotLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThan("f4", PredicateLeaf.Type.FLOAT, getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, true);
+  }
+
+  @Test
+  public void testLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThanEquals("f4", PredicateLeaf.Type.FLOAT, getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, false);
+  }
+
+  @Test
+  public void testNotLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThanEquals("f4", PredicateLeaf.Type.FLOAT, getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, true);
+  }
+
+  @Test
+  public void testBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .between("f4", PredicateLeaf.Type.FLOAT,
+          getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.FLOAT, highIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, false);
+  }
+
+  @Test
+  public void testNotBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .between("f4", PredicateLeaf.Type.FLOAT,
+          getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.FLOAT, highIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, true);
+  }
+
+  @Test
+  public void testIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f4", PredicateLeaf.Type.FLOAT,
+          getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.FLOAT, highIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, false);
+  }
+
+  @Test
+  public void testNotIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .in("f4", PredicateLeaf.Type.FLOAT,
+          getPredicateValue(PredicateLeaf.Type.FLOAT, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.FLOAT, highIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, true);
+  }
+
+  @Test
+  public void testIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .isNull("f4", PredicateLeaf.Type.FLOAT)
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, false);
+  }
+
+  @Test
+  public void testNotIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .isNull("f4", PredicateLeaf.Type.FLOAT)
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, true);
+  }
+
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestLongFilters.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestLongFilters.java
new file mode 100644
index 0000000..abe9326
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestLongFilters.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.impl.filter.FilterUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestLongFilters extends ATestLeafFilter {
+
+  @Test
+  public void testEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f1", PredicateLeaf.Type.LONG, getPredicateValue(PredicateLeaf.Type.LONG, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, false);
+  }
+
+  @Test
+  public void testNotEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .equals("f1", PredicateLeaf.Type.LONG, getPredicateValue(PredicateLeaf.Type.LONG, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, true);
+  }
+
+  @Test
+  public void testLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThan("f1", PredicateLeaf.Type.LONG, getPredicateValue(PredicateLeaf.Type.LONG, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, false);
+  }
+
+  @Test
+  public void testNotLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThan("f1", PredicateLeaf.Type.LONG, getPredicateValue(PredicateLeaf.Type.LONG, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, true);
+  }
+
+  @Test
+  public void testLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThanEquals("f1", PredicateLeaf.Type.LONG, getPredicateValue(PredicateLeaf.Type.LONG, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, false);
+  }
+
+  @Test
+  public void testNotLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThanEquals("f1", PredicateLeaf.Type.LONG, getPredicateValue(PredicateLeaf.Type.LONG, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, true);
+  }
+
+  @Test
+  public void testBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .between("f1", PredicateLeaf.Type.LONG,
+          getPredicateValue(PredicateLeaf.Type.LONG, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.LONG, highIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, false);
+  }
+
+  @Test
+  public void testNotBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .between("f1", PredicateLeaf.Type.LONG,
+          getPredicateValue(PredicateLeaf.Type.LONG, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.LONG, highIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, true);
+  }
+
+  @Test
+  public void testIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG,
+          getPredicateValue(PredicateLeaf.Type.LONG, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.LONG, highIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, false);
+  }
+
+  @Test
+  public void testNotIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .in("f1", PredicateLeaf.Type.LONG,
+          getPredicateValue(PredicateLeaf.Type.LONG, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.LONG, highIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, true);
+  }
+
+  @Test
+  public void testIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .isNull("f1", PredicateLeaf.Type.LONG)
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, false);
+  }
+
+  @Test
+  public void testNotIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .isNull("f1", PredicateLeaf.Type.LONG)
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, true);
+  }
+
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestStringFilters.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestStringFilters.java
new file mode 100644
index 0000000..1181c40
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestStringFilters.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.impl.filter.FilterUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestStringFilters extends ATestLeafFilter {
+
+  @Test
+  public void testEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f2", PredicateLeaf.Type.STRING, getPredicateValue(PredicateLeaf.Type.STRING, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, false);
+  }
+
+  @Test
+  public void testNotEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .equals("f2", PredicateLeaf.Type.STRING, getPredicateValue(PredicateLeaf.Type.STRING, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, true);
+  }
+
+  @Test
+  public void testLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThan("f2", PredicateLeaf.Type.STRING, getPredicateValue(PredicateLeaf.Type.STRING, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, false);
+  }
+
+  @Test
+  public void testNotLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThan("f2", PredicateLeaf.Type.STRING, getPredicateValue(PredicateLeaf.Type.STRING, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, true);
+  }
+
+  @Test
+  public void testLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThanEquals("f2", PredicateLeaf.Type.STRING, getPredicateValue(PredicateLeaf.Type.STRING, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, false);
+  }
+
+  @Test
+  public void testNotLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThanEquals("f2", PredicateLeaf.Type.STRING, getPredicateValue(PredicateLeaf.Type.STRING, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, true);
+  }
+
+  @Test
+  public void testBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .between("f2", PredicateLeaf.Type.STRING,
+          getPredicateValue(PredicateLeaf.Type.STRING, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.STRING, highIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, false);
+  }
+
+  @Test
+  public void testNotBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .between("f2", PredicateLeaf.Type.STRING,
+          getPredicateValue(PredicateLeaf.Type.STRING, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.STRING, highIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, true);
+  }
+
+  @Test
+  public void testIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f2", PredicateLeaf.Type.STRING,
+          getPredicateValue(PredicateLeaf.Type.STRING, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.STRING, highIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, false);
+  }
+
+  @Test
+  public void testNotIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .in("f2", PredicateLeaf.Type.STRING,
+          getPredicateValue(PredicateLeaf.Type.STRING, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.STRING, highIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, true);
+  }
+
+  @Test
+  public void testIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .isNull("f2", PredicateLeaf.Type.STRING)
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, false);
+  }
+
+  @Test
+  public void testNotIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .isNull("f2", PredicateLeaf.Type.STRING)
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, true);
+  }
+
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestTimestampFilters.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestTimestampFilters.java
new file mode 100644
index 0000000..d4820c8
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestTimestampFilters.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter.leaf;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.impl.filter.FilterUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestTimestampFilters extends ATestLeafFilter {
+
+  @Test
+  public void testEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .equals("f5", PredicateLeaf.Type.TIMESTAMP, getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, false);
+  }
+
+  @Test
+  public void testNotEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .equals("f5", PredicateLeaf.Type.TIMESTAMP, getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.EQUALS, true);
+  }
+
+  @Test
+  public void testLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThan("f5", PredicateLeaf.Type.TIMESTAMP, getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, false);
+  }
+
+  @Test
+  public void testNotLessThan() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThan("f5", PredicateLeaf.Type.TIMESTAMP, getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN, true);
+  }
+
+  @Test
+  public void testLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .lessThanEquals("f5", PredicateLeaf.Type.TIMESTAMP, getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, false);
+  }
+
+  @Test
+  public void testNotLessThanEquals() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .lessThanEquals("f5", PredicateLeaf.Type.TIMESTAMP, getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.LESS_THAN_EQUALS, true);
+  }
+
+  @Test
+  public void testBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .between("f5", PredicateLeaf.Type.TIMESTAMP,
+          getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.TIMESTAMP, highIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, false);
+  }
+
+  @Test
+  public void testNotBetween() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .between("f5", PredicateLeaf.Type.TIMESTAMP,
+          getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.TIMESTAMP, highIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.BETWEEN, true);
+  }
+
+  @Test
+  public void testIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f5", PredicateLeaf.Type.TIMESTAMP,
+          getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.TIMESTAMP, highIdx))
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, false);
+  }
+
+  @Test
+  public void testNotIn() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .in("f5", PredicateLeaf.Type.TIMESTAMP,
+          getPredicateValue(PredicateLeaf.Type.TIMESTAMP, lowIdx),
+          getPredicateValue(PredicateLeaf.Type.TIMESTAMP, highIdx))
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IN, true);
+  }
+
+  @Test
+  public void testIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .isNull("f5", PredicateLeaf.Type.TIMESTAMP)
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, false);
+  }
+
+  @Test
+  public void testNotIsNull() {
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startNot()
+      .isNull("f5", PredicateLeaf.Type.TIMESTAMP)
+      .end()
+      .build();
+    assertFalse(fc.isSelectedInUse());
+    FilterUtils.createVectorFilter(sArg, schema).accept(fc);
+
+    validateSelected(PredicateLeaf.Operator.IS_NULL, true);
+  }
+
+}
diff --git a/java/core/src/test/org/apache/orc/util/CuckooSetBytesTest.java b/java/core/src/test/org/apache/orc/util/CuckooSetBytesTest.java
new file mode 100644
index 0000000..68ddd17
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/util/CuckooSetBytesTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class CuckooSetBytesTest {
+
+  // maximum table size
+  private static final int MAX_SIZE = 65437;
+
+  @Test
+  public void testSetBytes() {
+    String[] strings = {"foo", "bar", "baz", "a", "", "x1341", "Z"};
+    String[] negativeStrings = {"not", "in", "the", "set", "foobar"};
+    byte[][] values = getByteArrays(strings);
+    byte[][] negatives = getByteArrays(negativeStrings);
+
+    // load set
+    CuckooSetBytes s = new CuckooSetBytes(strings.length);
+    for(byte[] v : values) {
+      s.insert(v);
+    }
+
+    // test that the values we added are there
+    for(byte[] v : values) {
+      assertTrue(s.lookup(v, 0, v.length));
+    }
+
+    // test that values that we know are missing are shown to be absent
+    for (byte[] v : negatives) {
+      assertFalse(s.lookup(v, 0, v.length));
+    }
+
+    // Test that we can search correctly using a buffer and pulling
+    // a sequence of bytes out of the middle of it. In this case it
+    // is the 3 letter sequence "foo".
+    byte[] buf = getUTF8Bytes("thewordfooisinhere");
+    assertTrue(s.lookup(buf, 7, 3));
+  }
+
+  @Test
+  public void testSetBytesLargeRandom() {
+    byte[][] values;
+    Random gen = new Random(98763537);
+    for(int i = 0; i < 200;) {
+
+      // Make a random array of byte arrays
+      int size = gen.nextInt() % MAX_SIZE;
+      if (size <= 0) {   // ensure size is >= 1, otherwise try again
+        continue;
+      }
+      i++;
+      values = new byte[size][];
+      loadRandomBytes(values, gen);
+
+      // load them into a set
+      CuckooSetBytes s = new CuckooSetBytes(size);
+      loadSet(s, values);
+
+      // look them up to make sure they are all there
+      for (int j = 0; j != size; j++) {
+        assertTrue(s.lookup(values[j], 0, values[j].length));
+      }
+    }
+  }
+
+  public void loadRandomBytes(byte[][] values, Random gen) {
+    for (int i = 0; i != values.length; i++) {
+      values[i] = getUTF8Bytes(Integer.toString(gen.nextInt()));
+    }
+  }
+
+  private byte[] getUTF8Bytes(String s) {
+    return s.getBytes(StandardCharsets.UTF_8);
+  }
+
+  // Get an array of UTF-8 byte arrays from an array of strings
+  private byte[][] getByteArrays(String[] strings) {
+    byte[][] values = new byte[strings.length][];
+    for (int i = 0; i != strings.length; i++) {
+      values[i] = strings[i].getBytes(StandardCharsets.UTF_8);
+    }
+    return values;
+  }
+
+  private void loadSet(CuckooSetBytes s, byte[][] values) {
+    for (byte[] v: values) {
+      s.insert(v);
+    }
+  }
+
+}
diff --git a/java/mapreduce/pom.xml b/java/mapreduce/pom.xml
index 53ffec0..477ace9 100644
--- a/java/mapreduce/pom.xml
+++ b/java/mapreduce/pom.xml
@@ -67,6 +67,10 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-storage-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
 
     <!-- test inter-project -->
     <dependency>
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
index 69af5e2..34f4753 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
@@ -153,8 +153,10 @@ public class OrcInputFormat<V extends WritableComparable>
     Reader file = OrcFile.createReader(split.getPath(),
         OrcFile.readerOptions(conf)
             .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
-    return new OrcMapredRecordReader<>(file, buildOptions(conf,
-        file, split.getStart(), split.getLength()));
+    // the mapred record reader supports batches with a selected vector
+    Reader.Options options =  buildOptions(conf, file, split.getStart(), split.getLength())
+      .useSelected(true);
+    return new OrcMapredRecordReader<>(file, options);
   }
 
   /**
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
index 223a7a8..f5c1906 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
@@ -104,16 +104,17 @@ public class OrcMapredRecordReader<V extends WritableComparable>
     if (!ensureBatch()) {
       return false;
     }
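+    // when a filter has populated selected[], map the logical row position to its physical batch index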
+    int rowIdx = batch.selectedInUse ? batch.selected[rowInBatch] : rowInBatch;
     if (schema.getCategory() == TypeDescription.Category.STRUCT) {
       OrcStruct result = (OrcStruct) value;
       List<TypeDescription> children = schema.getChildren();
       int numberOfChildren = children.size();
       for(int i=0; i < numberOfChildren; ++i) {
-        result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
+        result.setFieldValue(i, nextValue(batch.cols[i], rowIdx,
             children.get(i), result.getFieldValue(i)));
       }
     } else {
-      nextValue(batch.cols[0], rowInBatch, schema, value);
+      nextValue(batch.cols[0], rowIdx, schema, value);
     }
     rowInBatch += 1;
     return true;
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
index bfd48e2..16d705f 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
@@ -68,9 +68,14 @@ public class OrcInputFormat<V extends WritableComparable>
     Reader file = OrcFile.createReader(split.getPath(),
         OrcFile.readerOptions(conf)
             .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
-    return new OrcMapreduceRecordReader<>(file,
-        org.apache.orc.mapred.OrcInputFormat.buildOptions(conf,
-            file, split.getStart(), split.getLength()));
+    // the mapreduce record reader supports batches with a selected vector
+    Reader.Options options =
+      org.apache.orc.mapred.OrcInputFormat.buildOptions(conf,
+                                                        file,
+                                                        split.getStart(),
+                                                        split.getLength())
+      .useSelected(true);
+    return new OrcMapreduceRecordReader<>(file, options);
   }
 
   @Override
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
index 9d11caa..d2937dc 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
@@ -102,16 +102,17 @@ public class OrcMapreduceRecordReader<V extends WritableComparable>
     if (!ensureBatch()) {
       return false;
     }
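+    // translate through selected[] in case a row filter left gaps in the batch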
+    int rowIdx = batch.selectedInUse ? batch.selected[rowInBatch] : rowInBatch;
     if (schema.getCategory() == TypeDescription.Category.STRUCT) {
       OrcStruct result = (OrcStruct) row;
       List<TypeDescription> children = schema.getChildren();
       int numberOfChildren = children.size();
       for(int i=0; i < numberOfChildren; ++i) {
-        result.setFieldValue(i, OrcMapredRecordReader.nextValue(batch.cols[i], rowInBatch,
+        result.setFieldValue(i, OrcMapredRecordReader.nextValue(batch.cols[i], rowIdx,
             children.get(i), result.getFieldValue(i)));
       }
     } else {
-      OrcMapredRecordReader.nextValue(batch.cols[0], rowInBatch, schema, row);
+      OrcMapredRecordReader.nextValue(batch.cols[0], rowIdx, schema, row);
     }
     rowInBatch += 1;
     return true;
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java
new file mode 100644
index 0000000..30462bf
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestMapRedFiltering.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.orc.OrcConf;
+import org.apache.orc.mapreduce.FilterTestUtil;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.orc.mapreduce.FilterTestUtil.RowCount;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class TestMapRedFiltering {
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+                                                                  "target" + File.separator + "test"
+                                                                  + File.separator + "tmp"));
+
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static final Path filePath = new Path(workDir, "mapred_skip_file.orc");
+
+  @BeforeAll
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    fs = FileSystem.get(conf);
+    FilterTestUtil.createFile(conf, fs, filePath);
+  }
+
+  @Test
+  public void readWithSArg() throws IOException, InterruptedException {
+    OrcConf.ALLOW_SARG_TO_FILTER.setBoolean(conf, false);
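+    // with SArg-to-filter disabled the SArg can only prune row groups, so every row is returned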
+    OrcConf.INCLUDE_COLUMNS.setString(conf, "0,1,2,3,4");
+    OrcInputFormat.setSearchArgument(conf,
+                                     SearchArgumentFactory.newBuilder()
+                                       .in("f1", PredicateLeaf.Type.LONG, 0L)
+                                       .build(),
+                                     new String[] {"f1"});
+    FileSplit split = new FileSplit(filePath,
+                                    0, fs.getFileStatus(filePath).getLen(),
+                                    new String[0]);
+    FilterTestUtil.readStart();
+    RecordReader<NullWritable, OrcStruct> r = new OrcInputFormat<OrcStruct>()
+      .getRecordReader(split, new JobConf(conf), null);
+    long rowCount = validateFilteredRecordReader(r);
+    double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
+                                             fs.getFileStatus(filePath).getLen());
+    assertEquals(FilterTestUtil.RowCount, rowCount);
+    assertTrue(p >= 100);
+  }
+
+  @Test
+  public void readWithSArgAsFilter() throws IOException {
+    OrcConf.ALLOW_SARG_TO_FILTER.setBoolean(conf, true);
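+    // applied as a row filter, f1 = 0 matches nothing and lazy IO reads well under 30% of the file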
+    OrcConf.INCLUDE_COLUMNS.setString(conf, "0,1,2,3,4");
+    OrcInputFormat.setSearchArgument(conf,
+                                     SearchArgumentFactory.newBuilder()
+                                       .in("f1", PredicateLeaf.Type.LONG, 0L)
+                                       .build(),
+                                     new String[] {"f1"});
+    FileSplit split = new FileSplit(filePath,
+                                    0, fs.getFileStatus(filePath).getLen(),
+                                    new String[0]);
+    FilterTestUtil.readStart();
+    RecordReader<NullWritable, OrcStruct> r = new OrcInputFormat<OrcStruct>()
+      .getRecordReader(split, new JobConf(conf), null);
+    long rowCount = validateFilteredRecordReader(r);
+    double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
+                                             fs.getFileStatus(filePath).getLen());
+    assertEquals(0, rowCount);
+    assertTrue(p < 30);
+  }
+
+  @Test
+  public void readSingleRowWFilter() throws IOException, InterruptedException {
+    Random r = new Random(100);
+    for (int i = 0; i < 100; i++) {
+      readSingleRowWfilter(r.nextInt((int) RowCount));
+    }
+  }
+
+  private static long validateFilteredRecordReader(RecordReader<NullWritable, OrcStruct> rr)
+    throws IOException {
+    OrcStruct row = new OrcStruct(FilterTestUtil.schema);
+    long rowCount = 0;
+    while (rr.next(NullWritable.get(), row)) {
+      FilterTestUtil.validateRow(row);
+      rowCount += 1;
+    }
+    return rowCount;
+  }
+
+  private void readSingleRowWfilter(long idx) throws IOException, InterruptedException {
+    OrcConf.ALLOW_SARG_TO_FILTER.setBoolean(conf, true);
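+    // ridx is unique per row, so exactly one row should pass the filter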
+    OrcInputFormat.setSearchArgument(conf,
+                                     SearchArgumentFactory.newBuilder()
+                                       .in("ridx", PredicateLeaf.Type.LONG, idx)
+                                       .build(),
+                                     new String[] {"ridx"});
+    OrcConf.INCLUDE_COLUMNS.setString(conf, "0,1,2,4");
+    FileSplit split = new FileSplit(filePath,
+                                    0, fs.getFileStatus(filePath).getLen(),
+                                    new String[0]);
+    FilterTestUtil.readStart();
+    RecordReader<NullWritable, OrcStruct> r = new OrcInputFormat<OrcStruct>()
+      .getRecordReader(split, new JobConf(conf), null);
+    OrcStruct row = new OrcStruct(FilterTestUtil.schema);
+    long rowCount = 0;
+    while (r.next(NullWritable.get(), row)) {
+      FilterTestUtil.validateLimitedRow(row, idx);
+      rowCount += 1;
+    }
+    assertEquals(1, rowCount);
+    r.close();
+  }
+}
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/FilterTestUtil.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/FilterTestUtil.java
new file mode 100644
index 0000000..262b47e
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/FilterTestUtil.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcStruct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class FilterTestUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(FilterTestUtil.class);
+  public static final TypeDescription schema = TypeDescription.createStruct()
+    .addField("f1", TypeDescription.createLong())
+    .addField("f2", TypeDescription.createDecimal().withPrecision(20).withScale(6))
+    .addField("f3", TypeDescription.createLong())
+    .addField("f4", TypeDescription.createString())
+    .addField("ridx", TypeDescription.createLong());
+  public static final long RowCount = 4000000L;
+  private static final int scale = 3;
+
+  public static void createFile(Configuration conf, FileSystem fs, Path filePath)
+    throws IOException {
+    if (fs.exists(filePath)) {
+      return;
+    }
+
+    LOG.info("Creating file {} with schema {}", filePath, schema);
+    try (Writer writer = OrcFile.createWriter(filePath,
+                                              OrcFile.writerOptions(conf)
+                                                .fileSystem(fs)
+                                                .overwrite(true)
+                                                .rowIndexStride(8192)
+                                                .setSchema(schema))) {
+      Random rnd = new Random(1024);
+      VectorizedRowBatch b = schema.createRowBatch();
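+      // each row derives f1-f4 from a single random long; ridx records the row number for point lookups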
+      for (int rowIdx = 0; rowIdx < RowCount; rowIdx++) {
+        long v = rnd.nextLong();
+        for (int colIdx = 0; colIdx < schema.getChildren().size() - 1; colIdx++) {
+          switch (schema.getChildren().get(colIdx).getCategory()) {
+            case LONG:
+              ((LongColumnVector) b.cols[colIdx]).vector[b.size] = v;
+              break;
+            case DECIMAL:
+              HiveDecimalWritable d = new HiveDecimalWritable();
+              d.setFromLongAndScale(v, scale);
+              ((DecimalColumnVector) b.cols[colIdx]).vector[b.size] = d;
+              break;
+            case STRING:
+              ((BytesColumnVector) b.cols[colIdx]).setVal(b.size,
+                                                          String.valueOf(v)
+                                                            .getBytes(StandardCharsets.UTF_8));
+              break;
+            default:
+              throw new IllegalArgumentException();
+          }
+        }
+        // Populate the rowIdx
+        ((LongColumnVector) b.cols[4]).vector[b.size] = rowIdx;
+
+        b.size += 1;
+        if (b.size == b.getMaxSize()) {
+          writer.addRowBatch(b);
+          b.reset();
+        }
+      }
+      if (b.size > 0) {
+        writer.addRowBatch(b);
+        b.reset();
+      }
+    }
+    LOG.info("Created file {}", filePath);
+  }
+
+  public static void validateRow(OrcStruct row, long expId) {
+    HiveDecimalWritable d = new HiveDecimalWritable();
+
+    if (expId > 0) {
+      assertEquals(expId, ((LongWritable) row.getFieldValue(4)).get());
+    }
+    long expValue = ((LongWritable) row.getFieldValue(0)).get();
+    d.setFromLongAndScale(expValue, scale);
+    assertEquals(d, row.getFieldValue(1));
+    assertEquals(expValue, ((LongWritable) row.getFieldValue(2)).get());
+    assertEquals(String.valueOf(expValue), row.getFieldValue(3).toString());
+  }
+
+  public static void validateLimitedRow(OrcStruct row, long expId) {
+    HiveDecimalWritable d = new HiveDecimalWritable();
+
+    if (expId > 0) {
+      assertEquals(expId, ((LongWritable) row.getFieldValue(4)).get());
+    }
+    long expValue = ((LongWritable) row.getFieldValue(0)).get();
+    d.setFromLongAndScale(expValue, scale);
+    assertEquals(d, row.getFieldValue(1));
+    assertEquals(expValue, ((LongWritable) row.getFieldValue(2)).get());
+  }
+
+  public static void validateRow(OrcStruct row) {
+    validateRow(row, -1);
+  }
+
+  public static double readPercentage(FileSystem.Statistics stats, long fileSize) {
+    double p = stats.getBytesRead() * 100.0 / fileSize;
+    LOG.info(String.format("FileSize: %d%nReadSize: %d%nRead %%: %.2f",
+                           fileSize,
+                           stats.getBytesRead(),
+                           p));
+    return p;
+  }
+
+  public static void readStart() {
+    FileSystem.clearStatistics();
+  }
+
+  public static FileSystem.Statistics readEnd() {
+    return FileSystem.getAllStatistics().get(0);
+  }
+}
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java
new file mode 100644
index 0000000..f8bbf1f
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapReduceFiltering.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.orc.OrcConf;
+import org.apache.orc.mapred.OrcStruct;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.orc.mapreduce.FilterTestUtil.RowCount;
+import static org.apache.orc.mapreduce.FilterTestUtil.validateLimitedRow;
+import static org.apache.orc.mapreduce.FilterTestUtil.validateRow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class TestMapReduceFiltering {
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+                                                                  "target" + File.separator + "test"
+                                                                  + File.separator + "tmp"));
+
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static final Path filePath = new Path(workDir, "mapreduce_skip_file.orc");
+
+  @BeforeAll
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    fs = FileSystem.get(conf);
+    FilterTestUtil.createFile(conf, fs, filePath);
+  }
+
+  @Test
+  public void readWithSArg() throws IOException, InterruptedException {
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    OrcConf.ALLOW_SARG_TO_FILTER.setBoolean(conf, false);
+    OrcConf.INCLUDE_COLUMNS.setString(conf, "0,1,2,3,4");
+    OrcInputFormat.setSearchArgument(conf,
+                                     SearchArgumentFactory.newBuilder()
+                                       .in("f1", PredicateLeaf.Type.LONG, 0L)
+                                       .build(),
+                                     new String[] {"f1"});
+    FileSplit split = new FileSplit(filePath,
+                                    0, fs.getFileStatus(filePath).getLen(),
+                                    new String[0]);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    FilterTestUtil.readStart();
+    org.apache.hadoop.mapreduce.RecordReader<NullWritable, OrcStruct> r =
+      new OrcInputFormat<OrcStruct>().createRecordReader(split,
+                                                         attemptContext);
+    long rowCount = validateFilteredRecordReader(r);
+    double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
+                                             fs.getFileStatus(filePath).getLen());
+    assertEquals(FilterTestUtil.RowCount, rowCount);
+    assertTrue(p >= 100);
+  }
+
+  @Test
+  public void readWithSArgAsFilter() throws IOException, InterruptedException {
+    TaskAttemptID id = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
+    OrcConf.ALLOW_SARG_TO_FILTER.setBoolean(conf, true);
+    OrcConf.INCLUDE_COLUMNS.setString(conf, "0,1,2,3,4");
+    OrcInputFormat.setSearchArgument(conf,
+                                     SearchArgumentFactory.newBuilder()
+                                       .in("f1", PredicateLeaf.Type.LONG, 0L)
+                                       .build(),
+                                     new String[] {"f1"});
+    FileSplit split = new FileSplit(filePath,
+                                    0, fs.getFileStatus(filePath).getLen(),
+                                    new String[0]);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    FilterTestUtil.readStart();
+    org.apache.hadoop.mapreduce.RecordReader<NullWritable, OrcStruct> r =
+      new OrcInputFormat<OrcStruct>().createRecordReader(split,
+                                                         attemptContext);
+    long rowCount = validateFilteredRecordReader(r);
+    double p = FilterTestUtil.readPercentage(FilterTestUtil.readEnd(),
+                                             fs.getFileStatus(filePath).getLen());
+    assertEquals(0, rowCount);
+    assertTrue(p < 30);
+  }
+
+  @Test
+  public void readSingleRowWFilter() throws IOException, InterruptedException {
+    Random r = new Random(100);
+    for (int i = 0; i < 100; i++) {
+      testSingleRowWfilter(r.nextInt((int) RowCount));
+    }
+  }
+
+  private void testSingleRowWfilter(long idx) throws IOException, InterruptedException {
+    TaskAttemptID id = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
+    OrcConf.ALLOW_SARG_TO_FILTER.setBoolean(conf, true);
+    OrcConf.INCLUDE_COLUMNS.setString(conf, "0,1,2,4");
+    OrcInputFormat.setSearchArgument(conf,
+                                     SearchArgumentFactory.newBuilder()
+                                       .in("ridx", PredicateLeaf.Type.LONG, idx)
+                                       .build(),
+                                     new String[] {"ridx"});
+    FileSplit split = new FileSplit(filePath,
+                                    0, fs.getFileStatus(filePath).getLen(),
+                                    new String[0]);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    FilterTestUtil.readStart();
+    org.apache.hadoop.mapreduce.RecordReader<NullWritable, OrcStruct> r =
+      new OrcInputFormat<OrcStruct>().createRecordReader(split,
+                                                         attemptContext);
+    long rowCount = 0;
+    while (r.nextKeyValue()) {
+      validateLimitedRow(r.getCurrentValue(), idx);
+      rowCount += 1;
+    }
+    r.close();
+    assertEquals(1, rowCount);
+  }
+
+  private static long validateFilteredRecordReader(
+      org.apache.hadoop.mapreduce.RecordReader<NullWritable, OrcStruct> rr)
+      throws IOException, InterruptedException {
+    long rowCount = 0;
+    while (rr.nextKeyValue()) {
+      validateRow(rr.getCurrentValue());
+      rowCount += 1;
+    }
+    return rowCount;
+  }
+}