Posted to commits@orc.apache.org by om...@apache.org on 2021/08/30 22:26:22 UTC

[orc] branch branch-1.7 updated: ORC-811 Benchmarks and documentation for ORC-742 and ORC-743

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 49c7e44  ORC-811 Benchmarks and documentation for ORC-742 and ORC-743
49c7e44 is described below

commit 49c7e448ff805b1f5af21f1bdff639ba6834ffc2
Author: Pavan Lanka <pl...@apple.com>
AuthorDate: Tue Jun 8 12:16:46 2021 -0700

    ORC-811 Benchmarks and documentation for ORC-742 and ORC-743
    
    Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
 java/bench/core/pom.xml                            |  16 +
 java/bench/core/src/findbugs/exclude.xml           |   2 +-
 .../apache/orc/bench/core/filter/FilterBench.java  | 274 +++++++++++++++++
 .../orc/bench/core/filter/FilterBenchUtil.java     | 142 +++++++++
 .../java/org/apache/orc/impl/filter/RowFilter.java |  96 ++++++
 .../apache/orc/impl/filter/RowFilterFactory.java   | 128 ++++++++
 .../apache/orc/bench/core/filter/TestFilter.java   | 124 ++++++++
 .../org/apache/orc/impl/filter/ATestFilter.java    | 108 +++++++
 .../org/apache/orc/impl/filter/TestRowFilter.java  | 171 +++++++++++
 java/bench/hive/src/assembly/uber.xml              |   7 +
 java/bench/pom.xml                                 |  22 +-
 site/develop/design/lazy_filter.md                 | 335 +++++++++++++++++++++
 12 files changed, 1423 insertions(+), 2 deletions(-)

diff --git a/java/bench/core/pom.xml b/java/bench/core/pom.xml
index bf39f8d..626b120 100644
--- a/java/bench/core/pom.xml
+++ b/java/bench/core/pom.xml
@@ -100,6 +100,10 @@
       <artifactId>jmh-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
@@ -107,6 +111,18 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/java/bench/core/src/findbugs/exclude.xml b/java/bench/core/src/findbugs/exclude.xml
index dde1471..e9fe859 100644
--- a/java/bench/core/src/findbugs/exclude.xml
+++ b/java/bench/core/src/findbugs/exclude.xml
@@ -20,6 +20,6 @@
     <Class name="~org\.openjdk\.jmh\.infra\.generated.*"/>
   </Match>
   <Match>
-    <Class name="~org\.apache\.orc\.bench\.generated.*"/>
+    <Class name="~org\.apache\.orc\.bench\..*\.generated.*"/>
   </Match>
 </FindBugsFilter>
diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
new file mode 100644
index 0000000..d493b67
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.filter;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.Reader;
+import org.apache.orc.bench.core.OrcBenchmark;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.apache.orc.impl.filter.FilterFactory;
+import org.apache.orc.impl.filter.RowFilterFactory;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static org.apache.orc.bench.core.BenchmarkOptions.FORK;
+import static org.apache.orc.bench.core.BenchmarkOptions.GC;
+import static org.apache.orc.bench.core.BenchmarkOptions.HELP;
+import static org.apache.orc.bench.core.BenchmarkOptions.ITERATIONS;
+import static org.apache.orc.bench.core.BenchmarkOptions.MAX_MEMORY;
+import static org.apache.orc.bench.core.BenchmarkOptions.MIN_MEMORY;
+import static org.apache.orc.bench.core.BenchmarkOptions.TIME;
+import static org.apache.orc.bench.core.BenchmarkOptions.WARMUP_ITERATIONS;
+
+@AutoService(OrcBenchmark.class)
+public class FilterBench implements OrcBenchmark {
+  @Override
+  public String getName() {
+    return "filter";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Perform filter bench";
+  }
+
+  @Override
+  public void run(String[] args) throws Exception {
+    new Runner(parseOptions(args)).run();
+  }
+
+  private static CommandLine parseCommandLine(String[] args) {
+    org.apache.commons.cli.Options options = new org.apache.commons.cli.Options()
+      .addOption("h", HELP, false, "Provide help")
+      .addOption("i", ITERATIONS, true, "Number of iterations")
+      .addOption("I", WARMUP_ITERATIONS, true, "Number of warmup iterations")
+      .addOption("f", FORK, true, "How many forks to use")
+      .addOption("t", TIME, true, "How long each iteration is in seconds")
+      .addOption("m", MIN_MEMORY, true, "The minimum size of each JVM")
+      .addOption("M", MAX_MEMORY, true, "The maximum size of each JVM")
+      .addOption("g", GC, false, "Should GC be profiled");
+    CommandLine result;
+    try {
+      result = new DefaultParser().parse(options, args, true);
+    } catch (ParseException pe) {
+      System.err.println("Argument exception - " + pe.getMessage());
+      result = null;
+    }
+    if (result == null || result.hasOption(HELP) || result.getArgs().length == 0) {
+      new HelpFormatter().printHelp("java -jar <jar> <command> <options> <sub_cmd>\n"
+                                    + "sub_cmd:\nsimple\ncomplex\n",
+                                    options);
+      System.err.println();
+      System.exit(1);
+    }
+    return result;
+  }
+
+  public static Options parseOptions(String[] args) {
+    CommandLine options = parseCommandLine(args);
+    String cmd = options.getArgs()[0];
+    OptionsBuilder builder = new OptionsBuilder();
+    switch (cmd) {
+      case "simple":
+        builder.include(SimpleFilter.class.getSimpleName());
+        break;
+      case "complex":
+        builder.include(ComplexFilter.class.getSimpleName());
+        break;
+      default:
+        throw new UnsupportedOperationException(String.format("Command %s is not supported", cmd));
+    }
+    if (options.hasOption(GC)) {
+      builder.addProfiler("hs_gc");
+    }
+    if (options.hasOption(ITERATIONS)) {
+      builder.measurementIterations(Integer.parseInt(options.getOptionValue(ITERATIONS)));
+    }
+    if (options.hasOption(WARMUP_ITERATIONS)) {
+      builder.warmupIterations(Integer.parseInt(options.getOptionValue(
+        WARMUP_ITERATIONS)));
+    }
+    if (options.hasOption(FORK)) {
+      builder.forks(Integer.parseInt(options.getOptionValue(
+        FORK)));
+    }
+    if (options.hasOption(TIME)) {
+      TimeValue iterationTime = TimeValue.seconds(Long.parseLong(
+        options.getOptionValue(TIME)));
+      builder.measurementTime(iterationTime);
+      builder.warmupTime(iterationTime);
+    }
+
+    String minMemory = options.getOptionValue(MIN_MEMORY, "256m");
+    String maxMemory = options.getOptionValue(MAX_MEMORY, "2g");
+    builder.jvmArgs("-server",
+                    "-Xms" + minMemory, "-Xmx" + maxMemory);
+    return builder.build();
+  }
+
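+  // Builds the filter under test: "row" uses the benchmark-only RowFilterFactory,
+  // "vector" uses ORC's FilterFactory to create a vector batch filter.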
+  private static Consumer<OrcFilterContext> createFilter(SearchArgument sArg,
+                                                         String fType,
+                                                         boolean normalize,
+                                                         Configuration conf)
+    throws FilterFactory.UnSupportedSArgException {
+    switch (fType) {
+      case "row":
+        return RowFilterFactory.create(sArg,
+                                       FilterBenchUtil.schema,
+                                       OrcFile.Version.CURRENT,
+                                       normalize);
+      case "vector":
+        Reader.Options options = new Reader.Options(conf)
+          .searchArgument(sArg, new String[0])
+          .allowSARGToFilter(true)
+          .useSelected(true);
+        return FilterFactory.createBatchFilter(options,
+                                               FilterBenchUtil.schema,
+                                               OrcFile.Version.CURRENT,
+                                               normalize);
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  @OutputTimeUnit(value = TimeUnit.MICROSECONDS)
+  @Warmup(iterations = 20, time = 1)
+  @BenchmarkMode(value = Mode.AverageTime)
+  @Fork(value = 1)
+  @State(value = Scope.Benchmark)
+  @Measurement(iterations = 20, time = 1)
+  public static class SimpleFilter {
+    private OrcFilterContext fc;
+    private int[] expSel;
+
+    @Param( {"4", "8", "16", "32", "256"})
+    private int fInSize;
+
+    @Param( {"row", "vector"})
+    private String fType;
+
+    private Consumer<OrcFilterContext> f;
+
+    @Setup
+    public void setup() throws FilterFactory.UnSupportedSArgException {
+      Random rnd = new Random(1024);
+      VectorizedRowBatch b = FilterBenchUtil.createBatch(rnd);
+      Configuration conf = new Configuration();
+      fc = new OrcFilterContextImpl(FilterBenchUtil.schema).setBatch(b);
+      Map.Entry<SearchArgument, int[]> r = FilterBenchUtil.createSArg(rnd, b, fInSize);
+      SearchArgument sArg = r.getKey();
+      expSel = r.getValue();
+      f = createFilter(sArg, fType, false, conf);
+    }
+
+    @Benchmark
+    public OrcFilterContext filter() {
+      // Reset the selection
+      FilterBenchUtil.unFilterBatch(fc);
+      f.accept(fc);
+      return fc;
+    }
+
+    @TearDown
+    public void tearDown() {
+      FilterBenchUtil.validate(fc, expSel);
+    }
+  }
+
+  @OutputTimeUnit(value = TimeUnit.MICROSECONDS)
+  @Warmup(iterations = 20, time = 1)
+  @BenchmarkMode(value = Mode.AverageTime)
+  @Fork(value = 1)
+  @State(value = Scope.Benchmark)
+  @Measurement(iterations = 20, time = 1)
+  public static class ComplexFilter {
+    private OrcFilterContext fc;
+    private int[] expSel;
+
+    private final int inSize = 32;
+
+    @Param( {"2", "4", "8"})
+    private int fSize;
+
+    @Param( {"true", "false"})
+    private boolean normalize;
+
+    @Param( {"row", "vector"})
+    private String fType;
+
+    private Consumer<OrcFilterContext> f;
+    private final Configuration conf = new Configuration();
+
+    @Setup
+    public void setup() throws FilterFactory.UnSupportedSArgException {
+      VectorizedRowBatch b = FilterBenchUtil.createBatch(new Random(1024));
+
+      fc = new OrcFilterContextImpl(FilterBenchUtil.schema).setBatch(b);
+      Map.Entry<SearchArgument, int[]> r = FilterBenchUtil.createComplexSArg(new Random(1024),
+                                                                             b,
+                                                                             inSize,
+                                                                             fSize);
+
+      SearchArgument sArg = r.getKey();
+      expSel = r.getValue();
+      f = createFilter(sArg, fType, normalize, conf);
+    }
+
+    @Benchmark
+    public OrcFilterContext filter() {
+      // Reset the selection
+      FilterBenchUtil.unFilterBatch(fc);
+      f.accept(fc);
+      return fc;
+    }
+
+    @TearDown
+    public void tearDown() {
+      FilterBenchUtil.validate(fc, expSel);
+    }
+  }
+}
diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBenchUtil.java b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBenchUtil.java
new file mode 100644
index 0000000..d39bb08
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBenchUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.TypeDescription;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+class FilterBenchUtil {
+  static final TypeDescription schema = TypeDescription.createStruct()
+    .addField("f1", TypeDescription.createLong())
+    .addField("f2", TypeDescription.createLong());
+
+  static VectorizedRowBatch createBatch(Random rnd) {
+    VectorizedRowBatch b = schema.createRowBatch(1024);
+    LongColumnVector f1Vector = (LongColumnVector) b.cols[0];
+    LongColumnVector f2Vector = (LongColumnVector) b.cols[1];
+
+    for (int i = 0; i < b.getMaxSize(); i++) {
+      f1Vector.vector[b.size] = rnd.nextInt();
+      f2Vector.vector[b.size] = rnd.nextInt();
+      b.size++;
+    }
+    return b;
+  }
+
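+  // Builds an OR of two IN predicates over f1 and f2 using values sampled from
+  // the batch, returning the SArg together with the sorted indices expected to
+  // be selected.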
+  static Map.Entry<SearchArgument, int[]> createSArg(Random rnd,
+                                                     VectorizedRowBatch b,
+                                                     int inSize) {
+    LongColumnVector f1Vector = (LongColumnVector) b.cols[0];
+    LongColumnVector f2Vector = (LongColumnVector) b.cols[1];
+
+    Object[] f1Values = new Object[inSize];
+    Object[] f2Values = new Object[inSize];
+    Set<Integer> sel = new HashSet<>();
+
+    for (int i = 0; i < f1Values.length; i++) {
+      int selIdx = rnd.nextInt(b.getMaxSize());
+      f1Values[i] = f1Vector.vector[selIdx];
+      sel.add(selIdx);
+      selIdx = rnd.nextInt(b.getMaxSize());
+      f2Values[i] = f2Vector.vector[selIdx];
+      sel.add(selIdx);
+    }
+
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .in("f1", PredicateLeaf.Type.LONG, f1Values)
+      .in("f2", PredicateLeaf.Type.LONG, f2Values)
+      .end()
+      .build();
+    int[] s = sel.stream()
+      .mapToInt(Integer::intValue)
+      .toArray();
+    Arrays.sort(s);
+    return new AbstractMap.SimpleImmutableEntry<>(sArg, s);
+  }
+
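+  // Builds an OR of a leading IN over f2 plus 'orSize' AND clauses (each an IN
+  // over f1 and f2 sampled from the same rows), returning the SArg and the
+  // sorted expected selection.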
+  static Map.Entry<SearchArgument, int[]> createComplexSArg(Random rnd,
+                                                            VectorizedRowBatch b,
+                                                            int inSize,
+                                                            int orSize) {
+    LongColumnVector f1Vector = (LongColumnVector) b.cols[0];
+    LongColumnVector f2Vector = (LongColumnVector) b.cols[1];
+
+    Object[] f1Values = new Object[inSize];
+    Object[] f2Values = new Object[inSize];
+    Set<Integer> sel = new HashSet<>();
+    SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+    builder.startOr();
+    builder.in("f2", PredicateLeaf.Type.LONG, f2Vector.vector[0], f2Vector.vector[1]);
+    sel.add(0);
+    sel.add(1);
+    int selIdx;
+    for (int i = 0; i < orSize; i++) {
+      builder.startAnd();
+      for (int j = 0; j < inSize; j++) {
+        selIdx = rnd.nextInt(b.getMaxSize());
+        f1Values[j] = f1Vector.vector[selIdx];
+        f2Values[j] = f2Vector.vector[selIdx];
+        sel.add(selIdx);
+      }
+      builder
+        .in("f1", PredicateLeaf.Type.LONG, f1Values)
+        .in("f2", PredicateLeaf.Type.LONG, f2Values);
+      builder.end();
+    }
+    builder.end();
+
+    int[] s = sel.stream()
+      .mapToInt(Integer::intValue)
+      .toArray();
+    Arrays.sort(s);
+    return new AbstractMap.SimpleImmutableEntry<>(builder.build(), s);
+  }
+
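+  // Resets the selection so that the full batch of 1024 rows is live again.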
+  static void unFilterBatch(OrcFilterContext fc) {
+    fc.setSelectedInUse(false);
+    fc.setSelectedSize(1024);
+  }
+
+  static void validate(OrcFilterContext fc, int[] expSel) {
+    if (!fc.isSelectedInUse()) {
+      throw new IllegalArgumentException("Validation failed: selected is not set");
+    }
+    if (expSel.length != fc.getSelectedSize()) {
+      throw new IllegalArgumentException(String.format(
+        "Validation failed: length %s is not equal to expected length %s",
+        fc.getSelectedSize(), expSel.length));
+    }
+    if (!Arrays.equals(expSel, Arrays.copyOf(fc.getSelected(), expSel.length))) {
+      throw new IllegalArgumentException("Validation failed: array values are not the same");
+    }
+  }
+}
diff --git a/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilter.java b/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilter.java
new file mode 100644
index 0000000..59bfce8
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.OrcFilterContext;
+
+interface RowFilter {
+  boolean accept(OrcFilterContext batch, int rowIdx);
+
+  class LeafFilter extends org.apache.orc.impl.filter.LeafFilter implements RowFilter {
+    final org.apache.orc.impl.filter.LeafFilter filter;
+
+    LeafFilter(org.apache.orc.impl.filter.LeafFilter filter) {
+      super(filter.getColName(), false);
+      this.filter = filter;
+    }
+
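+    // Locates the leaf's column vector, handling repeating vectors and nulls,
+    // before delegating the per-row check to the wrapped leaf filter.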
+    @Override
+    public boolean accept(OrcFilterContext batch, int rowIdx) {
+      ColumnVector[] branch = batch.findColumnVector(filter.getColName());
+      ColumnVector v = branch[branch.length - 1];
+      boolean noNulls = OrcFilterContext.noNulls(branch);
+      int idx = rowIdx;
+      if (v.isRepeating) {
+        idx = 0;
+      }
+      if (noNulls || !OrcFilterContext.isNull(branch, idx)) {
+        return allow(v, idx);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return filter.allow(v, rowIdx);
+    }
+  }
+
+  class OrFilter implements RowFilter {
+    final RowFilter[] filters;
+
+    OrFilter(RowFilter[] filters) {
+      this.filters = filters;
+    }
+
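+    // Accepts the row if any child filter accepts it; short-circuits on the first match.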
+    @Override
+    public boolean accept(OrcFilterContext batch, int rowIdx) {
+      boolean result = true;
+      for (RowFilter filter : filters) {
+        result = filter.accept(batch, rowIdx);
+        if (result) {
+          break;
+        }
+      }
+      return result;
+    }
+  }
+
+  class AndFilter implements RowFilter {
+    final RowFilter[] filters;
+
+    AndFilter(RowFilter[] filters) {
+      this.filters = filters;
+    }
+
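+    // Accepts the row only if every child filter accepts it; short-circuits on the first reject.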
+    @Override
+    public boolean accept(OrcFilterContext batch, int rowIdx) {
+      boolean result = true;
+      for (RowFilter filter : filters) {
+        result = filter.accept(batch, rowIdx);
+        if (!result) {
+          break;
+        }
+      }
+      return result;
+    }
+  }
+}
diff --git a/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilterFactory.java b/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilterFactory.java
new file mode 100644
index 0000000..95a75c6
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilterFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.filter.leaf.LeafFilterFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
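+// Benchmark-only factory that compiles a SearchArgument expression tree into a
+// tree of RowFilters for row-by-row evaluation.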
+public class RowFilterFactory {
+  public static Consumer<OrcFilterContext> create(SearchArgument sArg,
+                                           TypeDescription readSchema,
+                                           OrcFile.Version version,
+                                           boolean normalize)
+    throws FilterFactory.UnSupportedSArgException {
+    Set<String> colIds = new HashSet<>();
+    ExpressionTree expr = normalize ? sArg.getExpression() : sArg.getCompactExpression();
+    RowFilter filter = create(expr,
+                              colIds,
+                              sArg.getLeaves(),
+                              readSchema,
+                              version);
+    return new RowBatchFilter(filter, colIds.toArray(new String[0]));
+  }
+
+  static RowFilter create(ExpressionTree expr,
+                                 Set<String> colIds,
+                                 List<PredicateLeaf> leaves,
+                                 TypeDescription readSchema,
+                                 OrcFile.Version version)
+    throws FilterFactory.UnSupportedSArgException {
+    RowFilter result;
+    switch (expr.getOperator()) {
+      case OR:
+        RowFilter[] orFilters = new RowFilter[expr.getChildren().size()];
+        for (int i = 0; i < expr.getChildren().size(); i++) {
+          orFilters[i] = create(expr.getChildren().get(i), colIds, leaves, readSchema, version);
+        }
+        result = new RowFilter.OrFilter(orFilters);
+        break;
+      case AND:
+        RowFilter[] andFilters = new RowFilter[expr.getChildren().size()];
+        for (int i = 0; i < expr.getChildren().size(); i++) {
+          andFilters[i] = create(expr.getChildren().get(i), colIds, leaves, readSchema, version);
+        }
+        result = new RowFilter.AndFilter(andFilters);
+        break;
+      case LEAF:
+        result = createLeafFilter(leaves.get(expr.getLeaf()), colIds, readSchema, version, false);
+        break;
+      default:
+        throw new FilterFactory.UnSupportedSArgException(String.format(
+          "SArg Expression: %s is not supported",
+          expr));
+    }
+    return result;
+  }
+
+  private static RowFilter createLeafFilter(PredicateLeaf leaf,
+                                            Set<String> colIds,
+                                            TypeDescription readSchema,
+                                            OrcFile.Version version,
+                                            boolean negated)
+    throws FilterFactory.UnSupportedSArgException {
+    colIds.add(leaf.getColumnName());
+    LeafFilter f = (LeafFilter) LeafFilterFactory.createLeafVectorFilter(leaf,
+                                                                         colIds,
+                                                                         readSchema,
+                                                                         version,
+                                                                         negated);
+    return new RowFilter.LeafFilter(f);
+  }
+
+  static class RowBatchFilter implements Consumer<OrcFilterContext> {
+
+    private final RowFilter filter;
+    private final String[] colNames;
+
+    private RowBatchFilter(RowFilter filter, String[] colNames) {
+      this.filter = filter;
+      this.colNames = colNames;
+    }
+
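+    // Evaluates the row filter for each row index up to the current selected
+    // size and compacts the selected vector in place to the accepted indices.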
+    @Override
+    public void accept(OrcFilterContext batch) {
+      int size = 0;
+      int[] selected = batch.getSelected();
+
+      for (int i = 0; i < batch.getSelectedSize(); i++) {
+        if (filter.accept(batch, i)) {
+          selected[size] = i;
+          size += 1;
+        }
+      }
+      batch.setSelectedInUse(true);
+      batch.setSelected(selected);
+      batch.setSelectedSize(size);
+    }
+
+    public String[] getColNames() {
+      return colNames;
+    }
+  }
+}
diff --git a/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java b/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java
new file mode 100644
index 0000000..6b29c7d
--- /dev/null
+++ b/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.filter;
+
+import org.apache.orc.impl.filter.RowFilterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.filter.FilterFactory;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+public class TestFilter {
+  private static final Logger LOG = LoggerFactory.getLogger(TestFilter.class);
+  private static final long seed = 1024;
+  protected final Configuration conf = new Configuration();
+  protected final Random rnd = new Random(seed);
+  protected final VectorizedRowBatch b = FilterBenchUtil.createBatch(rnd);
+  protected final OrcFilterContextImpl fc = (OrcFilterContextImpl)
+    new OrcFilterContextImpl(FilterBenchUtil.schema).setBatch(b);
+
+  public static Stream<Arguments> filters() {
+    return Stream.of(
+      arguments("simple", "row", false),
+      arguments("simple", "vector", false),
+      arguments("complex", "row", true),
+      arguments("complex", "vector", true),
+      arguments("complex", "row", false),
+      arguments("complex", "vector", false)
+    );
+  }
+
+  @BeforeEach
+  public void setup() {
+    FilterBenchUtil.unFilterBatch(fc);
+  }
+
+  @ParameterizedTest(name = "#{index} - {0}+{1}")
+  @MethodSource("org.apache.orc.bench.core.filter.TestFilter#filters")
+  public void testFilter(String complexity, String filterType, boolean normalize)
+    throws FilterFactory.UnSupportedSArgException {
+    new Filter(complexity, filterType, normalize).execute();
+  }
+
+  private class Filter {
+    protected final SearchArgument sArg;
+    protected final int[] expSel;
+    protected final Consumer<OrcFilterContext> filter;
+
+    private Filter(String complexity, String filterType, boolean normalize)
+      throws FilterFactory.UnSupportedSArgException {
+      Map.Entry<SearchArgument, int[]> ft;
+      switch (complexity) {
+        case "simple":
+          ft = FilterBenchUtil.createSArg(new Random(seed), b, 5);
+          break;
+        case "complex":
+          ft = FilterBenchUtil.createComplexSArg(new Random(seed), b, 10, 8);
+          break;
+        default:
+          throw new IllegalArgumentException();
+      }
+      sArg = ft.getKey();
+      LOG.info("SearchArgument has {} expressions", sArg.getExpression().getChildren().size());
+      expSel = ft.getValue();
+
+      switch (filterType) {
+        case "row":
+          filter = RowFilterFactory.create(sArg,
+                                           FilterBenchUtil.schema,
+                                           OrcFile.Version.CURRENT,
+                                           normalize);
+          break;
+        case "vector":
+          Reader.Options options = new Reader.Options(conf)
+            .searchArgument(sArg, new String[0])
+            .allowSARGToFilter(true);
+          filter = FilterFactory.createBatchFilter(options,
+                                                   FilterBenchUtil.schema,
+                                                   OrcFile.Version.CURRENT,
+                                                   normalize);
+          break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+
+    private void execute() {
+      filter.accept(fc.setBatch(b));
+      FilterBenchUtil.validate(fc, expSel);
+    }
+  }
+}
\ No newline at end of file
diff --git a/java/bench/core/src/test/org/apache/orc/impl/filter/ATestFilter.java b/java/bench/core/src/test/org/apache/orc/impl/filter/ATestFilter.java
new file mode 100644
index 0000000..fd55516
--- /dev/null
+++ b/java/bench/core/src/test/org/apache/orc/impl/filter/ATestFilter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.DateUtils;
+import org.apache.orc.impl.OrcFilterContextImpl;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ATestFilter {
+  protected final TypeDescription schema = TypeDescription.createStruct()
+    .addField("f1", TypeDescription.createLong())
+    .addField("f2", TypeDescription.createString())
+    .addField("f3p", TypeDescription.createDate())
+    .addField("f3h", TypeDescription.createDate());
+  protected final OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+
+  protected final VectorizedRowBatch batch = schema.createRowBatch();
+
+  protected void setBatch(Long[] f1Values, String[] f2Values) {
+    setBatch(f1Values, f2Values, new String[0]);
+  }
+
+  protected void setBatch(Long[] f1Values, String[] f2Values, String[] f3Values) {
+    final LongColumnVector f1Vector = (LongColumnVector) batch.cols[0];
+    final BytesColumnVector f2Vector = (BytesColumnVector) batch.cols[1];
+    final DateColumnVector f3p = (DateColumnVector) batch.cols[2];
+    final DateColumnVector f3h = (DateColumnVector) batch.cols[3];
+
+    batch.reset();
+    f1Vector.noNulls = true;
+    for (int i = 0; i < f1Values.length; i++) {
+      if (f1Values[i] == null) {
+        f1Vector.noNulls = false;
+        f1Vector.isNull[i] = true;
+      } else {
+        f1Vector.isNull[i] = false;
+        f1Vector.vector[i] = f1Values[i];
+      }
+    }
+
+    for (int i = 0; i < f2Values.length; i++) {
+      if (f2Values[i] == null) {
+        f2Vector.noNulls = false;
+        f2Vector.isNull[i] = true;
+      } else {
+        f2Vector.isNull[i] = false;
+        byte[] bytes = f2Values[i].getBytes(StandardCharsets.UTF_8);
+        f2Vector.vector[i] = bytes;
+        f2Vector.start[i] = 0;
+        f2Vector.length[i] = bytes.length;
+      }
+    }
+
+    for (int i = 0; i < f3Values.length; i++) {
+      if (f3Values[i] == null) {
+        f3p.noNulls = false;
+        f3p.isNull[i] = true;
+        f3h.noNulls = false;
+        f3h.isNull[i] = true;
+      } else {
+        f3p.isNull[i] = false;
+        f3p.vector[i] = DateUtils.parseDate(f3Values[i], true);
+        f3h.isNull[i] = false;
+        f3h.vector[i] = DateUtils.parseDate(f3Values[i], false);
+      }
+    }
+    batch.size = f1Values.length;
+    fc.setBatch(batch);
+  }
+
+  protected void validateSelected(int... v) {
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(v.length, fc.getSelectedSize());
+    assertArrayEquals(v, Arrays.copyOf(fc.getSelected(), v.length));
+  }
+
+  protected void validateNoneSelected() {
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(0, fc.getSelectedSize());
+  }
+}
diff --git a/java/bench/core/src/test/org/apache/orc/impl/filter/TestRowFilter.java b/java/bench/core/src/test/org/apache/orc/impl/filter/TestRowFilter.java
new file mode 100644
index 0000000..08b58bb
--- /dev/null
+++ b/java/bench/core/src/test/org/apache/orc/impl/filter/TestRowFilter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRowFilter extends ATestFilter {
+
+  // Reuse the schema, filter context, and batch inherited from ATestFilter;
+  // redeclaring them here would shadow the inherited fields and leave the
+  // local batch empty, making the per-row assertions below vacuous.
+
+  @Test
+  public void testINLongConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+      .build();
+
+    Set<String> colIds = new HashSet<>();
+    RowFilter filter = RowFilterFactory.create(sarg.getExpression(),
+                                               colIds,
+                                               sarg.getLeaves(),
+                                               schema,
+                                               OrcFile.Version.CURRENT);
+    assertNotNull(filter);
+    assertTrue(filter instanceof RowFilter.LeafFilter);
+    assertEquals(1, colIds.size());
+    assertTrue(colIds.contains("f1"));
+
+    setBatch(new Long[] {1L, 0L, 2L, 4L, 3L},
+             new String[] {});
+    fc.setBatch(batch);
+
+    for (int i = 0; i < batch.size; i++) {
+      if (i % 2 == 0) {
+        assertTrue(filter.accept(fc, i));
+      } else {
+        assertFalse(filter.accept(fc, i));
+      }
+    }
+  }
+
+  @Test
+  public void testINStringConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .in("f2", PredicateLeaf.Type.STRING, "a", "b")
+      .build();
+
+    Set<String> colIds = new HashSet<>();
+    RowFilter filter = RowFilterFactory.create(sarg.getExpression(),
+                                               colIds,
+                                               sarg.getLeaves(),
+                                               schema,
+                                               OrcFile.Version.CURRENT);
+    assertNotNull(filter);
+    assertTrue(filter instanceof RowFilter.LeafFilter);
+    assertEquals(1, colIds.size());
+    assertTrue(colIds.contains("f2"));
+
+    setBatch(new Long[] {1L, 0L, 2L, 4L, 3L},
+             new String[] {"a", "z", "b", "y", "a"});
+    fc.setBatch(batch);
+
+    for (int i = 0; i < batch.size; i++) {
+      if (i % 2 == 0) {
+        assertTrue(filter.accept(fc, i));
+      } else {
+        assertFalse(filter.accept(fc, i));
+      }
+    }
+  }
+
+  @Test
+  public void testORConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .startOr()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+      .in("f2", PredicateLeaf.Type.STRING, "a", "b", "c")
+      .end()
+      .build();
+
+    Set<String> colIds = new HashSet<>();
+    RowFilter filter = RowFilterFactory.create(sarg.getExpression(),
+                                               colIds,
+                                               sarg.getLeaves(),
+                                               schema,
+                                               OrcFile.Version.CURRENT);
+    assertNotNull(filter);
+    assertTrue(filter instanceof RowFilter.OrFilter);
+    assertEquals(2, ((RowFilter.OrFilter) filter).filters.length);
+    assertEquals(2, colIds.size());
+    assertTrue(colIds.contains("f1"));
+    assertTrue(colIds.contains("f2"));
+
+    // Setup the data such that the OR condition should select every row
+    setBatch(new Long[] {1L, 0L, 2L, 4L, 3L},
+             new String[] {"z", "a", "y", "b", "x"});
+    fc.setBatch(batch);
+
+
+    for (int i = 0; i < batch.size; i++) {
+      assertTrue(filter.accept(fc, i));
+    }
+  }
+
+  @Test
+  public void testANDConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+      .startAnd()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+      .in("f2", PredicateLeaf.Type.STRING, "a", "b", "c")
+      .end()
+      .build();
+
+    Set<String> colIds = new HashSet<>();
+    RowFilter filter = RowFilterFactory.create(sarg.getExpression(),
+                                               colIds,
+                                               sarg.getLeaves(),
+                                               schema,
+                                               OrcFile.Version.CURRENT);
+    assertNotNull(filter);
+    assertTrue(filter instanceof RowFilter.AndFilter);
+    assertEquals(2, ((RowFilter.AndFilter) filter).filters.length);
+    assertEquals(2, colIds.size());
+    assertTrue(colIds.contains("f1"));
+    assertTrue(colIds.contains("f2"));
+
+    // Setup the data such that the AND condition should not select any row
+    setBatch(new Long[] {1L, 0L, 2L, 4L, 3L},
+             new String[] {"z", "a", "y", "b", "x"});
+    fc.setBatch(batch);
+
+    for (int i = 0; i < batch.size; i++) {
+      assertFalse(filter.accept(fc, i));
+    }
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/assembly/uber.xml b/java/bench/hive/src/assembly/uber.xml
index 014eab9..7bd202d 100644
--- a/java/bench/hive/src/assembly/uber.xml
+++ b/java/bench/hive/src/assembly/uber.xml
@@ -29,5 +29,12 @@
     <containerDescriptorHandler>
       <handlerName>metaInf-services</handlerName>
     </containerDescriptorHandler>
+    <containerDescriptorHandler>
+      <handlerName>file-aggregator</handlerName>
+      <configuration>
+        <filePattern>META-INF/BenchmarkList</filePattern>
+        <outputPath>META-INF/BenchmarkList</outputPath>
+      </configuration>
+    </containerDescriptorHandler>
   </containerDescriptorHandlers>
 </assembly>
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index 41e8749..60d1520 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -37,7 +37,9 @@
     <orc.version>${project.version}</orc.version>
     <parquet.version>1.12.0</parquet.version>
     <spark.version>3.1.2</spark.version>
-    <jackson.version>2.12.2</jackson.version>
+    <jackson.version>2.12.4</jackson.version>
+    <hadoop.version>3.3.1</hadoop.version>
+    <junit.version>5.7.2</junit.version>
   </properties>
 
   <modules>
@@ -49,6 +51,24 @@
   <dependencyManagement>
     <dependencies>
       <dependency>
+        <groupId>org.junit.jupiter</groupId>
+        <artifactId>junit-jupiter-engine</artifactId>
+        <version>${junit.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.junit.vintage</groupId>
+        <artifactId>junit-vintage-engine</artifactId>
+        <version>${junit.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.junit.jupiter</groupId>
+        <artifactId>junit-jupiter-params</artifactId>
+        <version>${junit.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-core</artifactId>
         <version>${jackson.version}</version>
diff --git a/site/develop/design/lazy_filter.md b/site/develop/design/lazy_filter.md
new file mode 100644
index 0000000..5c64d89
--- /dev/null
+++ b/site/develop/design/lazy_filter.md
@@ -0,0 +1,335 @@
+* [Lazy Filter](#LazyFilter)
+  * [Background](#Background)
+  * [Design](#Design)
+    * [SArg to Filter](#SArgtoFilter)
+    * [Read](#Read)
+  * [Configuration](#Configuration)
+  * [Tests](#Tests)
+  * [Appendix](#Appendix)
+    * [Benchmarks](#Benchmarks)
+      * [Row vs Vector](#RowvsVector)
+      * [Normalization vs Compact](#NormalizationvsCompact)
+      * [Summary](#Summary)
+
+# Lazy Filter <a id="LazyFilter"></a>
+
+## Background <a id="Background"></a>
+
+This feature request grew out of a needle-in-a-haystack search with the following
+characteristics:
+
+* The search fields are not part of the partition, bucket, or sort specification.
+* The table is very large.
+* The result contains very few rows compared to the scan size.
+* The search columns are a significant subset of the selection columns in the query.
+
+Initial analysis showed that we could gain a significant benefit by lazily reading the non-search columns only when we
+have a match. We explore the design and some benchmarks in the subsequent sections.
+
+## Design <a id="Design"></a>
+
+This builds further on [ORC-577][ORC-577], which currently only restricts deserialization for some selected data types
+but does not improve on IO.
+
+On a high level the design includes the following components:
+
+```text
+┌──────────────┐          ┌────────────────────────┐
+│              │          │          Read          │
+│              │          │                        │
+│              │          │     ┌────────────┐     │
+│SArg to Filter│─────────▶│     │Read Filter │     │
+│              │          │     │  Columns   │     │
+│              │          │     └────────────┘     │
+│              │          │            │           │
+└──────────────┘          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Apply Filter│     │
+                          │     └────────────┘     │
+                          │            │           │
+                          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Read Select │     │
+                          │     │  Columns   │     │
+                          │     └────────────┘     │
+                          │                        │
+                          │                        │
+                          └────────────────────────┘
+```
+
+* **SArg to Filter**: Converts Search Arguments passed down into filters for efficient application during scans.
+* **Read**: Performs the lazy read using the filters.
+  * **Read Filter Columns**: Read the filter columns from the file.
+  * **Apply Filter**: Apply the filter on the read filter columns.
+  * **Read Select Columns**: If the filter selects at least one row, read the remaining columns.
+
+### SArg to Filter <a id="SArgtoFilter"></a>
+
+SArg to Filter converts the passed SArg into a filter. This enables automatic compatibility with both Spark and Hive,
+as they already push Search Arguments down to ORC.
+
+The SArg is automatically converted into a [Vector Filter][vfilter], which is applied during the read process. Two
+filter types were evaluated:
+
+* [Row Filter][rfilter] that evaluates each row across all the predicates once.
+* [Vector Filter][vfilter] that evaluates each filter across the entire vector and adjusts the subsequent evaluation.
+
+While a row based filter is easier to code, it is much [slower][rowvvector] to process. We also see a significant
+[performance gain][normalvcompact] in the absence of normalization.
+
+The Search Argument builder should allow skipping normalization during the [build][build]; this has been added with
+[HIVE-24458][HIVE-24458].
+
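+For illustration, here is a minimal sketch mirroring the benchmark code added in this change; it builds a
+SearchArgument as Spark or Hive would push it down and asks ORC to convert it into a batch filter (`schema` is an
+assumed `TypeDescription` for the file being read):
+
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.filter.FilterFactory;
+
+import java.util.function.Consumer;
+
+static Consumer<OrcFilterContext> buildFilter(Configuration conf, TypeDescription schema)
+    throws FilterFactory.UnSupportedSArgException {
+  // The SArg as it would arrive from the calling engine.
+  SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+      .build();
+  Reader.Options options = new Reader.Options(conf)
+      .searchArgument(sArg, new String[0])
+      .allowSARGToFilter(true);
+  // normalize=false uses the compact expression (see the benchmarks below).
+  // The returned consumer adjusts the batch's selected vector when applied.
+  return FilterFactory.createBatchFilter(options, schema, OrcFile.Version.CURRENT, false);
+}
+```
+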
+### Read <a id="Read"></a>
+
+The read process has the following changes:
+
+```text
+                         │
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Plan ++Search++ ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                 Read   │Stripe                  │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+
+
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Read ++Search++ ┃                │
+│               ┃    Columns     ┃◀─────────┐     │
+│               ┗━━━━━━━━━━━━━━━━┛          │     │
+│                        │              Size = 0  │
+│                        ▼                  │     │
+│               ┏━━━━━━━━━━━━━━━━┓          │     │
+│               ┃  Apply Filter  ┃──────────┘     │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                    Size > 0                     │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Plan Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Read Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                   Next │Batch                   │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+```
+
+In detail, the changes are:
+
+* **Read Stripe** used to plan the read of all (search + select) columns. It is now enhanced to plan and fetch only
+  the search columns. The rest of the stripe planning optimizations remain unchanged, e.g. partial read planning of
+  the stripe based on RowGroup statistics.
+* **Next Batch** identifies the processing that takes place when `RecordReader.nextBatch` is invoked.
+  * **Read Search Columns** takes place instead of reading all the selected columns. This is in sync with the planning
+    done during **Read Stripe**, where only the search columns were planned.
+  * **Apply Filter** on the batch, which at this point only includes the search columns, and evaluate the result:
+    * **Size = 0** indicates that all records have been filtered out, so we proceed to the next batch of search
+      columns.
+    * **Size > 0** indicates that at least one record was accepted by the filter. This record needs to be
+      substantiated with the other columns.
+  * **Plan Select Columns** is invoked to plan the read of the select columns. The planning happens as follows:
+    * Determine the current position of the read within the stripe and plan the read for the select columns from this
+      point forward to the end of the stripe.
+    * The read planning of the select columns respects the row groups filtered out during stripe planning.
+    * Fetch the select columns using the above plan.
+  * **Read Select Columns** into the vectorized row batch.
+  * Return this batch.
+
+The current implementation performs a single read for the select columns in a stripe.
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │RG0 │ │RG1 │ │RG2■│ │RG3 │ │RG4 │ │RG5■│ │RG6 │ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│                      Stripe                      │
+└──────────────────────────────────────────────────┘
+```
+
+The above diagram depicts a stripe with 7 Row Groups, of which **RG2** and **RG5** are selected by the filter. The
+current implementation does the following:
+
+* Start the read planning from the first match, RG2.
+* Read to the end of the stripe, which includes RG6.
+* The resulting fetch skips RG0 and RG1, subject to compression block boundaries.
+
+The above logic could be enhanced to perform, say, **2 or n** partial reads before reading to the end of the stripe.
+The current implementation allows 0 such reads before reading to the end of the stripe. The value of **n** could be
+made configurable but should avoid too many short reads.
+
+With multiple reads allowed within a stripe for the select columns, the read behavior changes as follows:
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│              Current implementation              │
+└──────────────────────────────────────────────────┘
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │    │ │    │ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│               Allow 1 partial read               │
+└──────────────────────────────────────────────────┘
+```
+
+The figure shows that we could read significantly fewer bytes by performing an additional read before reading to the
+end of the stripe. This shall be included as a subsequent enhancement to this patch.
+
+## Configuration <a id="Configuration"></a>
+
+The following configuration options are exposed to control the filter behavior:
+
+|Property               |Type   |Default|
+|:---                   |:---   |:---   |
+|orc.sarg.to.filter     |boolean|false  |
+|orc.filter.use.selected|boolean|false  |
+
+* `orc.sarg.to.filter` can be used to turn off the SArg-to-filter conversion. This might be particularly relevant in
+  cases where the filter is expensive and does not eliminate many records. It will no longer be relevant once callers
+  can turn off their own filters, as the filters are applied completely by the ORC layer.
+* `orc.filter.use.selected` is an important setting that, if incorrectly enabled, results in wrong output. This
+  boolean flag determines whether the selected vector is supported by the reading application. If false, the output
+  of the ORC reader must have the filter reapplied to avoid using unset values in the unselected rows. If unsure,
+  leave this as false.
+
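+As a minimal sketch (using the property names from the table above; the equivalent `Reader.Options` setters appear in
+this change's benchmark code), an application that honors the selected vector could enable both options like this:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+
+Configuration conf = new Configuration();
+// Convert pushed-down Search Arguments into filters.
+conf.setBoolean("orc.sarg.to.filter", true);
+// Only enable this if the application honors the batch's selected vector;
+// otherwise the reader output must have the filter reapplied.
+conf.setBoolean("orc.filter.use.selected", true);
+// Alternatively, per reader:
+//   new Reader.Options(conf).allowSARGToFilter(true).useSelected(true)
+```
+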
+## Tests <a id="Tests"></a>
+
+We evaluated this patch against a search job with the following stats:
+
+* Table
+  * Size: ~**420 TB**
+  * Data fields: ~**120**
+  * Partition fields: **3**
+* Scan
+  * Search fields: 3 data fields with large (~1000 values) IN clauses combined with **OR**.
+  * Select fields: 16 data fields (includes the 3 search fields), 1 partition field
+  * Search:
+    * Size: ~**180 TB**
+    * Records: **3.99 T**
+  * Selected:
+    * Size: ~**100 MB**
+    * Records: **1 M**
+
+We have observed the following reductions:
+
+|Test          |IO Reduction %|CPU Reduction %|
+|:---          |          ---:|           ---:|
+|SELECT 16 cols|            45|             47|
+|SELECT *      |            70|             87|
+
+* The savings become more significant as the number of select columns increases relative to the search columns.
+* When the filter selects most of the data, no significant penalty was observed from performing 2 IOs instead of one.
+  * We do incur a penalty from the double filter application, both in ORC and in the calling engine.
+
+## Appendix <a id="Appendix"></a>
+
+### Benchmarks <a id="Benchmarks"></a>
+
+#### Row vs Vector <a id="RowvsVector"></a>
+
+We start with the decision between a row filter and a vector filter. The row filter has the advantage of simpler code
+compared with the vector filter.
+
+```bash
+java -jar java/bench/core/target/orc-benchmarks-core-*-uber.jar filter simple
+```
+
+|Benchmark                      |(fInSize)|(fType)|Mode| Cnt| Score|Error  |Units|
+|:---                           |     ---:|:---   |:---|---:|  ---:|:---   |:--- |
+|FilterBench.SimpleFilter.filter|        4|row    |avgt|  20|38.207|± 0.178|us/op|
+|FilterBench.SimpleFilter.filter|        4|vector |avgt|  20|18.663|± 0.117|us/op|
+|FilterBench.SimpleFilter.filter|        8|row    |avgt|  20|50.694|± 0.313|us/op|
+|FilterBench.SimpleFilter.filter|        8|vector |avgt|  20|35.532|± 0.190|us/op|
+|FilterBench.SimpleFilter.filter|       16|row    |avgt|  20|52.443|± 0.268|us/op|
+|FilterBench.SimpleFilter.filter|       16|vector |avgt|  20|33.966|± 0.204|us/op|
+|FilterBench.SimpleFilter.filter|       32|row    |avgt|  20|68.504|± 0.318|us/op|
+|FilterBench.SimpleFilter.filter|       32|vector |avgt|  20|51.707|± 0.302|us/op|
+|FilterBench.SimpleFilter.filter|      256|row    |avgt|  20|88.348|± 0.793|us/op|
+|FilterBench.SimpleFilter.filter|      256|vector |avgt|  20|72.602|± 0.282|us/op|
+
+Explanation:
+
+* **fInSize** identifies the number of values in the IN clause.
+* **fType** identifies whether the filter is row based or vector based.
+
+Observations:
+
+* The vector based filter is significantly faster than the row based filter.
+  * At best, vector was faster by **51.15%**
+  * At worst, vector was faster by **17.82%**
+* Filter performance deteriorates as the number of IN values increases; however, even in this case the vector filter
+  is much better than the row filter. The current `IN` filter employs a binary search on an array instead of a hash
+  lookup.
+
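+To make the distinction concrete, here is a simplified, illustrative sketch of the two strategies (the actual
+implementations are this change's benchmark `RowFilter` and the core [VectorFilter][vfilter]); `rowFilter`, `fc`, and
+`selected` are assumed to be set up as in the benchmark:
+
+```java
+// Row filter: the full predicate tree is re-evaluated once per row.
+int size = 0;
+for (int row = 0; row < batch.size; row++) {
+  if (rowFilter.accept(fc, row)) {
+    selected[size++] = row; // keep the accepted row index
+  }
+}
+
+// Vector filter (in contrast): each predicate scans a whole column vector in
+// one pass and shrinks the selected set, so later predicates examine fewer
+// rows and the per-row dispatch overhead is paid per predicate, not per row.
+```
+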
+#### Normalization vs Compact <a id="NormalizationvsCompact"></a>
+
+In this test we use a complex filter with both AND and OR to understand the impact of Conjunctive Normal Form (CNF)
+on filter performance. The Search Argument builder performs CNF normalization by default. The advantage of CNF would
+again be a simpler code base.
+
+```bash
+java -jar java/bench/core/target/orc-benchmarks-core-*-uber.jar filter complex
+```
+
+|Benchmark                       |(fSize)|(fType)|(normalize)|Mode| Cnt|   Score|Error   |Units|
+|:---                            |   ---:|:---   |:---       |:---|---:|    ---:|:---    |:--- |
+|FilterBench.ComplexFilter.filter|      2|row    |true       |avgt|  20|  91.922|± 0.301 |us/op|
+|FilterBench.ComplexFilter.filter|      2|row    |false      |avgt|  20|  90.741|± 0.556 |us/op|
+|FilterBench.ComplexFilter.filter|      2|vector |true       |avgt|  20|  61.137|± 0.398 |us/op|
+|FilterBench.ComplexFilter.filter|      2|vector |false      |avgt|  20|  54.829|± 0.431 |us/op|
+|FilterBench.ComplexFilter.filter|      4|row    |true       |avgt|  20| 284.956|± 1.237 |us/op|
+|FilterBench.ComplexFilter.filter|      4|row    |false      |avgt|  20| 130.526|± 0.767 |us/op|
+|FilterBench.ComplexFilter.filter|      4|vector |true       |avgt|  20| 242.387|± 1.053 |us/op|
+|FilterBench.ComplexFilter.filter|      4|vector |false      |avgt|  20|  98.530|± 0.423 |us/op|
+|FilterBench.ComplexFilter.filter|      8|row    |true       |avgt|  20|8007.101|± 54.912|us/op|
+|FilterBench.ComplexFilter.filter|      8|row    |false      |avgt|  20| 234.943|± 4.713 |us/op|
+|FilterBench.ComplexFilter.filter|      8|vector |true       |avgt|  20|7013.758|± 33.701|us/op|
+|FilterBench.ComplexFilter.filter|      8|vector |false      |avgt|  20| 190.442|± 0.881 |us/op|
+
+Explanation:
+
+* **fSize** identifies the number of AND clauses within the OR clause that will be normalized.
+* **normalize** identifies whether normalization was carried out on the Search Argument.
+
+Observations:
+
+* The vector filter is better than the row filter, as demonstrated by the [Row vs Vector][rowvvector] test.
+* Normalizing the search argument results in a significant performance penalty, given the explosion of the operator
+  tree.
+  * In the case where an AND includes 8 ORs, the compact version is faster by **97.29%**.
+
+#### Summary <a id="Summary"></a>
+
+Based on the benchmarks we have the following conclusions:
+
+* The vector based filter is significantly better than the row based filter and justifies the more complex code.
+* The compact filter is significantly faster than the normalized filter.
+
+[ORC-577]: https://issues.apache.org/jira/browse/ORC-577
+
+[HIVE-24458]: https://issues.apache.org/jira/browse/HIVE-24458
+
+[vfilter]: ../../../java/core/src/java/org/apache/orc/impl/filter/VectorFilter.java
+
+[rowvvector]: #RowvsVector
+
+[normalvcompact]: #NormalizationvsCompact
+
+[build]: https://github.com/apache/hive/blob/storage-branch-2.7/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L491
\ No newline at end of file