You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2020/08/05 18:57:10 UTC
[orc] branch master updated: ORC-597: Add Row-level filtering
benchmark (#478)
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new b89aabd ORC-597: Add Row-level filtering benchmark (#478)
b89aabd is described below
commit b89aabde88ca7d7fea6eb775d57c49c4089a1c74
Author: Panagiotis Garefalakis <pg...@cloudera.com>
AuthorDate: Wed Aug 5 21:57:00 2020 +0300
ORC-597: Add Row-level filtering benchmark (#478)
### What changes were proposed in this pull request?
Extend the ORC benchmark to support row-level filtering on the existing datasets.
### Why are the changes needed?
To compare the results between versions.
### How was this patch tested?
Run the benchmark.
---
java/bench/README.md | 4 +
java/bench/hive/pom.xml | 4 +
.../bench/hive/RowFilterProjectionBenchmark.java | 219 +++++++++++++++++++++
.../hive/rowfilter/BooleanRowFilterBenchmark.java | 172 ++++++++++++++++
.../hive/rowfilter/DecimalRowFilterBenchmark.java | 173 ++++++++++++++++
.../hive/rowfilter/DoubleRowFilterBenchmark.java | 172 ++++++++++++++++
.../hive/rowfilter/StringRowFilterBenchmark.java | 172 ++++++++++++++++
.../rowfilter/TimestampRowFilterBenchmark.java | 172 ++++++++++++++++
java/bench/pom.xml | 2 +-
9 files changed, 1089 insertions(+), 1 deletion(-)
diff --git a/java/bench/README.md b/java/bench/README.md
index f49404d..c0ee992 100644
--- a/java/bench/README.md
+++ b/java/bench/README.md
@@ -41,6 +41,10 @@ To run decimal/decimal64 benchmark:
```% java -jar hive/target/orc-benchmarks-hive-*-uber.jar decimal data```
+To run row-filter benchmark:
+
+```% java -jar hive/target/orc-benchmarks-hive-*-uber.jar row-filter data```
+
To run spark benchmark:
```% java -jar spark/target/orc-benchmarks-spark-*.jar spark data```
diff --git a/java/bench/hive/pom.xml b/java/bench/hive/pom.xml
index 7bdf612..e8ed1bf 100644
--- a/java/bench/hive/pom.xml
+++ b/java/bench/hive/pom.xml
@@ -114,6 +114,10 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
new file mode 100644
index 0000000..d125606
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive;
+
+import com.google.auto.service.AutoService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.TrackingLocalFileSystem;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.OrcBenchmark;
+import org.apache.orc.bench.core.ReadCounters;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+
+import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@State(Scope.Thread)
+@AutoService(OrcBenchmark.class)
+public class RowFilterProjectionBenchmark implements OrcBenchmark {
+
+  private static final Path root = Utilities.getBenchmarkRoot();
+
+  private static final int BATCH_SIZE = 1024; // rows in a default VectorizedRowBatch
+
+  @Param({"taxi"})
+  public String dataset;
+
+  @Param({"none"})
+  public String compression;
+
+  @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+  public String filter_percentage;
+
+  @Param({"all", "2", "4", "8", "16"})
+  public String projected_columns;
+
+  @Override
+  public String getName() {
+    return "row-filter";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Benchmark column projection with row-level filtering";
+  }
+
+  @Override
+  public void run(String[] args) throws Exception {
+    new Runner(Utilities.parseOptions(args, getClass())).run();
+  }
+
+  /** Row positions (within a batch) that {@link #customIntRowFilter} keeps. */
+  static Set<Integer> filterValues = null;
+
+  /**
+   * Picks a random set of row positions covering {@code percentage} of a batch
+   * and stores it in {@link #filterValues}.
+   *
+   * @param percentage fraction of rows to keep, in the closed range [0.0, 1.0]
+   * @throws IllegalArgumentException if {@code percentage} is NaN or outside [0.0, 1.0]
+   */
+  public static void generateRandomSet(double percentage) throws IllegalArgumentException {
+    if (!(percentage >= 0.0 && percentage <= 1.0)) {
+      throw new IllegalArgumentException(
+          "Filter percentage must be between 0.0 and 1.0 but was " + percentage);
+    }
+    Random randomGenerator = new Random();
+    Set<Integer> values = new HashSet<>();
+    while (values.size() < BATCH_SIZE * percentage) {
+      values.add(randomGenerator.nextInt(BATCH_SIZE));
+    }
+    filterValues = values;
+  }
+
+  /**
+   * Row filter callback: keeps only the batch positions listed in
+   * {@link #filterValues}, regardless of the filter column's values.
+   */
+  public static void customIntRowFilter(VectorizedRowBatch batch) {
+    int newSize = 0;
+    for (int row = 0; row < batch.size; ++row) {
+      if (filterValues.contains(row)) {
+        batch.selected[newSize++] = row;
+      }
+    }
+    batch.selectedInUse = true;
+    batch.size = newSize;
+  }
+
+  @Benchmark
+  public void orcRowFilter(ReadCounters counters) throws Exception {
+    readDataset(counters, true);
+  }
+
+  @Benchmark
+  public void orcNoFilter(ReadCounters counters) throws Exception {
+    readDataset(counters, false);
+  }
+
+  /**
+   * Reads the configured dataset variant, projecting {@link #projected_columns}
+   * plus the filter column, and records rows and I/O statistics in {@code counters}.
+   *
+   * @param counters    JMH auxiliary counters to update
+   * @param applyFilter whether to attach {@link #customIntRowFilter} to the reader
+   */
+  private void readDataset(ReadCounters counters, boolean applyFilter) throws Exception {
+    Configuration conf = new Configuration();
+    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
+    fs.initialize(new URI("file:///"), conf);
+    FileSystem.Statistics statistics = fs.getLocalStatistics();
+    statistics.reset();
+    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs);
+    Path path = Utilities.getVariant(root, dataset, "orc", compression);
+    Reader reader = OrcFile.createReader(path, options);
+    TypeDescription schema = reader.getSchema();
+    String filterColumn = filterColumnFor(dataset);
+    Reader.Options readOptions =
+        reader.options().include(selectColumns(schema, filterColumn, projected_columns));
+    if (applyFilter) {
+      generateRandomSet(Double.parseDouble(filter_percentage));
+      readOptions.setRowFilter(new String[]{filterColumn},
+          RowFilterProjectionBenchmark::customIntRowFilter);
+    }
+    VectorizedRowBatch batch = schema.createRowBatch();
+    // try-with-resources releases the record reader even if a batch read fails
+    try (RecordReader rows = reader.rows(readOptions)) {
+      while (rows.nextBatch(batch)) {
+        counters.addRecords(batch.size);
+      }
+    }
+    counters.addBytes(statistics.getReadOps(), statistics.getBytesRead());
+    counters.addInvocation();
+  }
+
+  /** Returns the ID column that the row filter is attached to for the named dataset. */
+  private static String filterColumnFor(String name) {
+    switch (name) {
+      case "taxi":
+        return "vendor_id";
+      case "sales":
+        return "sales_id";
+      case "github":
+        return "id";
+      default:
+        throw new IllegalArgumentException("Unknown data set " + name);
+    }
+  }
+
+  /**
+   * Builds the column include mask: the filter column plus the first
+   * {@code projectedColumns} other top-level fields ("all" selects every field).
+   * Each selected field is included together with all of its nested sub-columns.
+   *
+   * @throws IllegalArgumentException if {@code filterColumn} is not a top-level field
+   */
+  private static boolean[] selectColumns(TypeDescription schema, String filterColumn,
+                                         String projectedColumns) {
+    List<String> fieldNames = schema.getFieldNames();
+    List<TypeDescription> children = schema.getChildren();
+    int remaining = "all".equals(projectedColumns)
+        ? children.size() : Integer.parseInt(projectedColumns);
+    boolean[] include = new boolean[schema.getMaximumId() + 1];
+    boolean foundFilterCol = false;
+    // walk fields by position: column ids only equal field positions in flat schemas
+    for (int field = 0; field < children.size(); ++field) {
+      boolean isFilterCol = fieldNames.get(field).equals(filterColumn);
+      if (isFilterCol || remaining > 0) {
+        TypeDescription child = children.get(field);
+        for (int id = child.getId(); id <= child.getMaximumId(); ++id) {
+          include[id] = true;
+        }
+        if (isFilterCol) {
+          foundFilterCol = true;
+        } else {
+          remaining--;
+        }
+      }
+    }
+    if (!foundFilterCol) {
+      throw new IllegalArgumentException("Filter column " + filterColumn + " not in " + schema);
+    }
+    return include;
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
new file mode 100644
index 0000000..b7a0e69
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class BooleanRowFilterBenchmark extends org.openjdk.jmh.Main {
+
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    @Param({"ORIGINAL"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"BOOLEAN"})
+    public TypeDescription.Category benchType;
+
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "sales_id";
+
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/sales/orc.none");
+      schema = Utilities.loadSchema("sales.schema");
+      batch = schema.createRowBatch(version, 1024);
+      include = new boolean[schema.getMaximumId() + 1];
+      int remaining = filterColsNum; // count down a copy; the @Param stays intact
+      for (int f = 0; f < schema.getChildren().size(); ++f) {
+        TypeDescription child = schema.getChildren().get(f);
+        String name = schema.getFieldNames().get(f);
+        if (name.equals(filter_column)) {
+          System.out.println("Apply Filter on column: " + name);
+          include[child.getId()] = true;
+        } else if (child.getCategory() == benchType && remaining > 0) {
+          System.out.println("Skip column(s): " + name);
+          include[child.getId()] = true;
+          remaining--; // no early break: the filter column may appear later
+        }
+      }
+      if (remaining != 0) {
+        throw new IllegalStateException("Dataset does not contain type: " + benchType);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
+      readerOptions = reader.options().include(include); // filter + Boolean columns
+    }
+
+    static boolean[] filterValues = null; // batch row positions kept by the filter
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      if (!(percentage >= 0.0 && percentage <= 1.0)) { // also rejects NaN
+        throw new IllegalArgumentException(
+            "Filter percentage must be between 0.0 and 1.0 but was " + percentage);
+      }
+      filterValues = new boolean[1024];
+      Random randomGenerator = new Random(); // one generator, not one per draw
+      int count = 0;
+      while (count < (1024 * percentage)) {
+        int randVal = randomGenerator.nextInt(1024);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    try (RecordReader rows = state.reader.rows(state.readerOptions
+        .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter))) {
+      while (rows.nextBatch(state.batch)) {
+        blackhole.consume(state.batch);
+      }
+    }
+  }
+
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    try (RecordReader rows = state.reader.rows(state.readerOptions)) {
+      while (rows.nextBatch(state.batch)) {
+        blackhole.consume(state.batch);
+      }
+    }
+  }
+
+  /*
+   * Run this test:
+   * java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.BooleanRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(BooleanRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
new file mode 100644
index 0000000..a43c4c5
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class DecimalRowFilterBenchmark extends org.openjdk.jmh.Main {
+
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    // try both DecimalColumnVector and Decimal64
+    @Param({"ORIGINAL", "USE_DECIMAL64"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"DECIMAL"})
+    public TypeDescription.Category benchType;
+
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "vendor_id";
+
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/taxi/orc.none");
+      schema = Utilities.loadSchema("taxi.schema");
+      batch = schema.createRowBatch(version, 1024);
+      include = new boolean[schema.getMaximumId() + 1];
+      int remaining = filterColsNum; // count down a copy; the @Param stays intact
+      for (int f = 0; f < schema.getChildren().size(); ++f) {
+        TypeDescription child = schema.getChildren().get(f);
+        String name = schema.getFieldNames().get(f);
+        if (name.equals(filter_column)) {
+          System.out.println("Apply Filter on column: " + name);
+          include[child.getId()] = true;
+        } else if (child.getCategory() == benchType && remaining > 0) {
+          System.out.println("Skip column(s): " + name);
+          include[child.getId()] = true;
+          remaining--; // no early break: the filter column may appear later
+        }
+      }
+      if (remaining != 0) {
+        throw new IllegalStateException("Dataset does not contain type: " + benchType);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
+      readerOptions = reader.options().include(include); // filter + Decimal columns
+    }
+
+    static boolean[] filterValues = null; // batch row positions kept by the filter
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      if (!(percentage >= 0.0 && percentage <= 1.0)) { // also rejects NaN
+        throw new IllegalArgumentException(
+            "Filter percentage must be between 0.0 and 1.0 but was " + percentage);
+      }
+      filterValues = new boolean[1024];
+      Random randomGenerator = new Random(); // one generator, not one per draw
+      int count = 0;
+      while (count < (1024 * percentage)) {
+        int randVal = randomGenerator.nextInt(1024);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    try (RecordReader rows = state.reader.rows(state.readerOptions
+        .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter))) {
+      while (rows.nextBatch(state.batch)) {
+        blackhole.consume(state.batch);
+      }
+    }
+  }
+
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    try (RecordReader rows = state.reader.rows(state.readerOptions)) {
+      while (rows.nextBatch(state.batch)) {
+        blackhole.consume(state.batch);
+      }
+    }
+  }
+
+  /*
+   * Run this test:
+   * java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.DecimalRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(DecimalRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
new file mode 100644
index 0000000..d5afb48
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class DoubleRowFilterBenchmark extends org.openjdk.jmh.Main {
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    // ORIGINAL only: USE_DECIMAL64 only affects decimal columns, not doubles
+    @Param({"ORIGINAL"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"DOUBLE"})
+    public TypeDescription.Category benchType;
+
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "vendor_id";
+
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/taxi/orc.none");
+      schema = Utilities.loadSchema("taxi.schema");
+      batch = schema.createRowBatch(version, 1024);
+      include = new boolean[schema.getMaximumId() + 1];
+      int remaining = filterColsNum; // count down a copy; the @Param stays intact
+      for (int f = 0; f < schema.getChildren().size(); ++f) {
+        TypeDescription child = schema.getChildren().get(f);
+        String name = schema.getFieldNames().get(f);
+        if (name.equals(filter_column)) {
+          System.out.println("Apply Filter on column: " + name);
+          include[child.getId()] = true;
+        } else if (child.getCategory() == benchType && remaining > 0) {
+          System.out.println("Skip column(s): " + name);
+          include[child.getId()] = true;
+          remaining--; // no early break: the filter column may appear later
+        }
+      }
+      if (remaining != 0) {
+        throw new IllegalStateException("Dataset does not contain type: " + benchType);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
+      readerOptions = reader.options().include(include); // filter + Double columns
+    }
+
+    static boolean[] filterValues = null; // batch row positions kept by the filter
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      if (!(percentage >= 0.0 && percentage <= 1.0)) { // also rejects NaN
+        throw new IllegalArgumentException(
+            "Filter percentage must be between 0.0 and 1.0 but was " + percentage);
+      }
+      filterValues = new boolean[1024];
+      Random randomGenerator = new Random(); // one generator, not one per draw
+      int count = 0;
+      while (count < (1024 * percentage)) {
+        int randVal = randomGenerator.nextInt(1024);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    try (RecordReader rows = state.reader.rows(state.readerOptions
+        .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter))) {
+      while (rows.nextBatch(state.batch)) {
+        blackhole.consume(state.batch);
+      }
+    }
+  }
+
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    try (RecordReader rows = state.reader.rows(state.readerOptions)) {
+      while (rows.nextBatch(state.batch)) {
+        blackhole.consume(state.batch);
+      }
+    }
+  }
+
+  /*
+   * Run this test:
+   * java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.DoubleRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(DoubleRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
new file mode 100644
index 0000000..33447cd
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class StringRowFilterBenchmark extends org.openjdk.jmh.Main {
+
+ private static final Path root = new Path(System.getProperty("user.dir"));
+
+ @State(Scope.Thread)
+ public static class InputState {
+
+ @Param({"ORIGINAL"})
+ public TypeDescription.RowBatchVersion version;
+
+ @Param({"STRING"})
+ public TypeDescription.Category benchType;
+
+ @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+ public String filterPerc;
+
+ @Param({"2"})
+ public int filterColsNum;
+
+ Configuration conf = new Configuration();
+ FileSystem fs;
+ TypeDescription schema;
+ VectorizedRowBatch batch;
+ Path path;
+ boolean[] include;
+ Reader reader;
+ Reader.Options readerOptions;
+ String filter_column = "sales_id";
+
+ @Setup
+ public void setup() throws IOException {
+ fs = FileSystem.getLocal(conf).getRaw();
+ path = new Path(root, "data/generated/sales/orc.none");
+ schema = Utilities.loadSchema("sales.schema");
+ batch = schema.createRowBatch(version, 1024);
+ include = new boolean[schema.getMaximumId() + 1];
+ for(TypeDescription child: schema.getChildren()) {
+ if (schema.getFieldNames().get(child.getId()-1).compareTo(filter_column) == 0) {
+ System.out.println("Apply Filter on column: " + schema.getFieldNames().get(child.getId()-1));
+ include[child.getId()] = true;
+ } else if (child.getCategory() == benchType) {
+ System.out.println("Skip column(s): " + schema.getFieldNames().get(child.getId()-1));
+ include[child.getId()] = true;
+ if (--filterColsNum == 0) break;
+ }
+ }
+ if (filterColsNum != 0) {
+ System.err.println("Dataset does not contain type: "+ benchType);
+ System.exit(-1);
+ }
+ generateRandomSet(Double.parseDouble(filterPerc));
+ reader = OrcFile.createReader(path,
+ OrcFile.readerOptions(conf).filesystem(fs));
+ // just read the String columns
+ readerOptions = reader.options().include(include);
+ }
+
+ static boolean[] filterValues = null;
+ public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+ if (percentage > 1.0) {
+ throw new IllegalArgumentException("Filter percentage must be < 1.0 but was "+ percentage);
+ }
+ filterValues = new boolean[1024];
+ int count = 0;
+ while (count < (1024 * percentage)) {
+ Random randomGenerator = new Random();
+ int randVal = randomGenerator.nextInt(1024);
+ if (filterValues[randVal] == false) {
+ filterValues[randVal] = true;
+ count++;
+ }
+ }
+ return filterValues;
+ }
+
+ public static void customIntRowFilter(VectorizedRowBatch batch) {
+ int newSize = 0;
+ for (int row = 0; row < batch.size; ++row) {
+ if (filterValues[row]) {
+ batch.selected[newSize++] = row;
+ }
+ }
+ batch.selectedInUse = true;
+ batch.size = newSize;
+ }
+ }
+
+ @Benchmark
+ public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+ RecordReader rows =
+ state.reader.rows(state.readerOptions
+ .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter));
+ while (rows.nextBatch(state.batch)) {
+ blackhole.consume(state.batch);
+ }
+ rows.close();
+ }
+
+ @Benchmark
+ public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+ RecordReader rows = state.reader.rows(state.readerOptions);
+ while (rows.nextBatch(state.batch)) {
+ blackhole.consume(state.batch);
+ }
+ rows.close();
+ }
+
+ /*
+ * Run this test:
+ * java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.StringRowFilterBenchmark
+ */
+ public static void main(String[] args) throws RunnerException {
+ new Runner(new OptionsBuilder()
+ .include(StringRowFilterBenchmark.class.getSimpleName())
+ .forks(1)
+ .build()).run();
+ }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
new file mode 100644
index 0000000..f4438a0
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * JMH benchmark that reads the Timestamp columns of the generated taxi dataset
+ * with and without a row-level filter on {@code vendor_id}. The filter keeps a
+ * fixed set of row positions in every batch, covering {@code filterPerc} of it.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TimestampRowFilterBenchmark extends org.openjdk.jmh.Main {
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  /** Rows per batch; also the length of the row-selection mask. */
+  private static final int BATCH_SIZE = 1024;
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    // Only the ORIGINAL row-batch layout is benchmarked here.
+    @Param({"ORIGINAL"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"TIMESTAMP"})
+    public TypeDescription.Category benchType;
+
+    // Fraction of each batch that survives the row filter.
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    // Number of benchType columns to read in addition to the filter column.
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "vendor_id";
+
+    /**
+     * Opens the dataset, selects the filter column plus {@code filterColsNum}
+     * columns of {@code benchType}, and builds the row-selection mask.
+     *
+     * @throws IOException if the schema or the ORC file cannot be read
+     * @throws IllegalStateException if the schema lacks the filter column or
+     *     does not have enough columns of {@code benchType}
+     */
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/taxi/orc.none");
+      schema = Utilities.loadSchema("taxi.schema");
+      batch = schema.createRowBatch(version, BATCH_SIZE);
+      include = new boolean[schema.getMaximumId() + 1];
+      // Count down in a local so the injected @Param value is not mutated, and
+      // keep scanning after enough columns are found so the filter column is
+      // still included when it appears later in the schema.
+      int remaining = filterColsNum;
+      boolean filterColumnFound = false;
+      for (int i = 0; i < schema.getChildren().size(); ++i) {
+        // Look names up by position: column ids are assigned in pre-order, so
+        // getId() - 1 is only a valid field index for flat schemas.
+        TypeDescription child = schema.getChildren().get(i);
+        String fieldName = schema.getFieldNames().get(i);
+        if (fieldName.equals(filter_column)) {
+          System.out.println("Apply Filter on column: " + fieldName);
+          include[child.getId()] = true;
+          filterColumnFound = true;
+        } else if (remaining > 0 && child.getCategory() == benchType) {
+          System.out.println("Skip column(s): " + fieldName);
+          include[child.getId()] = true;
+          --remaining;
+        }
+      }
+      // Fail this trial instead of calling System.exit from inside the JMH harness.
+      if (remaining != 0) {
+        throw new IllegalStateException("Dataset does not contain " + filterColsNum
+            + " columns of type: " + benchType);
+      }
+      if (!filterColumnFound) {
+        throw new IllegalStateException("Dataset does not contain filter column: "
+            + filter_column);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path,
+          OrcFile.readerOptions(conf).filesystem(fs));
+      // Read only the filter column and the selected Timestamp columns.
+      readerOptions = reader.options().include(include);
+    }
+
+    /**
+     * Row-selection mask consulted by customIntRowFilter; true keeps the row.
+     * NOTE: static, so every InputState instance shares the same mask.
+     */
+    static boolean[] filterValues = null;
+
+    /**
+     * Builds a BATCH_SIZE-entry mask in which ceil(BATCH_SIZE * percentage)
+     * randomly chosen positions are true, stores it in filterValues and returns it.
+     *
+     * @param percentage fraction of rows to keep, in [0.0, 1.0]
+     * @return the newly generated mask
+     * @throws IllegalArgumentException if percentage is NaN or outside [0.0, 1.0]
+     */
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      // Written as a negated range check so NaN is rejected as well.
+      if (!(percentage >= 0.0 && percentage <= 1.0)) {
+        throw new IllegalArgumentException(
+            "Filter percentage must be in [0.0, 1.0] but was " + percentage);
+      }
+      filterValues = new boolean[BATCH_SIZE];
+      // One generator for the whole fill rather than a new instance per attempt.
+      Random randomGenerator = new Random();
+      int count = 0;
+      while (count < (BATCH_SIZE * percentage)) {
+        int randVal = randomGenerator.nextInt(BATCH_SIZE);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    /**
+     * Row-filter callback: keeps only the rows whose position inside the batch
+     * is flagged in filterValues and shrinks batch.size accordingly.
+     * NOTE(review): any selection already present on the batch is overwritten.
+     */
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  /** Reads all batches with the row filter installed; the reader is always closed. */
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    try (RecordReader rows = state.reader.rows(state.readerOptions
+        .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter))) {
+      while (rows.nextBatch(state.batch)) {
+        blackhole.consume(state.batch);
+      }
+    }
+  }
+
+  /** Baseline: reads all batches without a row filter; the reader is always closed. */
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    try (RecordReader rows = state.reader.rows(state.readerOptions)) {
+      while (rows.nextBatch(state.batch)) {
+        blackhole.consume(state.batch);
+      }
+    }
+  }
+
+  /*
+   * Run this test:
+   * java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.TimestampRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(TimestampRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index 57de560..4ca7b01 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -45,7 +45,7 @@
<parquet.version>1.8.3</parquet.version>
<slf4j.version>1.7.25</slf4j.version>
<spark.version>2.4.0</spark.version>
- <storage-api.version>2.7.1</storage-api.version>
+ <storage-api.version>2.7.2</storage-api.version>
<zookeeper.version>3.4.6</zookeeper.version>
</properties>