You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2021/08/30 22:26:22 UTC
[orc] branch branch-1.7 updated: ORC-811 Benchmarks and
documentation for ORC-742 and ORC-743
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 49c7e44 ORC-811 Benchmarks and documentation for ORC-742 and ORC-743
49c7e44 is described below
commit 49c7e448ff805b1f5af21f1bdff639ba6834ffc2
Author: Pavan Lanka <pl...@apple.com>
AuthorDate: Tue Jun 8 12:16:46 2021 -0700
ORC-811 Benchmarks and documentation for ORC-742 and ORC-743
Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
java/bench/core/pom.xml | 16 +
java/bench/core/src/findbugs/exclude.xml | 2 +-
.../apache/orc/bench/core/filter/FilterBench.java | 274 +++++++++++++++++
.../orc/bench/core/filter/FilterBenchUtil.java | 142 +++++++++
.../java/org/apache/orc/impl/filter/RowFilter.java | 96 ++++++
.../apache/orc/impl/filter/RowFilterFactory.java | 128 ++++++++
.../apache/orc/bench/core/filter/TestFilter.java | 124 ++++++++
.../org/apache/orc/impl/filter/ATestFilter.java | 108 +++++++
.../org/apache/orc/impl/filter/TestRowFilter.java | 171 +++++++++++
java/bench/hive/src/assembly/uber.xml | 7 +
java/bench/pom.xml | 22 +-
site/develop/design/lazy_filter.md | 335 +++++++++++++++++++++
12 files changed, 1423 insertions(+), 2 deletions(-)
diff --git a/java/bench/core/pom.xml b/java/bench/core/pom.xml
index bf39f8d..626b120 100644
--- a/java/bench/core/pom.xml
+++ b/java/bench/core/pom.xml
@@ -100,6 +100,10 @@
<artifactId>jmh-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -107,6 +111,18 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/java/bench/core/src/findbugs/exclude.xml b/java/bench/core/src/findbugs/exclude.xml
index dde1471..e9fe859 100644
--- a/java/bench/core/src/findbugs/exclude.xml
+++ b/java/bench/core/src/findbugs/exclude.xml
@@ -20,6 +20,6 @@
<Class name="~org\.openjdk\.jmh\.infra\.generated.*"/>
</Match>
<Match>
- <Class name="~org\.apache\.orc\.bench\.generated.*"/>
+ <Class name="~org\.apache\.orc\.bench\..*\.generated.*"/>
</Match>
</FindBugsFilter>
diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
new file mode 100644
index 0000000..d493b67
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.filter;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.Reader;
+import org.apache.orc.bench.core.OrcBenchmark;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.apache.orc.impl.filter.FilterFactory;
+import org.apache.orc.impl.filter.RowFilterFactory;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static org.apache.orc.bench.core.BenchmarkOptions.FORK;
+import static org.apache.orc.bench.core.BenchmarkOptions.GC;
+import static org.apache.orc.bench.core.BenchmarkOptions.HELP;
+import static org.apache.orc.bench.core.BenchmarkOptions.ITERATIONS;
+import static org.apache.orc.bench.core.BenchmarkOptions.MAX_MEMORY;
+import static org.apache.orc.bench.core.BenchmarkOptions.MIN_MEMORY;
+import static org.apache.orc.bench.core.BenchmarkOptions.TIME;
+import static org.apache.orc.bench.core.BenchmarkOptions.WARMUP_ITERATIONS;
+
+@AutoService(OrcBenchmark.class)
+public class FilterBench implements OrcBenchmark {
+  /**
+   * Returns the name of this benchmark.
+   *
+   * @return the constant {@code "filter"}
+   */
+  @Override
+  public String getName() {
+    final String benchmarkName = "filter";
+    return benchmarkName;
+  }
+
+  /**
+   * Returns the one-line description of this benchmark.
+   *
+   * @return the constant {@code "Perform filter bench"}
+   */
+  @Override
+  public String getDescription() {
+    final String description = "Perform filter bench";
+    return description;
+  }
+
+ @Override
+ public void run(String[] args) throws Exception {
+ new Runner(parseOptions(args)).run();
+ }
+
+  /**
+   * Parses the benchmark command line.
+   * <p>
+   * The usage is printed and the JVM is terminated with exit status 1 when the
+   * arguments cannot be parsed, when help is requested, or when no positional
+   * argument (the sub-command) remains after option parsing.
+   *
+   * @param args the raw command line arguments
+   * @return the parsed command line, holding at least one positional argument
+   */
+  private static CommandLine parseCommandLine(String[] args) {
+    org.apache.commons.cli.Options cliOptions = new org.apache.commons.cli.Options();
+    cliOptions.addOption("h", HELP, false, "Provide help");
+    cliOptions.addOption("i", ITERATIONS, true, "Number of iterations");
+    cliOptions.addOption("I", WARMUP_ITERATIONS, true, "Number of warmup iterations");
+    cliOptions.addOption("f", FORK, true, "How many forks to use");
+    cliOptions.addOption("t", TIME, true, "How long each iteration is in seconds");
+    cliOptions.addOption("m", MIN_MEMORY, true, "The minimum size of each JVM");
+    cliOptions.addOption("M", MAX_MEMORY, true, "The maximum size of each JVM");
+    cliOptions.addOption("g", GC, false, "Should GC be profiled");
+
+    // Stays null when parsing fails so that the usage branch below is taken
+    CommandLine parsed = null;
+    try {
+      parsed = new DefaultParser().parse(cliOptions, args, true);
+    } catch (ParseException pe) {
+      System.err.println("Argument exception - " + pe.getMessage());
+    }
+
+    boolean runnable = parsed != null
+        && !parsed.hasOption(HELP)
+        && parsed.getArgs().length != 0;
+    if (!runnable) {
+      String usage = "java -jar <jar> <command> <options> <sub_cmd>\n"
+          + "sub_cmd:\nsimple\ncomplex\n";
+      new HelpFormatter().printHelp(usage, cliOptions);
+      System.err.println();
+      System.exit(1);
+    }
+    return parsed;
+  }
+
+ public static Options parseOptions(String[] args) {
+ CommandLine options = parseCommandLine(args);
+ String cmd = options.getArgs()[0];
+ OptionsBuilder builder = new OptionsBuilder();
+ switch (cmd) {
+ case "simple":
+ builder.include(SimpleFilter.class.getSimpleName());
+ break;
+ case "complex":
+ builder.include(ComplexFilter.class.getSimpleName());
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Command %s is not supported", cmd));
+ }
+ if (options.hasOption(GC)) {
+ builder.addProfiler("hs_gc");
+ }
+ if (options.hasOption(ITERATIONS)) {
+ builder.measurementIterations(Integer.parseInt(options.getOptionValue(ITERATIONS)));
+ }
+ if (options.hasOption(WARMUP_ITERATIONS)) {
+ builder.warmupIterations(Integer.parseInt(options.getOptionValue(
+ WARMUP_ITERATIONS)));
+ }
+ if (options.hasOption(FORK)) {
+ builder.forks(Integer.parseInt(options.getOptionValue(
+ FORK)));
+ }
+ if (options.hasOption(TIME)) {
+ TimeValue iterationTime = TimeValue.seconds(Long.parseLong(
+ options.getOptionValue(TIME)));
+ builder.measurementTime(iterationTime);
+ builder.warmupTime(iterationTime);
+ }
+
+ String minMemory = options.getOptionValue(MIN_MEMORY, "256m");
+ String maxMemory = options.getOptionValue(MAX_MEMORY, "2g");
+ builder.jvmArgs("-server",
+ "-Xms" + minMemory, "-Xmx" + maxMemory);
+ return builder.build();
+ }
+
+  /**
+   * Builds the filter under test for the given search argument.
+   *
+   * @param sArg the search argument to turn into a filter
+   * @param fType {@code "row"} for the row-at-a-time filter created by
+   *              {@link RowFilterFactory}, {@code "vector"} for the batch filter
+   *              created by {@link FilterFactory}
+   * @param normalize passed through to the selected factory
+   * @param conf configuration backing the reader options of the vector filter
+   * @return the filter to apply to an {@link OrcFilterContext}
+   * @throws FilterFactory.UnSupportedSArgException if the factory rejects the search argument
+   * @throws IllegalArgumentException if {@code fType} is neither {@code "row"} nor {@code "vector"}
+   */
+  private static Consumer<OrcFilterContext> createFilter(SearchArgument sArg,
+                                                         String fType,
+                                                         boolean normalize,
+                                                         Configuration conf)
+      throws FilterFactory.UnSupportedSArgException {
+    switch (fType) {
+      case "row":
+        return RowFilterFactory.create(sArg,
+                                       FilterBenchUtil.schema,
+                                       OrcFile.Version.CURRENT,
+                                       normalize);
+      case "vector":
+        Reader.Options options = new Reader.Options(conf)
+            .searchArgument(sArg, new String[0])
+            .allowSARGToFilter(true)
+            .useSelected(true);
+        return FilterFactory.createBatchFilter(options,
+                                               FilterBenchUtil.schema,
+                                               OrcFile.Version.CURRENT,
+                                               normalize);
+      default:
+        // Name the offending value so a mistyped JMH parameter is easy to diagnose
+        throw new IllegalArgumentException("Unknown filter type: " + fType);
+    }
+  }
+
+  /**
+   * JMH state and benchmark for the search argument produced by
+   * {@link FilterBenchUtil#createSArg}: two IN predicates combined with OR.
+   */
+  @State(value = Scope.Benchmark)
+  @BenchmarkMode(value = Mode.AverageTime)
+  @OutputTimeUnit(value = TimeUnit.MICROSECONDS)
+  @Fork(value = 1)
+  @Warmup(iterations = 20, time = 1)
+  @Measurement(iterations = 20, time = 1)
+  public static class SimpleFilter {
+    /** Number of literals placed in each IN predicate. */
+    @Param( {"4", "8", "16", "32", "256"})
+    private int fInSize;
+
+    /** Filter implementation to measure, see {@code createFilter}. */
+    @Param( {"row", "vector"})
+    private String fType;
+
+    private OrcFilterContext context;
+    private int[] expectedSelection;
+    private Consumer<OrcFilterContext> filterUnderTest;
+
+    /**
+     * Creates the batch, the search argument with its expected selection and
+     * the filter. One seeded generator drives both the batch and the search
+     * argument.
+     */
+    @Setup
+    public void setup() throws FilterFactory.UnSupportedSArgException {
+      Random generator = new Random(1024);
+      VectorizedRowBatch rows = FilterBenchUtil.createBatch(generator);
+      context = new OrcFilterContextImpl(FilterBenchUtil.schema).setBatch(rows);
+
+      Map.Entry<SearchArgument, int[]> generated =
+          FilterBenchUtil.createSArg(generator, rows, fInSize);
+      expectedSelection = generated.getValue();
+      filterUnderTest = createFilter(generated.getKey(), fType, false, new Configuration());
+    }
+
+    /**
+     * Clears the selection left by the previous invocation and applies the filter.
+     *
+     * @return the filtered context, returned so the work is not optimized away
+     */
+    @Benchmark
+    public OrcFilterContext filter() {
+      FilterBenchUtil.unFilterBatch(context);
+      filterUnderTest.accept(context);
+      return context;
+    }
+
+    /** Checks the selection of the last invocation against the expected rows. */
+    @TearDown
+    public void tearDown() {
+      FilterBenchUtil.validate(context, expectedSelection);
+    }
+  }
+
+  /**
+   * JMH state and benchmark for the search argument produced by
+   * {@link FilterBenchUtil#createComplexSArg}: an OR over one IN predicate and
+   * {@code fSize} AND groups of two IN predicates each.
+   */
+  @State(value = Scope.Benchmark)
+  @BenchmarkMode(value = Mode.AverageTime)
+  @OutputTimeUnit(value = TimeUnit.MICROSECONDS)
+  @Fork(value = 1)
+  @Warmup(iterations = 20, time = 1)
+  @Measurement(iterations = 20, time = 1)
+  public static class ComplexFilter {
+    /** Number of literals in every IN predicate inside an AND group. */
+    private final int inSize = 32;
+
+    /** Number of AND groups below the top-level OR. */
+    @Param( {"2", "4", "8"})
+    private int fSize;
+
+    /** Passed through to the filter factory. */
+    @Param( {"true", "false"})
+    private boolean normalize;
+
+    /** Filter implementation to measure, see {@code createFilter}. */
+    @Param( {"row", "vector"})
+    private String fType;
+
+    private final Configuration conf = new Configuration();
+    private OrcFilterContext context;
+    private int[] expectedSelection;
+    private Consumer<OrcFilterContext> filterUnderTest;
+
+    /**
+     * Creates the batch, the search argument with its expected selection and
+     * the filter. The batch and the search argument each use their own
+     * generator with the same seed.
+     */
+    @Setup
+    public void setup() throws FilterFactory.UnSupportedSArgException {
+      VectorizedRowBatch rows = FilterBenchUtil.createBatch(new Random(1024));
+      context = new OrcFilterContextImpl(FilterBenchUtil.schema).setBatch(rows);
+
+      Random sArgGenerator = new Random(1024);
+      Map.Entry<SearchArgument, int[]> generated =
+          FilterBenchUtil.createComplexSArg(sArgGenerator, rows, inSize, fSize);
+      expectedSelection = generated.getValue();
+      filterUnderTest = createFilter(generated.getKey(), fType, normalize, conf);
+    }
+
+    /**
+     * Clears the selection left by the previous invocation and applies the filter.
+     *
+     * @return the filtered context, returned so the work is not optimized away
+     */
+    @Benchmark
+    public OrcFilterContext filter() {
+      FilterBenchUtil.unFilterBatch(context);
+      filterUnderTest.accept(context);
+      return context;
+    }
+
+    /** Checks the selection of the last invocation against the expected rows. */
+    @TearDown
+    public void tearDown() {
+      FilterBenchUtil.validate(context, expectedSelection);
+    }
+  }
+}
diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBenchUtil.java b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBenchUtil.java
new file mode 100644
index 0000000..d39bb08
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBenchUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.TypeDescription;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+class FilterBenchUtil {
+ static final TypeDescription schema = TypeDescription.createStruct()
+ .addField("f1", TypeDescription.createLong())
+ .addField("f2", TypeDescription.createLong());
+
+ static VectorizedRowBatch createBatch(Random rnd) {
+ VectorizedRowBatch b = schema.createRowBatch(1024);
+ LongColumnVector f1Vector = (LongColumnVector) b.cols[0];
+ LongColumnVector f2Vector = (LongColumnVector) b.cols[1];
+
+ for (int i = 0; i < b.getMaxSize(); i++) {
+ f1Vector.vector[b.size] = rnd.nextInt();
+ f2Vector.vector[b.size] = rnd.nextInt();
+ b.size++;
+ }
+ return b;
+ }
+
+  /**
+   * Creates the search argument {@code f1 IN (...) OR f2 IN (...)} whose
+   * literals are taken from randomly chosen rows of the batch.
+   *
+   * @param rnd source of the row indices; per literal pair one index is drawn
+   *            for f1 and then one for f2
+   * @param b the batch supplying the literal values
+   * @param inSize number of literals in each IN predicate
+   * @return the search argument paired with the sorted, de-duplicated indices
+   *         of the rows the literals were taken from
+   */
+  static Map.Entry<SearchArgument, int[]> createSArg(Random rnd,
+                                                     VectorizedRowBatch b,
+                                                     int inSize) {
+    long[] f1Column = ((LongColumnVector) b.cols[0]).vector;
+    long[] f2Column = ((LongColumnVector) b.cols[1]).vector;
+
+    Object[] f1Literals = new Object[inSize];
+    Object[] f2Literals = new Object[inSize];
+    Set<Integer> pickedRows = new HashSet<>();
+
+    for (int n = 0; n < inSize; n++) {
+      int f1Row = rnd.nextInt(b.getMaxSize());
+      f1Literals[n] = f1Column[f1Row];
+      pickedRows.add(f1Row);
+
+      int f2Row = rnd.nextInt(b.getMaxSize());
+      f2Literals[n] = f2Column[f2Row];
+      pickedRows.add(f2Row);
+    }
+
+    SearchArgument.Builder sArgBuilder = SearchArgumentFactory.newBuilder();
+    sArgBuilder.startOr();
+    sArgBuilder.in("f1", PredicateLeaf.Type.LONG, f1Literals);
+    sArgBuilder.in("f2", PredicateLeaf.Type.LONG, f2Literals);
+    sArgBuilder.end();
+    SearchArgument sArg = sArgBuilder.build();
+
+    int[] expected = new int[pickedRows.size()];
+    int at = 0;
+    for (int row : pickedRows) {
+      expected[at++] = row;
+    }
+    Arrays.sort(expected);
+    return new AbstractMap.SimpleImmutableEntry<>(sArg, expected);
+  }
+
+  /**
+   * Creates a search argument of the form
+   * {@code f2 IN (row0, row1) OR (f1 IN (...) AND f2 IN (...)) OR ...} with
+   * {@code orSize} AND groups. Within a group both IN lists are taken from the
+   * same randomly chosen rows.
+   *
+   * @param rnd source of the row indices, one draw per literal pair
+   * @param b the batch supplying the literal values
+   * @param inSize number of literals in each IN predicate of an AND group
+   * @param orSize number of AND groups
+   * @return the search argument paired with the sorted, de-duplicated indices
+   *         of rows 0, 1 and every row the literals were taken from
+   */
+  static Map.Entry<SearchArgument, int[]> createComplexSArg(Random rnd,
+                                                            VectorizedRowBatch b,
+                                                            int inSize,
+                                                            int orSize) {
+    long[] f1Column = ((LongColumnVector) b.cols[0]).vector;
+    long[] f2Column = ((LongColumnVector) b.cols[1]).vector;
+
+    // The literal arrays are refilled for every AND group
+    Object[] f1Literals = new Object[inSize];
+    Object[] f2Literals = new Object[inSize];
+    Set<Integer> pickedRows = new HashSet<>();
+
+    SearchArgument.Builder sArgBuilder = SearchArgumentFactory.newBuilder();
+    sArgBuilder.startOr();
+    sArgBuilder.in("f2", PredicateLeaf.Type.LONG, f2Column[0], f2Column[1]);
+    pickedRows.add(0);
+    pickedRows.add(1);
+
+    for (int group = 0; group < orSize; group++) {
+      sArgBuilder.startAnd();
+      for (int n = 0; n < inSize; n++) {
+        int row = rnd.nextInt(b.getMaxSize());
+        f1Literals[n] = f1Column[row];
+        f2Literals[n] = f2Column[row];
+        pickedRows.add(row);
+      }
+      sArgBuilder.in("f1", PredicateLeaf.Type.LONG, f1Literals);
+      sArgBuilder.in("f2", PredicateLeaf.Type.LONG, f2Literals);
+      sArgBuilder.end();
+    }
+    sArgBuilder.end();
+
+    int[] expected = new int[pickedRows.size()];
+    int at = 0;
+    for (int row : pickedRows) {
+      expected[at++] = row;
+    }
+    Arrays.sort(expected);
+    return new AbstractMap.SimpleImmutableEntry<>(sArgBuilder.build(), expected);
+  }
+
+ static void unFilterBatch(OrcFilterContext fc) {
+ fc.setSelectedInUse(false);
+ fc.setSelectedSize(1024);
+ }
+
+ static void validate(OrcFilterContext fc, int[] expSel) {
+ if (!fc.isSelectedInUse()) {
+ throw new IllegalArgumentException("Validation failed: selected is not set");
+ }
+ if (expSel.length != fc.getSelectedSize()) {
+ throw new IllegalArgumentException(String.format(
+ "Validation failed: length %s is not equal to expected length %s",
+ fc.getSelectedSize(), expSel.length));
+ }
+ if (!Arrays.equals(expSel, Arrays.copyOf(fc.getSelected(), expSel.length))) {
+ throw new IllegalArgumentException("Validation failed: array values are not the same");
+ }
+ }
+}
diff --git a/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilter.java b/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilter.java
new file mode 100644
index 0000000..59bfce8
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.OrcFilterContext;
+
+interface RowFilter {
+ boolean accept(OrcFilterContext batch, int rowIdx);
+
+  /**
+   * Evaluates a wrapped leaf filter one row at a time by delegating the
+   * value test to the wrapped filter's {@code allow}.
+   */
+  class LeafFilter extends org.apache.orc.impl.filter.LeafFilter implements RowFilter {
+    final org.apache.orc.impl.filter.LeafFilter filter;
+
+    LeafFilter(org.apache.orc.impl.filter.LeafFilter filter) {
+      super(filter.getColName(), false);
+      this.filter = filter;
+    }
+
+    /**
+     * Tests a single row against the wrapped filter.
+     *
+     * @param batch the context holding the column vectors
+     * @param rowIdx the row position to test
+     * @return {@code false} for a null value, otherwise the result of the wrapped filter
+     */
+    @Override
+    public boolean accept(OrcFilterContext batch, int rowIdx) {
+      ColumnVector[] branch = batch.findColumnVector(filter.getColName());
+      ColumnVector leaf = branch[branch.length - 1];
+      boolean noNulls = OrcFilterContext.noNulls(branch);
+      // A repeating vector keeps its single value at position 0
+      int valueIdx = leaf.isRepeating ? 0 : rowIdx;
+      if (!noNulls && OrcFilterContext.isNull(branch, valueIdx)) {
+        return false;
+      }
+      return allow(leaf, valueIdx);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return filter.allow(v, rowIdx);
+    }
+  }
+
+  /**
+   * Accepts a row as soon as one of the child filters accepts it.
+   */
+  class OrFilter implements RowFilter {
+    final RowFilter[] filters;
+
+    OrFilter(RowFilter[] filters) {
+      this.filters = filters;
+    }
+
+    /**
+     * Tests a single row against the children, stopping at the first match.
+     *
+     * @param batch the context holding the column vectors
+     * @param rowIdx the row position to test
+     * @return {@code true} if any child accepts the row; {@code false} if none
+     *         does, which includes the case of no children
+     */
+    @Override
+    public boolean accept(OrcFilterContext batch, int rowIdx) {
+      for (RowFilter filter : filters) {
+        if (filter.accept(batch, rowIdx)) {
+          return true;
+        }
+      }
+      // An empty disjunction is false; it previously defaulted to true
+      return false;
+    }
+  }
+
+  /**
+   * Accepts a row only if every child filter accepts it.
+   */
+  class AndFilter implements RowFilter {
+    final RowFilter[] filters;
+
+    AndFilter(RowFilter[] filters) {
+      this.filters = filters;
+    }
+
+    /**
+     * Tests a single row against the children, stopping at the first rejection.
+     *
+     * @param batch the context holding the column vectors
+     * @param rowIdx the row position to test
+     * @return {@code false} if any child rejects the row, {@code true} otherwise
+     */
+    @Override
+    public boolean accept(OrcFilterContext batch, int rowIdx) {
+      for (RowFilter child : filters) {
+        if (!child.accept(batch, rowIdx)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+}
diff --git a/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilterFactory.java b/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilterFactory.java
new file mode 100644
index 0000000..95a75c6
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/impl/filter/RowFilterFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.filter.leaf.LeafFilterFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public class RowFilterFactory {
+  /**
+   * Creates a batch filter that evaluates the search argument row by row.
+   *
+   * @param sArg the search argument to convert
+   * @param readSchema the schema the leaf filters are created against
+   * @param version the file version handed to the leaf filter factory
+   * @param normalize {@code true} to build from {@code sArg.getExpression()},
+   *                  {@code false} to build from {@code sArg.getCompactExpression()}
+   * @return the filter to apply to an {@link OrcFilterContext}
+   * @throws FilterFactory.UnSupportedSArgException if the expression holds an
+   *         operator other than OR, AND or LEAF
+   */
+  public static Consumer<OrcFilterContext> create(SearchArgument sArg,
+                                                  TypeDescription readSchema,
+                                                  OrcFile.Version version,
+                                                  boolean normalize)
+      throws FilterFactory.UnSupportedSArgException {
+    Set<String> colIds = new HashSet<>();
+    final ExpressionTree root;
+    if (normalize) {
+      root = sArg.getExpression();
+    } else {
+      root = sArg.getCompactExpression();
+    }
+    List<PredicateLeaf> leaves = sArg.getLeaves();
+    RowFilter rowFilter = create(root, colIds, leaves, readSchema, version);
+    // colIds has been filled with the referenced column names during creation
+    String[] colNames = colIds.toArray(new String[0]);
+    return new RowBatchFilter(rowFilter, colNames);
+  }
+
+  /**
+   * Recursively converts an expression tree into a row filter.
+   *
+   * @param expr the expression to convert
+   * @param colIds collects the column names referenced by the leaves
+   * @param leaves the predicate leaves the LEAF nodes index into
+   * @param readSchema the schema the leaf filters are created against
+   * @param version the file version handed to the leaf filter factory
+   * @return the filter for {@code expr}
+   * @throws FilterFactory.UnSupportedSArgException if an operator other than
+   *         OR, AND or LEAF is encountered
+   */
+  static RowFilter create(ExpressionTree expr,
+                          Set<String> colIds,
+                          List<PredicateLeaf> leaves,
+                          TypeDescription readSchema,
+                          OrcFile.Version version)
+      throws FilterFactory.UnSupportedSArgException {
+    switch (expr.getOperator()) {
+      case OR:
+        return new RowFilter.OrFilter(
+            createChildren(expr.getChildren(), colIds, leaves, readSchema, version));
+      case AND:
+        return new RowFilter.AndFilter(
+            createChildren(expr.getChildren(), colIds, leaves, readSchema, version));
+      case LEAF:
+        return createLeafFilter(leaves.get(expr.getLeaf()), colIds, readSchema, version, false);
+      default:
+        throw new FilterFactory.UnSupportedSArgException(String.format(
+            "SArg Expression: %s is not supported",
+            expr));
+    }
+  }
+
+  /** Converts every child expression, in order, into its row filter. */
+  private static RowFilter[] createChildren(List<ExpressionTree> children,
+                                            Set<String> colIds,
+                                            List<PredicateLeaf> leaves,
+                                            TypeDescription readSchema,
+                                            OrcFile.Version version)
+      throws FilterFactory.UnSupportedSArgException {
+    RowFilter[] converted = new RowFilter[children.size()];
+    for (int i = 0; i < converted.length; i++) {
+      converted[i] = create(children.get(i), colIds, leaves, readSchema, version);
+    }
+    return converted;
+  }
+
+  /**
+   * Creates the row filter for a single predicate leaf by wrapping the vector
+   * leaf filter produced by {@link LeafFilterFactory}.
+   *
+   * @param leaf the predicate to convert
+   * @param colIds receives the column name of the leaf
+   * @param readSchema the schema the leaf filter is created against
+   * @param version the file version handed to the leaf filter factory
+   * @param negated passed through to the leaf filter factory
+   * @return the row filter for the leaf
+   * @throws FilterFactory.UnSupportedSArgException if the factory rejects the leaf
+   */
+  private static RowFilter createLeafFilter(PredicateLeaf leaf,
+                                            Set<String> colIds,
+                                            TypeDescription readSchema,
+                                            OrcFile.Version version,
+                                            boolean negated)
+      throws FilterFactory.UnSupportedSArgException {
+    String column = leaf.getColumnName();
+    colIds.add(column);
+    Object vectorFilter =
+        LeafFilterFactory.createLeafVectorFilter(leaf, colIds, readSchema, version, negated);
+    return new RowFilter.LeafFilter((LeafFilter) vectorFilter);
+  }
+
+  /**
+   * Applies a {@link RowFilter} to every candidate row of a batch and records
+   * the accepted rows in the batch's selected vector.
+   */
+  static class RowBatchFilter implements Consumer<OrcFilterContext> {
+
+    private final RowFilter filter;
+    private final String[] colNames;
+
+    private RowBatchFilter(RowFilter filter, String[] colNames) {
+      this.filter = filter;
+      this.colNames = colNames;
+    }
+
+    /**
+     * Filters the batch in place.
+     * <p>
+     * When the batch arrives with a selection already in use, only the rows
+     * listed in that selection are tested; otherwise rows
+     * {@code 0..getSelectedSize()-1} are tested. On return the selection is in
+     * use and holds the accepted row positions.
+     *
+     * @param batch the context to filter
+     */
+    @Override
+    public void accept(OrcFilterContext batch) {
+      int[] selected = batch.getSelected();
+      // Capture the incoming state before it is overwritten below
+      boolean preSelected = batch.isSelectedInUse();
+      int candidates = batch.getSelectedSize();
+
+      int size = 0;
+      for (int i = 0; i < candidates; i++) {
+        int rowIdx = preSelected ? selected[i] : i;
+        if (filter.accept(batch, rowIdx)) {
+          // size never exceeds i, so unread entries are not overwritten
+          selected[size] = rowIdx;
+          size += 1;
+        }
+      }
+      batch.setSelectedInUse(true);
+      batch.setSelected(selected);
+      batch.setSelectedSize(size);
+    }
+
+    /**
+     * @return the names of the columns referenced by the filter
+     */
+    public String[] getColNames() {
+      return colNames;
+    }
+  }
+}
diff --git a/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java b/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java
new file mode 100644
index 0000000..6b29c7d
--- /dev/null
+++ b/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.filter;
+
+import org.apache.orc.impl.filter.RowFilterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.filter.FilterFactory;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+public class TestFilter {
+ private static final Logger LOG = LoggerFactory.getLogger(TestFilter.class);
+ private static final long seed = 1024;
+ protected final Configuration conf = new Configuration();
+ protected final Random rnd = new Random(seed);
+ protected final VectorizedRowBatch b = FilterBenchUtil.createBatch(rnd);
+ protected final OrcFilterContextImpl fc = (OrcFilterContextImpl)
+ new OrcFilterContextImpl(FilterBenchUtil.schema).setBatch(b);
+
+  /**
+   * Supplies the (complexity, filter type, normalize) combinations exercised
+   * by {@code testFilter}.
+   */
+  public static Stream<Arguments> filters() {
+    Stream.Builder<Arguments> combinations = Stream.builder();
+    combinations.add(arguments("simple", "row", false));
+    combinations.add(arguments("simple", "vector", false));
+    combinations.add(arguments("complex", "row", true));
+    combinations.add(arguments("complex", "vector", true));
+    combinations.add(arguments("complex", "row", false));
+    combinations.add(arguments("complex", "vector", false));
+    return combinations.build();
+  }
+
+  /** Clears any selection on the shared context before each test. */
+  @BeforeEach
+  public void setup() {
+    final OrcFilterContext context = fc;
+    FilterBenchUtil.unFilterBatch(context);
+  }
+
+  /**
+   * Builds the filter for one parameter combination, applies it and validates
+   * the resulting selection.
+   */
+  @ParameterizedTest(name = "#{index} - {0}+{1}")
+  @MethodSource("org.apache.orc.bench.core.filter.TestFilter#filters")
+  public void testFilter(String complexity, String filterType, boolean normalize)
+      throws FilterFactory.UnSupportedSArgException {
+    Filter scenario = new Filter(complexity, filterType, normalize);
+    scenario.execute();
+  }
+
+  /**
+   * Bundles a generated search argument, the rows it is expected to select
+   * and the filter built from it. Uses the batch, context and configuration
+   * of the enclosing test instance.
+   */
+  private class Filter {
+    protected final SearchArgument sArg;
+    protected final int[] expSel;
+    protected final Consumer<OrcFilterContext> filter;
+
+    private Filter(String complexity, String filterType, boolean normalize)
+        throws FilterFactory.UnSupportedSArgException {
+      final Map.Entry<SearchArgument, int[]> generated;
+      switch (complexity) {
+        case "simple":
+          generated = FilterBenchUtil.createSArg(new Random(seed), b, 5);
+          break;
+        case "complex":
+          generated = FilterBenchUtil.createComplexSArg(new Random(seed), b, 10, 8);
+          break;
+        default:
+          throw new IllegalArgumentException();
+      }
+      sArg = generated.getKey();
+      LOG.info("SearchArgument has {} expressions", sArg.getExpression().getChildren().size());
+      expSel = generated.getValue();
+
+      final Consumer<OrcFilterContext> built;
+      switch (filterType) {
+        case "row":
+          built = RowFilterFactory.create(sArg,
+                                          FilterBenchUtil.schema,
+                                          OrcFile.Version.CURRENT,
+                                          normalize);
+          break;
+        case "vector":
+          Reader.Options readerOptions = new Reader.Options(conf)
+              .searchArgument(sArg, new String[0])
+              .allowSARGToFilter(true);
+          built = FilterFactory.createBatchFilter(readerOptions,
+                                                  FilterBenchUtil.schema,
+                                                  OrcFile.Version.CURRENT,
+                                                  normalize);
+          break;
+        default:
+          throw new IllegalArgumentException();
+      }
+      filter = built;
+    }
+
+    /** Applies the filter to the shared batch and validates the selection. */
+    private void execute() {
+      OrcFilterContext target = fc.setBatch(b);
+      filter.accept(target);
+      FilterBenchUtil.validate(fc, expSel);
+    }
+  }
+}
\ No newline at end of file
diff --git a/java/bench/core/src/test/org/apache/orc/impl/filter/ATestFilter.java b/java/bench/core/src/test/org/apache/orc/impl/filter/ATestFilter.java
new file mode 100644
index 0000000..fd55516
--- /dev/null
+++ b/java/bench/core/src/test/org/apache/orc/impl/filter/ATestFilter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.DateUtils;
+import org.apache.orc.impl.OrcFilterContextImpl;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ATestFilter {
+ protected final TypeDescription schema = TypeDescription.createStruct()
+ .addField("f1", TypeDescription.createLong())
+ .addField("f2", TypeDescription.createString())
+ .addField("f3p", TypeDescription.createDate())
+ .addField("f3h", TypeDescription.createDate());
+ protected final OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+
+ protected final VectorizedRowBatch batch = schema.createRowBatch();
+
+ protected void setBatch(Long[] f1Values, String[] f2Values) {
+ setBatch(f1Values, f2Values, new String[0]);
+ }
+
+  /**
+   * Resets the batch, loads the given values into its columns and attaches
+   * the batch to the filter context. The batch size is taken from
+   * {@code f1Values}.
+   *
+   * @param f1Values values for the long column, {@code null} marks a null row
+   * @param f2Values values for the string column, stored as UTF-8 bytes,
+   *                 {@code null} marks a null row
+   * @param f3Values date strings parsed into both date columns, once with
+   *                 {@code true} (f3p) and once with {@code false} (f3h) as
+   *                 the second {@code DateUtils.parseDate} argument;
+   *                 {@code null} marks a null row
+   */
+  protected void setBatch(Long[] f1Values, String[] f2Values, String[] f3Values) {
+    final LongColumnVector longs = (LongColumnVector) batch.cols[0];
+    final BytesColumnVector strings = (BytesColumnVector) batch.cols[1];
+    final DateColumnVector f3p = (DateColumnVector) batch.cols[2];
+    final DateColumnVector f3h = (DateColumnVector) batch.cols[3];
+
+    batch.reset();
+    longs.noNulls = true;
+    for (int row = 0; row < f1Values.length; row++) {
+      Long value = f1Values[row];
+      if (value != null) {
+        longs.isNull[row] = false;
+        longs.vector[row] = value;
+      } else {
+        longs.noNulls = false;
+        longs.isNull[row] = true;
+      }
+    }
+
+    for (int row = 0; row < f2Values.length; row++) {
+      String value = f2Values[row];
+      if (value != null) {
+        strings.isNull[row] = false;
+        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
+        strings.vector[row] = utf8;
+        strings.start[row] = 0;
+        strings.length[row] = utf8.length;
+      } else {
+        strings.noNulls = false;
+        strings.isNull[row] = true;
+      }
+    }
+
+    for (int row = 0; row < f3Values.length; row++) {
+      String value = f3Values[row];
+      if (value != null) {
+        f3p.isNull[row] = false;
+        f3p.vector[row] = DateUtils.parseDate(value, true);
+        f3h.isNull[row] = false;
+        f3h.vector[row] = DateUtils.parseDate(value, false);
+      } else {
+        f3p.noNulls = false;
+        f3p.isNull[row] = true;
+        f3h.noNulls = false;
+        f3h.isNull[row] = true;
+      }
+    }
+
+    batch.size = f1Values.length;
+    fc.setBatch(batch);
+  }
+
+ protected void validateSelected(int... v) {
+ assertTrue(fc.isSelectedInUse());
+ assertEquals(v.length, fc.getSelectedSize());
+ assertArrayEquals(v, Arrays.copyOf(fc.getSelected(), v.length));
+ }
+
+ protected void validateNoneSelected() {
+ assertTrue(fc.isSelectedInUse());
+ assertEquals(0, fc.getSelectedSize());
+ }
+}
diff --git a/java/bench/core/src/test/org/apache/orc/impl/filter/TestRowFilter.java b/java/bench/core/src/test/org/apache/orc/impl/filter/TestRowFilter.java
new file mode 100644
index 0000000..08b58bb
--- /dev/null
+++ b/java/bench/core/src/test/org/apache/orc/impl/filter/TestRowFilter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Verifies that search arguments are converted into the expected
+ * {@link RowFilter} shapes and that the resulting filters accept or reject the
+ * expected rows.
+ * <p>
+ * The rows are loaded through the inherited {@code setBatch}, which fills the
+ * batch owned by {@link ATestFilter} and attaches it to that class's filter
+ * context. This class therefore must not declare {@code fc} or {@code batch}
+ * fields of its own: Java fields are hidden rather than overridden, so the
+ * assertions would run against a second, never-populated batch of size zero and
+ * pass without checking a single row.
+ * <p>
+ * NOTE(review): relies on the {@code fc} field of {@link ATestFilter} being
+ * visible to subclasses - confirm.
+ */
+public class TestRowFilter extends ATestFilter {
+
+  // Describes only the two columns referenced by the search arguments below; it
+  // is used to build the filters, not to allocate the batch.
+  private final TypeDescription schema = TypeDescription.createStruct()
+      .addField("f1", TypeDescription.createLong())
+      .addField("f2", TypeDescription.createString());
+
+  @Test
+  public void testINLongConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+        .build();
+
+    Set<String> colIds = new HashSet<>();
+    RowFilter filter = createFilter(sarg, colIds);
+    assertTrue(filter instanceof RowFilter.LeafFilter);
+    assertEquals(1, colIds.size());
+    assertTrue(colIds.contains("f1"));
+
+    setBatch(new Long[] {1L, 0L, 2L, 4L, 3L},
+             new String[] {});
+
+    // Only the even rows hold a value from the IN list.
+    assertAccepts(filter, true, false, true, false, true);
+  }
+
+  @Test
+  public void testINStringConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .in("f2", PredicateLeaf.Type.STRING, "a", "b")
+        .build();
+
+    Set<String> colIds = new HashSet<>();
+    RowFilter filter = createFilter(sarg, colIds);
+    assertTrue(filter instanceof RowFilter.LeafFilter);
+    assertEquals(1, colIds.size());
+    assertTrue(colIds.contains("f2"));
+
+    setBatch(new Long[] {1L, 0L, 2L, 4L, 3L},
+             new String[] {"a", "z", "b", "y", "a"});
+
+    // Only the even rows hold a value from the IN list.
+    assertAccepts(filter, true, false, true, false, true);
+  }
+
+  @Test
+  public void testORConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .startOr()
+        .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+        .in("f2", PredicateLeaf.Type.STRING, "a", "b", "c")
+        .end()
+        .build();
+
+    Set<String> colIds = new HashSet<>();
+    RowFilter filter = createFilter(sarg, colIds);
+    assertTrue(filter instanceof RowFilter.OrFilter);
+    assertEquals(2, ((RowFilter.OrFilter) filter).filters.length);
+    assertEquals(2, colIds.size());
+    assertTrue(colIds.contains("f1"));
+    assertTrue(colIds.contains("f2"));
+
+    // Set up the data such that the OR condition should select every row
+    setBatch(new Long[] {1L, 0L, 2L, 4L, 3L},
+             new String[] {"z", "a", "y", "b", "x"});
+
+    assertAccepts(filter, true, true, true, true, true);
+  }
+
+  @Test
+  public void testANDConversion() throws FilterFactory.UnSupportedSArgException {
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .startAnd()
+        .in("f1", PredicateLeaf.Type.LONG, 1L, 2L, 3L)
+        .in("f2", PredicateLeaf.Type.STRING, "a", "b", "c")
+        .end()
+        .build();
+
+    Set<String> colIds = new HashSet<>();
+    RowFilter filter = createFilter(sarg, colIds);
+    assertTrue(filter instanceof RowFilter.AndFilter);
+    assertEquals(2, ((RowFilter.AndFilter) filter).filters.length);
+    assertEquals(2, colIds.size());
+    assertTrue(colIds.contains("f1"));
+    assertTrue(colIds.contains("f2"));
+
+    // Set up the data such that the AND condition should not select any row
+    setBatch(new Long[] {1L, 0L, 2L, 4L, 3L},
+             new String[] {"z", "a", "y", "b", "x"});
+
+    assertAccepts(filter, false, false, false, false, false);
+  }
+
+  /**
+   * Builds the row filter for a search argument against the test schema.
+   *
+   * @param sarg the search argument to convert
+   * @param colIds receives the names of the columns the filter references
+   * @return the created filter, asserted to be non-null
+   * @throws FilterFactory.UnSupportedSArgException if the conversion is not supported
+   */
+  private RowFilter createFilter(SearchArgument sarg, Set<String> colIds)
+      throws FilterFactory.UnSupportedSArgException {
+    RowFilter filter = RowFilterFactory.create(sarg.getExpression(),
+                                               colIds,
+                                               sarg.getLeaves(),
+                                               schema,
+                                               OrcFile.Version.CURRENT);
+    assertNotNull(filter);
+    return filter;
+  }
+
+  /**
+   * Evaluates the filter row by row against the inherited filter context.
+   * <p>
+   * The loop is bounded by the expected results rather than by a batch size, so
+   * an unpopulated batch can never make the check pass without running.
+   *
+   * @param filter the filter under test
+   * @param expected the expected outcome for rows 0..n-1
+   */
+  private void assertAccepts(RowFilter filter, boolean... expected) {
+    for (int row = 0; row < expected.length; row++) {
+      if (expected[row]) {
+        assertTrue(filter.accept(fc, row), "row " + row + " should be accepted");
+      } else {
+        assertFalse(filter.accept(fc, row), "row " + row + " should be rejected");
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/assembly/uber.xml b/java/bench/hive/src/assembly/uber.xml
index 014eab9..7bd202d 100644
--- a/java/bench/hive/src/assembly/uber.xml
+++ b/java/bench/hive/src/assembly/uber.xml
@@ -29,5 +29,12 @@
<containerDescriptorHandler>
<handlerName>metaInf-services</handlerName>
</containerDescriptorHandler>
+ <containerDescriptorHandler>
+ <handlerName>file-aggregator</handlerName>
+ <configuration>
+ <filePattern>META-INF/BenchmarkList</filePattern>
+ <outputPath>META-INF/BenchmarkList</outputPath>
+ </configuration>
+ </containerDescriptorHandler>
</containerDescriptorHandlers>
</assembly>
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index 41e8749..60d1520 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -37,7 +37,9 @@
<orc.version>${project.version}</orc.version>
<parquet.version>1.12.0</parquet.version>
<spark.version>3.1.2</spark.version>
- <jackson.version>2.12.2</jackson.version>
+ <jackson.version>2.12.4</jackson.version>
+ <hadoop.version>3.3.1</hadoop.version>
+ <junit.version>5.7.2</junit.version>
</properties>
<modules>
@@ -49,6 +51,24 @@
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
diff --git a/site/develop/design/lazy_filter.md b/site/develop/design/lazy_filter.md
new file mode 100644
index 0000000..5c64d89
--- /dev/null
+++ b/site/develop/design/lazy_filter.md
@@ -0,0 +1,335 @@
+* [Lazy Filter](#LazyFilter)
+ * [Background](#Background)
+ * [Design](#Design)
+ * [SArg to Filter](#SArgtoFilter)
+ * [Read](#Read)
+ * [Configuration](#Configuration)
+ * [Tests](#Tests)
+ * [Appendix](#Appendix)
+ * [Benchmarks](#Benchmarks)
+ * [Row vs Vector](#RowvsVector)
+ * [Normalization vs Compact](#NormalizationvsCompact)
+ * [Summary](#Summary)
+
+# Lazy Filter <a id="LazyFilter"></a>
+
+## Background <a id="Background"></a>
+
+This feature request started as a result of a needle in the haystack search that is performed with the following
+characteristics:
+
+* The search fields are not part of partition, bucket or sort specification.
+* The table is a very large table.
+* The result is very few rows compared to the scan size.
+* The search columns are a significant subset of selection columns in the query.
+
+Initial analysis showed that we could have a significant benefit by lazily reading the non-search columns only when we
+have a match. We explore the design and some benchmarks in subsequent sections.
+
+## Design <a id="Design"></a>
+
+This builds further on [ORC-577][ORC-577] which currently only restricts deserialization for some selected data types
+but does not improve on IO.
+
+On a high level the design includes the following components:
+
+```text
+┌──────────────┐ ┌────────────────────────┐
+│ │ │ Read │
+│ │ │ │
+│ │ │ ┌────────────┐ │
+│SArg to Filter│─────────▶│ │Read Filter │ │
+│ │ │ │ Columns │ │
+│ │ │ └────────────┘ │
+│ │ │ │ │
+└──────────────┘ │ ▼ │
+ │ ┌────────────┐ │
+ │ │Apply Filter│ │
+ │ └────────────┘ │
+ │ │ │
+ │ ▼ │
+ │ ┌────────────┐ │
+ │ │Read Select │ │
+ │ │ Columns │ │
+ │ └────────────┘ │
+ │ │
+ │ │
+ └────────────────────────┘
+```
+
+* **SArg to Filter**: Converts Search Arguments passed down into filters for efficient application during scans.
+* **Read**: Performs the lazy read using the filters.
+ * **Read Filter Columns**: Read the filter columns from the file.
+ * **Apply Filter**: Apply the filter on the read filter columns.
+ * **Read Select Columns**: If filter selects at least a row then read the remaining columns.
+
+### SArg to Filter <a id="SArgtoFilter"></a>
+
+SArg to Filter converts the passed SArg into a filter. This enables automatic compatibility with both Spark and Hive as
+they already push Search Arguments down to ORC.
+
+The SArg is automatically converted into a [Vector Filter][vfilter], which is applied during the read process. Two
+filter types were evaluated:
+
+* [Row Filter][rfilter] that evaluates each row across all the predicates once.
+* [Vector Filter][vfilter] that evaluates each filter across the entire vector and adjusts the subsequent evaluation.
+
+While a row based filter is easier to code, it is much [slower][rowvvector] to process. We also see a significant
+[performance gain][rowvvector] in the absence of normalization.
+
+The builder for search argument should allow skipping normalization during the [build][build]. This has been added with
+[HIVE-24458][HIVE-24458].
+
+### Read <a id="Read"></a>
+
+The read process has the following changes:
+
+```text
+ │
+ │
+ │
+┌────────────────────────▼────────────────────────┐
+│ ┏━━━━━━━━━━━━━━━━┓ │
+│ ┃Plan ++Search++ ┃ │
+│ ┃ Columns ┃ │
+│ ┗━━━━━━━━━━━━━━━━┛ │
+│ Read │Stripe │
+└────────────────────────┼────────────────────────┘
+ │
+ ▼
+
+
+ │
+ │
+┌────────────────────────▼────────────────────────┐
+│ ┏━━━━━━━━━━━━━━━━┓ │
+│ ┃Read ++Search++ ┃ │
+│ ┃ Columns ┃◀─────────┐ │
+│ ┗━━━━━━━━━━━━━━━━┛ │ │
+│ │ Size = 0 │
+│ ▼ │ │
+│ ┏━━━━━━━━━━━━━━━━┓ │ │
+│ ┃ Apply Filter ┃──────────┘ │
+│ ┗━━━━━━━━━━━━━━━━┛ │
+│ Size > 0 │
+│ │ │
+│ ▼ │
+│ ┏━━━━━━━━━━━━━━━━┓ │
+│ ┃ Plan Select ┃ │
+│ ┃ Columns ┃ │
+│ ┗━━━━━━━━━━━━━━━━┛ │
+│ │ │
+│ ▼ │
+│ ┏━━━━━━━━━━━━━━━━┓ │
+│ ┃ Read Select ┃ │
+│ ┃ Columns ┃ │
+│ ┗━━━━━━━━━━━━━━━━┛ │
+│ Next │Batch │
+└────────────────────────┼────────────────────────┘
+ │
+ ▼
+```
+
+The read process changes:
+
+* **Read Stripe** used to plan the read of all (search + select) columns. This is enhanced to plan and fetch only the
+ search columns. The rest of the stripe planning process optimizations remain unchanged e.g. partial read planning of
+ the stripe based on RowGroup statistics.
+* **Next Batch** identifies the processing that takes place when `RecordReader.nextBatch` is invoked.
+ * **Read Search Columns** takes place instead of reading all the selected columns. This is in sync with the planning
+ that has taken place during **Read Stripe** where only the search columns have been planned.
+ * **Apply Filter** on the batch that at this point only includes search columns. Evaluate the result of the filter:
+ * **Size = 0** indicates all records have been filtered out. Given this we proceed to the next batch of search
+ columns.
+ * **Size > 0** indicates that at least one record was accepted by the filter. This record needs to be substantiated with
+ other columns.
+ * **Plan Select Columns** is invoked to perform read of the select columns. The planning happens as follows:
+ * Determine the current position of the read within the stripe and plan the read for the select columns from this
+ point forward to the end of the stripe.
+ * The Read planning of select columns respects the row groups filtered out as a result of the stripe planning.
+ * Fetch the select columns using the above plan.
+ * **Read Select Columns** into the vectorized row batch
+ * Return this batch.
+
+The current implementation performs a single read for the select columns in a stripe.
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │RG0 │ │RG1 │ │RG2■│ │RG3 │ │RG4 │ │RG5■│ │RG6 │ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│ Stripe │
+└──────────────────────────────────────────────────┘
+```
+
+The above diagram depicts a stripe with 7 Row Groups out of which **RG2** and **RG5** are selected by the filter. The
+current implementation does the following:
+
+* Start the read planning process from the first match RG2
+* Read to the end of the stripe that includes RG6
+* Based on the above, the fetch skips RG0 and RG1, subject to compression block boundaries
+
+The above logic could be enhanced to perform say **2 or n** reads before reading to the end of stripe. The current
+implementation allows 0 reads before reading to the end of the stripe. The value of **n** could be configurable but
+should avoid too many short reads.
+
+The read behavior changes as follows with multiple reads being allowed within a stripe for select columns:
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │ │ │ │ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│ Current implementation │
+└──────────────────────────────────────────────────┘
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │ │ │ │ │■■■■│ │ │ │ │ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│ Allow 1 partial read │
+└──────────────────────────────────────────────────┘
+```
+
+The figure shows that we could read significantly fewer bytes by performing an additional read before reading to the end
+of stripe. This shall be included as a subsequent enhancement to this patch.
+
+## Configuration <a id="Configuration"></a>
+
+The following configuration options are exposed that control the filter behavior:
+
+|Property |Type |Default|
+|:--- |:--- |:--- |
+|orc.sarg.to.filter |boolean|false |
+|orc.filter.use.selected|boolean|false |
+
+* `orc.sarg.to.filter` can be used to turn off the SArg to filter conversion. This might be particularly relevant in
+ cases where the filter is expensive and does not eliminate a lot of records. This will not be relevant once we have
+ the option to turn off the filters on the caller as they have been completely implemented by the ORC layer.
+* `orc.filter.use.selected` is an important setting that if incorrectly enabled results in wrong output. A boolean flag
+ to determine if the selected vector is supported by the reading application. If false, the output of the ORC reader
+ must have the filter reapplied to avoid using unset values in the unselected rows. If unsure please leave this as
+ false.
+
+## Tests <a id="Tests"></a>
+
+We evaluated this patch against a search job with the following stats:
+
+* Table
+ * Size: ~**420 TB**
+ * Data fields: ~**120**
+ * Partition fields: **3**
+* Scan
+ * Search fields: 3 data fields with large (~ 1000 value) IN clauses compounded by **OR**.
+ * Select fields: 16 data fields (includes the 3 search fields), 1 partition field
+ * Search:
+ * Size: ~**180 TB**
+ * Records: **3.99 T**
+ * Selected:
+ * Size: ~**100 MB**
+ * Records: **1 M**
+
+We have observed the following reductions:
+
+|Test |IO Reduction %|CPU Reduction %|
+|:--- | ---:| ---:|
+|SELECT 16 cols| 45| 47|
+|SELECT * | 70| 87|
+
+* The savings are more significant as you increase the number of select columns with respect to the search columns
+* When the filter selects most data, no significant penalty was observed as a result of 2 IOs compared with a single IO
+ * We do have a penalty as a result of the double filter application both in ORC and in the calling engine.
+
+## Appendix <a id="Appendix"></a>
+
+### Benchmarks <a id="Benchmarks"></a>
+
+#### Row vs Vector <a id="RowvsVector"></a>
+
+We start with a decision of using a Row filter vs a Vector filter. The Row filter has the advantage of simpler code when
+compared with the Vector filter.
+
+```bash
+java -jar java/bench/core/target/orc-benchmarks-core-*-uber.jar filter simple
+```
+
+|Benchmark |(fInSize)|(fType)|Mode| Cnt| Score|Error |Units|
+|:--- | ---:|:--- |:---|---:| ---:|:--- |:--- |
+|FilterBench.SimpleFilter.filter| 4|row |avgt| 20|38.207|± 0.178|us/op|
+|FilterBench.SimpleFilter.filter| 4|vector |avgt| 20|18.663|± 0.117|us/op|
+|FilterBench.SimpleFilter.filter| 8|row |avgt| 20|50.694|± 0.313|us/op|
+|FilterBench.SimpleFilter.filter| 8|vector |avgt| 20|35.532|± 0.190|us/op|
+|FilterBench.SimpleFilter.filter| 16|row |avgt| 20|52.443|± 0.268|us/op|
+|FilterBench.SimpleFilter.filter| 16|vector |avgt| 20|33.966|± 0.204|us/op|
+|FilterBench.SimpleFilter.filter| 32|row |avgt| 20|68.504|± 0.318|us/op|
+|FilterBench.SimpleFilter.filter| 32|vector |avgt| 20|51.707|± 0.302|us/op|
+|FilterBench.SimpleFilter.filter| 256|row |avgt| 20|88.348|± 0.793|us/op|
+|FilterBench.SimpleFilter.filter| 256|vector |avgt| 20|72.602|± 0.282|us/op|
+
+Explanation:
+
+* **fInSize** calls out the number of values in the IN clause.
+* **fType** calls out whether the filter is a row based filter, or a vector based filter.
+
+Observations:
+
+* The vector based filter is significantly faster than the row based filter.
+ * At best, vector was faster by **51.15%**
+ * At worst, vector was faster by **17.82%**
+* The performance of the filters deteriorates with the increase of the IN values, however even in this case the
+ vector filter is much better than the row filter. The current `IN` filter employs a binary search on an array instead
+ of a hash lookup.
+
+#### Normalization vs Compact <a id="NormalizationvsCompact"></a>
+
+In this test we use a complex filter with both AND, and OR to understand the impact of Conjunctive Normal Form on the
+filter performance. The Search Argument builder by default performs a CNF. The advantage of the CNF would again be a
+simpler code base.
+
+```bash
+java -jar java/bench/core/target/orc-benchmarks-core-*-uber.jar filter complex
+```
+
+|Benchmark |(fSize)|(fType)|(normalize)|Mode| Cnt| Score|Error |Units|
+|:--- | ---:|:--- |:--- |:---|---:| ---:|:--- |:--- |
+|FilterBench.ComplexFilter.filter| 2|row |true |avgt| 20| 91.922|± 0.301 |us/op|
+|FilterBench.ComplexFilter.filter| 2|row |false |avgt| 20| 90.741|± 0.556 |us/op|
+|FilterBench.ComplexFilter.filter| 2|vector |true |avgt| 20| 61.137|± 0.398 |us/op|
+|FilterBench.ComplexFilter.filter| 2|vector |false |avgt| 20| 54.829|± 0.431 |us/op|
+|FilterBench.ComplexFilter.filter| 4|row |true |avgt| 20| 284.956|± 1.237 |us/op|
+|FilterBench.ComplexFilter.filter| 4|row |false |avgt| 20| 130.526|± 0.767 |us/op|
+|FilterBench.ComplexFilter.filter| 4|vector |true |avgt| 20| 242.387|± 1.053 |us/op|
+|FilterBench.ComplexFilter.filter| 4|vector |false |avgt| 20| 98.530|± 0.423 |us/op|
+|FilterBench.ComplexFilter.filter| 8|row |true |avgt| 20|8007.101|± 54.912|us/op|
+|FilterBench.ComplexFilter.filter| 8|row |false |avgt| 20| 234.943|± 4.713 |us/op|
+|FilterBench.ComplexFilter.filter| 8|vector |true |avgt| 20|7013.758|± 33.701|us/op|
+|FilterBench.ComplexFilter.filter| 8|vector |false |avgt| 20| 190.442|± 0.881 |us/op|
+
+Explanation:
+
+* **fSize** identifies the size of the children in the OR clause that will be normalized.
+* **normalize** identifies whether normalize was carried out on the Search Argument.
+
+Observations:
+
+* Vector filter is better than the row filter as demonstrated by the [Row vs Vector Test][rowvvector].
+* Normalizing the search argument results in a significant performance penalty given the explosion of the operator tree
+ * In the case where an AND includes 8 ORs, the compact version is faster by **97.29%**
+
+#### Summary <a id="Summary"></a>
+
+Based on the benchmarks we have the following conclusions:
+
+* Vector based filter is significantly better than a row based filter and justifies the more complex code.
+* Compact filter is significantly faster than a normalized filter.
+
+[ORC-577]: https://issues.apache.org/jira/browse/ORC-577
+
+[HIVE-24458]: https://issues.apache.org/jira/browse/HIVE-24458
+
+[vfilter]: ../../../java/core/src/java/org/apache/orc/impl/filter/VectorFilter.java
+
+[rfilter]: ../../../java/bench/core/src/java/org/apache/orc/impl/filter/RowFilter.java
+
+[rowvvector]: #RowvsVector
+
+[normalvcompact]: #NormalizationvsCompact
+
+[build]: https://github.com/apache/hive/blob/storage-branch-2.7/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L491
\ No newline at end of file