You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text copy).
Posted to commits@orc.apache.org by om...@apache.org on 2021/03/04 18:18:07 UTC
[orc] branch master updated: ORC-755: Added OrcFilterContext that
accomplishes the following: * Change the filter from
Consumer<VectorizedRowBatch> to Consumer<OrcFilterContext> *
OrcFilterContext has findColumnVector method that allows cached
determination of ColumnVectors from name * OrcFilterContext offers utility
methods for null determination on the vector batch * Refactors ParserUtils
to introduce TypeWalker and VectorWalker to centralize the walk logic *
Tests have been modified to support the new interface
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new 853d574 ORC-755: Added OrcFilterContext that accomplishes the following: * Change the filter from Consumer<VectorizedRowBatch> to Consumer<OrcFilterContext> * OrcFilterContext has findColumnVector method that allows cached determination of ColumnVectors from name * OrcFilterContext offers utility methods for null determination on the vector batch * Refactors ParserUtils to introduce TypeWalker and VectorWalker to centralize the walk logic * Tests have been modified to support th [...]
853d574 is described below
commit 853d574df4c23f8dbe892b82220ad82f123f2bf2
Author: Pavan Lanka <pl...@apple.com>
AuthorDate: Tue Feb 23 16:55:03 2021 -0800
ORC-755: Added OrcFilterContext that accomplishes the following:
* Change the filter from Consumer<VectorizedRowBatch> to Consumer<OrcFilterContext>
* OrcFilterContext has findColumnVector method that allows cached determination of ColumnVectors from name
* OrcFilterContext offers utility methods for null determination on the vector batch
* Refactors ParserUtils to introduce TypeWalker and VectorWalker to centralize the walk logic
* Tests have been modified to support the new interface
Fixes #647
Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
.../bench/hive/RowFilterProjectionBenchmark.java | 11 +-
.../hive/rowfilter/BooleanRowFilterBenchmark.java | 11 +-
.../hive/rowfilter/DecimalRowFilterBenchmark.java | 11 +-
.../hive/rowfilter/DoubleRowFilterBenchmark.java | 11 +-
.../hive/rowfilter/StringRowFilterBenchmark.java | 11 +-
.../rowfilter/TimestampRowFilterBenchmark.java | 11 +-
.../src/java/org/apache/orc/OrcFilterContext.java | 91 +++++++++
java/core/src/java/org/apache/orc/Reader.java | 7 +-
.../src/java/org/apache/orc/TypeDescription.java | 21 +-
.../org/apache/orc/impl/OrcFilterContextImpl.java | 130 ++++++++++++
.../src/java/org/apache/orc/impl/ParserUtils.java | 207 ++++++++++++++++---
.../org/apache/orc/impl/TreeReaderFactory.java | 9 +-
.../orc/impl/reader/tree/StructBatchReader.java | 5 +-
.../test/org/apache/orc/TestOrcFilterContext.java | 224 +++++++++++++++++++++
.../test/org/apache/orc/TestRowFilteringSkip.java | 97 +++++----
.../apache/orc/impl/TestOrcFilterContextImpl.java | 209 +++++++++++++++++++
16 files changed, 940 insertions(+), 126 deletions(-)
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
index d125606..0f2718d 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
@@ -30,6 +30,7 @@ import org.apache.orc.TypeDescription;
import org.apache.orc.bench.core.OrcBenchmark;
import org.apache.orc.bench.core.ReadCounters;
import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
@@ -93,16 +94,16 @@ public class RowFilterProjectionBenchmark implements OrcBenchmark {
}
}
- public static void customIntRowFilter(VectorizedRowBatch batch) {
+ public static void customIntRowFilter(OrcFilterContext batch) {
int newSize = 0;
- for (int row = 0; row < batch.size; ++row) {
+ for (int row = 0; row < batch.getSelectedSize(); ++row) {
// Select ONLY specific keys
if (filterValues.contains(row)) {
- batch.selected[newSize++] = row;
+ batch.getSelected()[newSize++] = row;
}
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
@Benchmark
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
index b7a0e69..789dda4 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -127,15 +128,15 @@ public class BooleanRowFilterBenchmark extends org.openjdk.jmh.Main {
return filterValues;
}
- public static void customIntRowFilter(VectorizedRowBatch batch) {
+ public static void customIntRowFilter(OrcFilterContext batch) {
int newSize = 0;
- for (int row = 0; row < batch.size; ++row) {
+ for (int row = 0; row < batch.getSelectedSize(); ++row) {
if (filterValues[row]) {
- batch.selected[newSize++] = row;
+ batch.getSelected()[newSize++] = row;
}
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
}
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
index a43c4c5..dc15b30 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -128,15 +129,15 @@ public class DecimalRowFilterBenchmark extends org.openjdk.jmh.Main {
return filterValues;
}
- public static void customIntRowFilter(VectorizedRowBatch batch) {
+ public static void customIntRowFilter(OrcFilterContext batch) {
int newSize = 0;
- for (int row = 0; row < batch.size; ++row) {
+ for (int row = 0; row < batch.getSelectedSize(); ++row) {
if (filterValues[row]) {
- batch.selected[newSize++] = row;
+ batch.getSelected()[newSize++] = row;
}
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
}
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
index d5afb48..616207e 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -127,15 +128,15 @@ public class DoubleRowFilterBenchmark extends org.openjdk.jmh.Main {
return filterValues;
}
- public static void customIntRowFilter(VectorizedRowBatch batch) {
+ public static void customIntRowFilter(OrcFilterContext batch) {
int newSize = 0;
- for (int row = 0; row < batch.size; ++row) {
+ for (int row = 0; row < batch.getSelectedSize(); ++row) {
if (filterValues[row]) {
- batch.selected[newSize++] = row;
+ batch.getSelected()[newSize++] = row;
}
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
}
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
index 33447cd..4170523 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -127,15 +128,15 @@ public class StringRowFilterBenchmark extends org.openjdk.jmh.Main {
return filterValues;
}
- public static void customIntRowFilter(VectorizedRowBatch batch) {
+ public static void customIntRowFilter(OrcFilterContext batch) {
int newSize = 0;
- for (int row = 0; row < batch.size; ++row) {
+ for (int row = 0; row < batch.getSelectedSize(); ++row) {
if (filterValues[row]) {
- batch.selected[newSize++] = row;
+ batch.getSelected()[newSize++] = row;
}
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
}
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
index f4438a0..363db35 100644
--- a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.core.Utilities;
+import org.apache.orc.OrcFilterContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -127,15 +128,15 @@ public class TimestampRowFilterBenchmark extends org.openjdk.jmh.Main {
return filterValues;
}
- public static void customIntRowFilter(VectorizedRowBatch batch) {
+ public static void customIntRowFilter(OrcFilterContext batch) {
int newSize = 0;
- for (int row = 0; row < batch.size; ++row) {
+ for (int row = 0; row < batch.getSelectedSize(); ++row) {
if (filterValues[row]) {
- batch.selected[newSize++] = row;
+ batch.getSelected()[newSize++] = row;
}
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
}
diff --git a/java/core/src/java/org/apache/orc/OrcFilterContext.java b/java/core/src/java/org/apache/orc/OrcFilterContext.java
new file mode 100644
index 0000000..52c1500
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/OrcFilterContext.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.filter.MutableFilterContext;
+
+/**
+ * This defines the input for any filter operation. This is an extension of
+ * [[{@link VectorizedRowBatch}]] with schema.
+ * <p>
+ * This offers a convenience method of finding the column vector from a given column name
+ * that the filters can invoke to get access to the column vector.
+ */
+public interface OrcFilterContext extends MutableFilterContext {
+ /**
+ * Retrieves the column vector that matches the specified name. Allows support for nested struct
+ * references e.g. order.date where date is a field in a struct called order.
+ *
+ * @param name The column name whose vector should be retrieved
+ * @return The column vectors from the root to the column name. The array levels match the name
+ * levels with Array[0] referring to the top level, followed by the subsequent levels. For
+ * example of order.date Array[0] refers to order and Array[1] refers to date
+ * @throws IllegalArgumentException if the field is not found or if the nested field is not part
+ * of a struct
+ */
+ ColumnVector[] findColumnVector(String name);
+
+ /**
+ * Utility method for determining if the leaf vector of the branch can be treated as having
+ * noNulls.
+ * This method navigates from the top to the leaf and checks if we have nulls anywhere in the
+ * branch as compared to checking just the leaf vector.
+ *
+ * @param vectorBranch The input vector branch from the root to the leaf
+ * @return true if the entire branch satisfies noNull else false
+ */
+ static boolean noNulls(ColumnVector[] vectorBranch) {
+ for (ColumnVector v : vectorBranch) {
+ if (!v.noNulls) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Utility method for determining if a particular row element in the vector branch is null.
+ * This method navigates from the top to the leaf and checks if we have nulls anywhere in the
+ * branch as compared to checking just the leaf vector.
+ *
+ * @param vectorBranch The input vector branch from the root to the leaf
+ * @param idx The row index being tested
+ * @return true if the entire branch is not null for the idx otherwise false
+ * @throws IllegalArgumentException If a multivalued vector such as List or Map is encountered in
+ * the branch.
+ */
+ static boolean isNull(ColumnVector[] vectorBranch, int idx) throws IllegalArgumentException {
+ for (ColumnVector v : vectorBranch) {
+ if (v instanceof ListColumnVector || v instanceof MapColumnVector) {
+ throw new IllegalArgumentException(String.format(
+ "Found vector: %s in branch. List and Map vectors are not supported in isNull "
+ + "determination", v));
+ }
+ // v.noNulls = false does not mean that we have at least one null value
+ if (!v.noNulls && v.isNull[v.isRepeating ? 0 : idx]) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 84c5d91..7f88bb7 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -26,7 +26,6 @@ import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
/**
* The interface for reading ORC files.
@@ -191,7 +190,7 @@ public interface Reader extends Closeable {
private Boolean skipCorruptRecords = null;
private TypeDescription schema = null;
private String[] preFilterColumns = null;
- Consumer<VectorizedRowBatch> skipRowCallback = null;
+ Consumer<OrcFilterContext> skipRowCallback = null;
private DataReader dataReader = null;
private Boolean tolerateMissingSchema = null;
private boolean forcePositionalEvolution;
@@ -265,7 +264,7 @@ public interface Reader extends Closeable {
*
* @return this
*/
- public Options setRowFilter(String[] filterColumnNames, Consumer<VectorizedRowBatch> filterCallback) {
+ public Options setRowFilter(String[] filterColumnNames, Consumer<OrcFilterContext> filterCallback) {
this.preFilterColumns = filterColumnNames;
this.skipRowCallback = filterCallback;
return this;
@@ -382,7 +381,7 @@ public interface Reader extends Closeable {
return sarg;
}
- public Consumer<VectorizedRowBatch> getFilterCallback() {
+ public Consumer<OrcFilterContext> getFilterCallback() {
return skipRowCallback;
}
diff --git a/java/core/src/java/org/apache/orc/TypeDescription.java b/java/core/src/java/org/apache/orc/TypeDescription.java
index 72beb45..a6c9944 100644
--- a/java/core/src/java/org/apache/orc/TypeDescription.java
+++ b/java/core/src/java/org/apache/orc/TypeDescription.java
@@ -783,24 +783,9 @@ public class TypeDescription
* @return the subtype
*/
public TypeDescription findSubtype(int goal) {
- // call getId method to make sure the ids are assigned
- int id = getId();
- if (goal < id || goal > maxId) {
- throw new IllegalArgumentException("Unknown type id " + id + " in " +
- toJson());
- }
- if (goal == id) {
- return this;
- } else {
- TypeDescription prev = null;
- for(TypeDescription next: children) {
- if (next.id > goal) {
- return prev.findSubtype(goal);
- }
- prev = next;
- }
- return prev.findSubtype(goal);
- }
+ ParserUtils.TypeFinder result = new ParserUtils.TypeFinder(this);
+ ParserUtils.findSubtype(this, goal, result);
+ return result.current;
}
/**
diff --git a/java/core/src/java/org/apache/orc/impl/OrcFilterContextImpl.java b/java/core/src/java/org/apache/orc/impl/OrcFilterContextImpl.java
new file mode 100644
index 0000000..8769c5f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OrcFilterContextImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.TypeDescription;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This defines the input for any filter operation. This is an extension of
+ * [[{@link VectorizedRowBatch}]] with schema.
+ * <p>
+ * This offers a convenience method of finding the column vector from a given column name
+ * that the filters can invoke to get access to the column vector.
+ */
+public class OrcFilterContextImpl implements OrcFilterContext {
+ private VectorizedRowBatch batch = null;
+ // Cache of field to ColumnVector, this is reset everytime the batch reference changes
+ private final Map<String, ColumnVector[]> vectors;
+ private final TypeDescription readSchema;
+
+ public OrcFilterContextImpl(TypeDescription readSchema) {
+ this.readSchema = readSchema;
+ this.vectors = new HashMap<>();
+ }
+
+ public OrcFilterContext setBatch(@NotNull VectorizedRowBatch batch) {
+ if (batch != this.batch) {
+ this.batch = batch;
+ vectors.clear();
+ }
+ return this;
+ }
+
+ /**
+ * For testing only
+ * @return The batch reference against which the cache is maintained
+ */
+ VectorizedRowBatch getBatch() {
+ return batch;
+ }
+
+ @Override
+ public void setFilterContext(boolean selectedInUse, int[] selected, int selectedSize) {
+ batch.setFilterContext(selectedInUse, selected, selectedSize);
+ }
+
+ @Override
+ public boolean validateSelected() {
+ return batch.validateSelected();
+ }
+
+ @Override
+ public int[] updateSelected(int i) {
+ return batch.updateSelected(i);
+ }
+
+ @Override
+ public void setSelectedInUse(boolean b) {
+ batch.setSelectedInUse(b);
+ }
+
+ @Override
+ public void setSelected(int[] ints) {
+ batch.setSelected(ints);
+ }
+
+ @Override
+ public void setSelectedSize(int i) {
+ batch.setSelectedSize(i);
+ }
+
+ @Override
+ public void reset() {
+ batch.reset();
+ }
+
+ @Override
+ public boolean isSelectedInUse() {
+ return batch.isSelectedInUse();
+ }
+
+ @Override
+ public int[] getSelected() {
+ return batch.getSelected();
+ }
+
+ @Override
+ public int getSelectedSize() {
+ return batch.getSelectedSize();
+ }
+
+ // For testing only
+ public ColumnVector[] getCols() {
+ return batch.cols;
+ }
+
+ @Override
+ public ColumnVector[] findColumnVector(String name) {
+ if (!vectors.containsKey(name)) {
+ vectors.put(name,
+ ParserUtils.findColumnVectors(readSchema,
+ new ParserUtils.StringPosition(name),
+ true,
+ batch));
+ }
+ return vectors.get(name);
+ }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/ParserUtils.java b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
index 559e3e1..fe2111b 100644
--- a/java/core/src/java/org/apache/orc/impl/ParserUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
@@ -18,6 +18,12 @@
package org.apache.orc.impl;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import java.util.ArrayList;
@@ -249,63 +255,136 @@ public class ParserUtils {
return findSubtype(schema, source, true);
}
+
+ public interface TypeVisitor {
+ /**
+ * As we navigate to the column, call this on each level
+ * @param type new level we are moving to
+ * @param posn the position relative to the parent
+ */
+ void visit(TypeDescription type, int posn);
+ }
+
+ public static class TypeFinder implements TypeVisitor {
+ public TypeDescription current;
+
+ public TypeFinder(TypeDescription schema) {
+ current = schema;
+ }
+
+ @Override
+ public void visit(TypeDescription type, int posn) {
+ current = type;
+ }
+ }
+
public static TypeDescription findSubtype(TypeDescription schema,
ParserUtils.StringPosition source,
boolean isSchemaEvolutionCaseAware) {
- List<String> names = ParserUtils.splitName(source);
+ TypeFinder result = new TypeFinder(removeAcid(schema));
+ findColumn(result.current, source, isSchemaEvolutionCaseAware, result);
+ return result.current;
+ }
+
+ private static TypeDescription removeAcid(TypeDescription schema) {
+ return SchemaEvolution.checkAcidSchema(schema)
+ ? SchemaEvolution.getBaseRow(schema) : schema;
+ }
+
+ private static int findCaseInsensitive(List<String> list, String goal) {
+ for (int i = 0; i < list.size(); i++) {
+ if (list.get(i).equalsIgnoreCase(goal)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public static void findSubtype(TypeDescription schema,
+ int goal,
+ TypeVisitor visitor) {
+ TypeDescription current = schema;
+ int id = schema.getId();
+ if (goal < id || goal > schema.getMaximumId()) {
+ throw new IllegalArgumentException("Unknown type id " + id + " in " +
+ current.toJson());
+ }
+ while (id != goal) {
+ List<TypeDescription> children = current.getChildren();
+ for(int i=0; i < children.size(); ++i) {
+ TypeDescription child = children.get(i);
+ if (goal <= child.getMaximumId()) {
+ current = child;
+ visitor.visit(current, i);
+ break;
+ }
+ }
+ id = current.getId();
+ }
+ }
+
+ /**
+ * Find a column in a schema by walking down the type tree to find the right column.
+ * @param schema the schema to look in
+ * @param source the name of the column
+ * @param isSchemaEvolutionCaseAware should the string compare be case sensitive
+ * @param visitor The visitor, which is called on each level
+ */
+ public static void findColumn(TypeDescription schema,
+ ParserUtils.StringPosition source,
+ boolean isSchemaEvolutionCaseAware,
+ TypeVisitor visitor) {
+ findColumn(schema, ParserUtils.splitName(source), isSchemaEvolutionCaseAware, visitor);
+ }
+
+ /**
+ * Find a column in a schema by walking down the type tree to find the right column.
+ * @param schema the schema to look in
+ * @param names the name of the column broken into a list of names per level
+ * @param isSchemaEvolutionCaseAware should the string compare be case sensitive
+ * @param visitor The visitor, which is called on each level
+ */
+ public static void findColumn(TypeDescription schema,
+ List<String> names,
+ boolean isSchemaEvolutionCaseAware,
+ TypeVisitor visitor) {
if (names.size() == 1 && INTEGER_PATTERN.matcher(names.get(0)).matches()) {
- return schema.findSubtype(Integer.parseInt(names.get(0)));
+ findSubtype(schema, Integer.parseInt(names.get(0)), visitor);
+ return;
}
- TypeDescription current = SchemaEvolution.checkAcidSchema(schema)
- ? SchemaEvolution.getBaseRow(schema) : schema;
+ TypeDescription current = schema;
+ int posn;
while (names.size() > 0) {
String first = names.remove(0);
switch (current.getCategory()) {
case STRUCT: {
- int posn = -1;
- if (isSchemaEvolutionCaseAware) {
- posn = current.getFieldNames().indexOf(first);
- } else {
- // Case-insensitive search like ORC 1.5
- for (int i = 0; i < current.getFieldNames().size(); i++) {
- if (current.getFieldNames().get(i).equalsIgnoreCase(first)) {
- posn = i;
- break;
- }
- }
- }
- if (posn == -1) {
- throw new IllegalArgumentException("Field " + first +
- " not found in " + current.toString());
- }
- current = current.getChildren().get(posn);
+ posn = isSchemaEvolutionCaseAware
+ ? current.getFieldNames().indexOf(first)
+ : findCaseInsensitive(current.getFieldNames(), first);
break;
}
case LIST:
if (first.equals("_elem")) {
- current = current.getChildren().get(0);
+ posn = 0;
} else {
- throw new IllegalArgumentException("Field " + first +
- "not found in " + current.toString());
+ posn = -1;
}
break;
case MAP:
if (first.equals("_key")) {
- current = current.getChildren().get(0);
+ posn = 0;
} else if (first.equals("_value")) {
- current = current.getChildren().get(1);
+ posn = 1;
} else {
- throw new IllegalArgumentException("Field " + first +
- "not found in " + current.toString());
+ posn = -1;
}
break;
case UNION: {
try {
- int posn = Integer.parseInt(first);
+ posn = Integer.parseInt(first);
if (posn < 0 || posn >= current.getChildren().size()) {
throw new NumberFormatException("off end of union");
}
- current = current.getChildren().get(posn);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Field " + first +
"not found in " + current.toString(), e);
@@ -313,11 +392,73 @@ public class ParserUtils {
break;
}
default:
- throw new IllegalArgumentException("Field " + first +
- "not found in " + current.toString());
+ posn = -1;
+ }
+ if (posn < 0) {
+ throw new IllegalArgumentException("Field " + first +
+ " not found in " + current.toString());
+ }
+ current = current.getChildren().get(posn);
+ visitor.visit(current, posn);
+ }
+ }
+
+ static class ColumnFinder implements TypeVisitor {
+ private ColumnVector[] top;
+ private ColumnVector current = null;
+ private final ColumnVector[] result;
+ private int resultIdx = 0;
+
+ ColumnFinder(TypeDescription schema, VectorizedRowBatch batch, int levels) {
+ top = batch.cols;
+ if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+ result = new ColumnVector[levels];
+ } else {
+ result = new ColumnVector[levels + 1];
+ current = top[0];
+ addResult(current);
}
}
- return current;
+
+ private void addResult(ColumnVector vector) {
+ result[resultIdx] = vector;
+ resultIdx += 1;
+ }
+
+ @Override
+ public void visit(TypeDescription type, int posn) {
+ if (current == null) {
+ current = top[posn];
+ top = null;
+ } else {
+ current = navigate(current, posn);
+ }
+ addResult(current);
+ }
+
+ private ColumnVector navigate(ColumnVector parent, int posn) {
+ if (parent instanceof ListColumnVector) {
+ return ((ListColumnVector) parent).child;
+ } else if (parent instanceof StructColumnVector) {
+ return ((StructColumnVector) parent).fields[posn];
+ } else if (parent instanceof UnionColumnVector) {
+ return ((UnionColumnVector) parent).fields[posn];
+ } else if (parent instanceof MapColumnVector) {
+ MapColumnVector m = (MapColumnVector) parent;
+ return posn == 0 ? m.keys : m.values;
+ }
+ throw new IllegalArgumentException("Unknown complex column vector " + parent.getClass());
+ }
+ }
+
+ public static ColumnVector[] findColumnVectors(TypeDescription schema,
+ StringPosition source,
+ boolean isCaseSensitive,
+ VectorizedRowBatch batch) {
+ List<String> names = ParserUtils.splitName(source);
+ ColumnFinder result = new ColumnFinder(schema, batch, names.size());
+ findColumn(removeAcid(schema), names, isCaseSensitive, result);
+ return result.result;
}
public static List<TypeDescription> findSubtypeList(TypeDescription schema,
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 6d27508..bb1ff9a 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.OrcProto;
+import org.apache.orc.OrcFilterContext;
import org.apache.orc.impl.reader.ReaderEncryption;
import org.apache.orc.impl.reader.StripePlanner;
import org.apache.orc.impl.reader.tree.BatchReader;
@@ -67,7 +68,7 @@ public class TreeReaderFactory {
Set<Integer> getColumnFilterIds();
- Consumer<VectorizedRowBatch> getColumnFilterCallback();
+ Consumer<OrcFilterContext> getColumnFilterCallback();
boolean isSkipCorrupt();
@@ -94,7 +95,7 @@ public class TreeReaderFactory {
private boolean useProlepticGregorian;
private boolean fileUsedProlepticGregorian;
private Set<Integer> filterColumnIds = Collections.emptySet();
- Consumer<VectorizedRowBatch> filterCallback;
+ Consumer<OrcFilterContext> filterCallback;
public ReaderContext setSchemaEvolution(SchemaEvolution evolution) {
this.evolution = evolution;
@@ -106,7 +107,7 @@ public class TreeReaderFactory {
return this;
}
- public ReaderContext setFilterCallback(Set<Integer> filterColumnsList, Consumer<VectorizedRowBatch> filterCallback) {
+ public ReaderContext setFilterCallback(Set<Integer> filterColumnsList, Consumer<OrcFilterContext> filterCallback) {
this.filterColumnIds = filterColumnsList;
this.filterCallback = filterCallback;
return this;
@@ -150,7 +151,7 @@ public class TreeReaderFactory {
}
@Override
- public Consumer<VectorizedRowBatch> getColumnFilterCallback() {
+ public Consumer<OrcFilterContext> getColumnFilterCallback() {
return filterCallback;
}
diff --git a/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java b/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
index e6a2db8..6153c5d 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
@@ -19,6 +19,7 @@ package org.apache.orc.impl.reader.tree;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.impl.OrcFilterContextImpl;
import org.apache.orc.impl.TreeReaderFactory;
import java.io.IOException;
@@ -27,10 +28,12 @@ import java.util.Set;
public class StructBatchReader extends BatchReader {
// The reader context including row-filtering details
private final TreeReaderFactory.Context context;
+ private final OrcFilterContextImpl filterContext; // created once per reader, re-bound to each batch via setBatch()
public StructBatchReader(TreeReaderFactory.StructTreeReader rowReader, TreeReaderFactory.Context context) {
super(rowReader);
this.context = context;
+ this.filterContext = new OrcFilterContextImpl(context.getSchemaEvolution().getReaderSchema()); // field names are resolved against the reader schema
}
private void readBatchColumn(VectorizedRowBatch batch, TypeReader[] children, int batchSize, int index)
@@ -62,7 +65,7 @@ public class StructBatchReader extends BatchReader {
// Apply filter callback to reduce number of # rows selected for decoding in the next TreeReaders
if (!earlyExpandCols.isEmpty() && this.context.getColumnFilterCallback() != null) {
- this.context.getColumnFilterCallback().accept(batch);
+ this.context.getColumnFilterCallback().accept(filterContext.setBatch(batch)); // setBatch presumably returns this context (fluent), now bound to the current batch
}
// Read the remaining columns applying row-level filtering
diff --git a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
new file mode 100644
index 0000000..8631202
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.impl.OrcFilterContextImpl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link OrcFilterContext} API: resolving a dotted field name to the branch of
+ * column vectors from the top-level column down to the requested field, and the static
+ * {@code noNulls}/{@code isNull} helpers that evaluate such a branch as a whole.
+ */
+public class TestOrcFilterContext {
+  private final TypeDescription schema = TypeDescription.createStruct()
+      .addField("f1", TypeDescription.createLong())
+      .addField("f2", TypeDescription.createString())
+      .addField("f3",
+          TypeDescription.createStruct()
+              .addField("a", TypeDescription.createInt())
+              .addField("b", TypeDescription.createLong())
+              .addField("c",
+                  TypeDescription.createMap(TypeDescription.createInt(),
+                      TypeDescription.createDate())))
+      .addField("f4",
+          TypeDescription.createList(TypeDescription.createStruct()
+              .addField("a", TypeDescription.createChar())
+              .addField("b", TypeDescription.createBoolean())))
+      .addField("f5",
+          TypeDescription.createMap(TypeDescription.createInt(),
+              TypeDescription.createDate()))
+      .addField("f6",
+          TypeDescription.createUnion()
+              .addUnionChild(TypeDescription.createInt())
+              .addUnionChild(TypeDescription.createStruct()
+                  .addField("a", TypeDescription.createDate())
+                  .addField("b",
+                      TypeDescription.createList(TypeDescription.createChar()))
+              )
+      );
+  private final OrcFilterContext filterContext = new OrcFilterContextImpl(schema)
+      .setBatch(schema.createRowBatch());
+
+  @Before
+  public void setup() {
+    filterContext.reset();
+  }
+
+  @Test
+  public void testTopLevelElementaryType() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f1");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(LongColumnVector.class));
+  }
+
+  @Test
+  public void testTopLevelCompositeType() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(StructColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f4");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(ListColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f5");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(MapColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f6");
+    assertEquals(1, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(UnionColumnVector.class));
+  }
+
+  @Test
+  public void testNestedType() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3.a");
+    assertEquals(2, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(StructColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(LongColumnVector.class));
+
+    vectorBranch = filterContext.findColumnVector("f3.c");
+    assertEquals(2, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(StructColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(MapColumnVector.class));
+
+    // "f6.1.b": child 1 of the union (the struct), then that struct's field "b"
+    vectorBranch = filterContext.findColumnVector("f6.1.b");
+    assertEquals(3, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(UnionColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(StructColumnVector.class));
+    assertThat(vectorBranch[2], instanceOf(ListColumnVector.class));
+  }
+
+  @Test
+  public void testTopLevelVector() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3");
+    vectorBranch[0].noNulls = true;
+    assertTrue(OrcFilterContext.noNulls(vectorBranch));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 0));
+
+    vectorBranch[0].noNulls = false;
+    vectorBranch[0].isNull[0] = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+  }
+
+  @Test
+  public void testNestedVector() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = true;
+    vectorBranch[1].noNulls = true;
+    assertTrue(OrcFilterContext.noNulls(vectorBranch));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 2));
+
+    vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = false;
+    vectorBranch[0].isNull[0] = true;
+    vectorBranch[1].noNulls = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 2));
+
+    vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = true;
+    vectorBranch[1].noNulls = false;
+    vectorBranch[1].isNull[2] = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
+
+    vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = false;
+    vectorBranch[0].isNull[0] = true;
+    vectorBranch[1].noNulls = false;
+    vectorBranch[1].isNull[2] = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
+  }
+
+  @Test
+  public void testTopLevelList() {
+    TypeDescription topListSchema = TypeDescription.createList(
+        TypeDescription.createStruct()
+            .addField("a", TypeDescription.createChar())
+            .addField("b", TypeDescription
+                .createBoolean()));
+    OrcFilterContext fc = new OrcFilterContextImpl(topListSchema)
+        .setBatch(topListSchema.createRowBatch());
+    ColumnVector[] vectorBranch = fc.findColumnVector("_elem");
+    assertEquals(2, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(ListColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(StructColumnVector.class));
+  }
+
+  @Test
+  public void testUnsupportedIsNullUse() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f4._elem.a");
+    assertEquals(3, vectorBranch.length);
+    assertThat(vectorBranch[0], instanceOf(ListColumnVector.class));
+    assertThat(vectorBranch[1], instanceOf(StructColumnVector.class));
+    assertThat(vectorBranch[2], instanceOf(BytesColumnVector.class));
+
+    // noNulls accepts a branch that passes through a list, but isNull rejects it
+    assertTrue(OrcFilterContext.noNulls(vectorBranch));
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+        () -> OrcFilterContext.isNull(vectorBranch,
+            0));
+    assertThat(exception.getMessage(), containsString("ListColumnVector"));
+    assertThat(exception.getMessage(), containsString("List and Map vectors are not supported"));
+  }
+
+  @Test
+  public void testRepeatingVector() {
+    ColumnVector[] vectorBranch = filterContext.findColumnVector("f3.a");
+    vectorBranch[0].noNulls = true;
+    vectorBranch[0].isRepeating = true;
+    vectorBranch[1].noNulls = true;
+    assertTrue(OrcFilterContext.noNulls(vectorBranch));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 0));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 1));
+    assertFalse(OrcFilterContext.isNull(vectorBranch, 2));
+
+    // A repeating parent that is null at row 0 makes every row of the branch null
+    vectorBranch[0].noNulls = false;
+    vectorBranch[0].isRepeating = true;
+    vectorBranch[0].isNull[0] = true;
+    vectorBranch[1].noNulls = true;
+    assertFalse(OrcFilterContext.noNulls(vectorBranch));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 0));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 1));
+    assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
index 10d5594..2052bea 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringSkip.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.impl.OrcFilterContextImpl;
import org.apache.orc.impl.RecordReaderImpl;
import org.junit.Assert;
import org.junit.Before;
@@ -43,6 +45,7 @@ import java.text.Format;
import java.text.SimpleDateFormat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Types that are skipped at row-level include: Decimal, Decimal64, Double, Float, Char, VarChar, String, Boolean, Timestamp
@@ -77,52 +80,54 @@ public class TestRowFilteringSkip {
}
// Filter all rows except: 924 and 940
- public static void intAnyRowFilter(VectorizedRowBatch batch) {
+ public static void intAnyRowFilter(OrcFilterContext batch) {
// Dummy Filter implementation passing just one Batch row
int newSize = 2;
- batch.selected[0] = batch.size-100;
- batch.selected[1] = 940;
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.getSelected()[0] = batch.getSelectedSize()-100; // row 924 when the incoming selected size is 1024
+ batch.getSelected()[1] = 940;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
// Filter all rows except the first one
- public static void intFirstRowFilter(VectorizedRowBatch batch) {
+ public static void intFirstRowFilter(OrcFilterContext batch) {
int newSize = 0;
- for (int row = 0; row <batch.size; ++row) {
+ for (int row = 0; row <batch.getSelectedSize(); ++row) { // NOTE(review): treats loop positions as row ids, i.e. assumes no prior selection is in use
if (row == 0) {
- batch.selected[newSize++] = row;
+ batch.getSelected()[newSize++] = row;
}
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
// Filter out rows in a round-robbin fashion starting with a pass
- public static void intRoundRobbinRowFilter(VectorizedRowBatch batch) {
+ public static void intRoundRobbinRowFilter(OrcFilterContext batch) {
int newSize = 0;
- for (int row = 0; row < batch.size; ++row) {
+ int[] selected = batch.getSelected(); // compacts the kept row ids into the context's own selected array
+ for (int row = 0; row < batch.getSelectedSize(); ++row) {
if ((row % 2) == 0) {
- batch.selected[newSize++] = row;
+ selected[newSize++] = row;
}
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelected(selected); // NOTE(review): appears redundant, 'selected' is the array obtained from getSelected() above
+ batch.setSelectedSize(newSize);
}
static int rowCount = 0;
- public static void intCustomValueFilter(VectorizedRowBatch batch) {
- LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+ public static void intCustomValueFilter(OrcFilterContext batch) {
+ LongColumnVector col1 = (LongColumnVector) ((OrcFilterContextImpl) batch).getCols()[0]; // assumes the context is an OrcFilterContextImpl to reach the raw batch columns
int newSize = 0;
- for (int row = 0; row <batch.size; ++row) {
+ for (int row = 0; row <batch.getSelectedSize(); ++row) {
long val = col1.vector[row];
if ((val == 2) || (val == 5) || (val == 13) || (val == 29) || (val == 70)) {
- batch.selected[newSize++] = row;
+ batch.getSelected()[newSize++] = row;
}
rowCount++;
}
- batch.selectedInUse = true;
- batch.size = newSize;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(newSize);
}
@Test
@@ -1293,15 +1298,26 @@ public class TestRowFilteringSkip {
}
}
- private static void notNullFilterMissing(VectorizedRowBatch batch) {
+ // Keeps the rows whose column 2 is non-null. isNull is only consulted when noNulls is false,
+ // and a repeating vector is decided by its entry 0 for every row.
+ private static void notNullFilterMissing(OrcFilterContext batch) {
int selIdx = 0;
- for (int i = 0; i < batch.size; i++) {
- if (!batch.cols[2].isNull[i]) {
- batch.selected[selIdx++] = i;
+ ColumnVector cv = ((OrcFilterContextImpl) batch).getCols()[2];
+ if (cv.isRepeating) {
+ if (cv.noNulls || !cv.isNull[0]) {
+ for (int i = 0; i < batch.getSelectedSize(); i++) {
+ batch.getSelected()[selIdx++] = i;
+ }
+ }
+ } else {
+ for (int i = 0; i < batch.getSelectedSize(); i++) {
+ if (cv.noNulls || !cv.isNull[i]) {
+ batch.getSelected()[selIdx++] = i;
+ }
}
}
- batch.selectedInUse = true;
- batch.size = selIdx;
+ batch.setSelectedInUse(true);
+ batch.setSelectedSize(selIdx);
}
@Test
@@ -1325,25 +1339,38 @@ public class TestRowFilteringSkip {
int noNullCnt = 0;
while (rows.nextBatch(batch)) {
Assert.assertTrue(batch.selectedInUse);
- Assert.assertTrue(batch.selected != null);
+ Assert.assertNotNull(batch.selected);
// Rows are filtered so it should never be 1024
Assert.assertTrue(batch.size != ColumnBatchRows);
- assertEquals( true, col1.noNulls);
+ assertTrue(col1.noNulls);
for (int r = 0; r < ColumnBatchRows; ++r) {
- if (col1.vector[r] != 100)
- noNullCnt ++;
+ if (col1.vector[r] != 100) {
+ noNullCnt++;
+ }
+ }
+ // We should always select 1 row as the file is spaced as such. We could get 0 in case all
+ // the rows are filtered out.
+ if (batch.size == 0) {
+ continue;
}
+ Assert.assertEquals(1, batch.size);
+ long val = col1.vector[batch.selected[0]];
+ // Check that we have read the valid value
+ Assert.assertTrue((val == 2) || (val == 5) || (val == 13) || (val == 29) || (val == 70));
+ if (val == 2) {
+ Assert.assertEquals(0, col5.getTime(batch.selected[0]));
+ } else {
+ Assert.assertNotEquals(0, col5.getTime(batch.selected[0]));
+ }
+
+ // Check that unselected is not populated
+ Assert.assertEquals(0, batch.selected[1]);
}
// Total rows of the file should be 25k
Assert.assertEquals(25000, rowCount);
// Make sure that our filter worked ( 5 rows with userId != 100)
Assert.assertEquals(5, noNullCnt);
- Assert.assertEquals(false, col5.isRepeating);
- Assert.assertEquals(544, batch.selected[0]);
- Assert.assertEquals(0, batch.selected[1]);
- Assert.assertTrue(col5.getTime(0) == 0);
- Assert.assertTrue(col5.getTime(544) != 0);
}
}
}
\ No newline at end of file
diff --git a/java/core/src/test/org/apache/orc/impl/TestOrcFilterContextImpl.java b/java/core/src/test/org/apache/orc/impl/TestOrcFilterContextImpl.java
new file mode 100644
index 0000000..acf14ce
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestOrcFilterContextImpl.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link OrcFilterContextImpl}: dotted-name column lookup, re-binding to a new
+ * batch, missing-field errors, and the selection accessors that propagate to the bound batch.
+ */
+public class TestOrcFilterContextImpl {
+
+  private final TypeDescription schema = TypeDescription.createStruct()
+      .addField("f1", TypeDescription.createLong())
+      .addField("f2", TypeDescription.createStruct()
+          .addField("f2a", TypeDescription.createLong())
+          .addField("f2b", TypeDescription.createString()))
+      .addField("f3", TypeDescription.createString());
+
+  @Test
+  public void testSuccessfulRetrieval() {
+    VectorizedRowBatch b = createBatch();
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    fc.setBatch(b);
+
+    validateF1Vector(fc.findColumnVector("f1"), 1);
+    validateF2Vector(fc.findColumnVector("f2"));
+    validateF2AVector(fc.findColumnVector("f2.f2a"));
+    validateF2BVector(fc.findColumnVector("f2.f2b"));
+    validateF3Vector(fc.findColumnVector("f3"));
+  }
+
+  @Test
+  public void testSuccessfulRetrievalWithBatchChange() {
+    VectorizedRowBatch b1 = createBatch();
+    VectorizedRowBatch b2 = createBatch();
+    ((LongColumnVector) b2.cols[0]).vector[0] = 100;
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    fc.setBatch(b1);
+    validateF1Vector(fc.findColumnVector("f1"), 1);
+    // Change the batch: lookups must not return vectors cached from the previous batch
+    fc.setBatch(b2);
+    validateF1Vector(fc.findColumnVector("f1"), 100);
+  }
+
+  @Test
+  public void testMissingFieldTopLevel() {
+    VectorizedRowBatch b = createBatch();
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    fc.setBatch(b);
+
+    // Missing field at top level
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+        () -> fc.findColumnVector("f4"));
+    assertThat(exception.getMessage(), containsString("Field f4 not found in"));
+  }
+
+  @Test
+  public void testMissingFieldNestedLevel() {
+    VectorizedRowBatch b = createBatch();
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    fc.setBatch(b);
+
+    // Missing field at nested level
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+        () -> fc.findColumnVector("f2.c"));
+    assertThat(exception.getMessage(),
+        containsString("Field c not found in struct<f2a:bigint,f2b:string>"));
+  }
+
+  @Test
+  public void testPropagations() {
+    OrcFilterContextImpl fc = new OrcFilterContextImpl(schema);
+    assertNull(fc.getBatch());
+    fc.setBatch(schema.createRowBatch());
+    assertNotNull(fc.getBatch());
+    assertFalse(fc.isSelectedInUse());
+
+    // Set selections
+    fc.setSelectedInUse(true);
+    fc.getSelected()[0] = 5;
+    fc.setSelectedSize(1);
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(1, fc.getSelectedSize());
+    assertEquals(fc.getBatch().getMaxSize(), fc.getSelected().length);
+    assertArrayEquals(new int[] {5}, Arrays.copyOf(fc.getSelected(), fc.getSelectedSize()));
+    assertTrue(fc.validateSelected());
+    fc.setSelectedSize(2);
+    assertFalse(fc.validateSelected());
+
+    // Use a new selected vector
+    fc.setSelected(new int[fc.getBatch().getMaxSize()]);
+    assertArrayEquals(new int[] {0, 0}, Arrays.copyOf(fc.getSelected(), fc.getSelectedSize()));
+
+    // Increase the size of the vector
+    fc.reset();
+    assertFalse(fc.isSelectedInUse());
+    int currSize = fc.getBatch().getMaxSize();
+    assertEquals(currSize, fc.getSelected().length);
+    fc.updateSelected(currSize + 1);
+    assertEquals(currSize + 1, fc.getSelected().length);
+
+    // Set the filter context
+    fc.setFilterContext(true, new int[3], 1);
+    assertTrue(fc.isSelectedInUse());
+    assertEquals(3, fc.getBatch().getMaxSize());
+    assertEquals(1, fc.getSelectedSize());
+  }
+
+  /** Builds a batch whose row 0 holds 1, 2, "3" and "4" in f1, f2.f2a, f2.f2b and f3. */
+  private VectorizedRowBatch createBatch() {
+    VectorizedRowBatch b = schema.createRowBatch();
+    LongColumnVector v1 = (LongColumnVector) b.cols[0];
+    StructColumnVector v2 = (StructColumnVector) b.cols[1];
+    LongColumnVector v2a = (LongColumnVector) v2.fields[0];
+    BytesColumnVector v2b = (BytesColumnVector) v2.fields[1];
+    BytesColumnVector v3 = (BytesColumnVector) b.cols[2];
+
+    v1.vector[0] = 1;
+    v2a.vector[0] = 2;
+    v2b.setVal(0, "3".getBytes(StandardCharsets.UTF_8));
+    v3.setVal(0, "4".getBytes(StandardCharsets.UTF_8));
+    return b;
+  }
+
+  private void validateF1Vector(ColumnVector[] v, long headValue) {
+    assertEquals(1, v.length);
+    validateF1Vector(v[0], headValue);
+  }
+
+  private void validateF1Vector(ColumnVector v, long headValue) {
+    LongColumnVector l = (LongColumnVector) v;
+    assertEquals(headValue, l.vector[0]);
+  }
+
+  private void validateF2Vector(ColumnVector[] v) {
+    assertEquals(1, v.length);
+    validateF2Vector(v[0]);
+  }
+
+  private void validateF2Vector(ColumnVector v) {
+    StructColumnVector s = (StructColumnVector) v;
+    validateF2AVector(s.fields[0]);
+    validateF2BVector(s.fields[1]);
+  }
+
+  private void validateF2AVector(ColumnVector[] v) {
+    assertEquals(2, v.length);
+    validateF2Vector(v[0]);
+    validateF2AVector(v[1]);
+  }
+
+  private void validateF2AVector(ColumnVector v) {
+    LongColumnVector l = (LongColumnVector) v;
+    assertEquals(2, l.vector[0]);
+  }
+
+  private void validateF2BVector(ColumnVector[] v) {
+    assertEquals(2, v.length);
+    validateF2Vector(v[0]);
+    validateF2BVector(v[1]);
+  }
+
+  private void validateF2BVector(ColumnVector v) {
+    BytesColumnVector b = (BytesColumnVector) v;
+    assertEquals("3", b.toString(0));
+  }
+
+  private void validateF3Vector(ColumnVector[] v) {
+    assertEquals(1, v.length);
+    BytesColumnVector b = (BytesColumnVector) v[0];
+    assertEquals("4", b.toString(0));
+  }
+}