Posted to commits@orc.apache.org by do...@apache.org on 2020/08/05 18:57:10 UTC

[orc] branch master updated: ORC-597: Add Row-level filtering benchmark (#478)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new b89aabd  ORC-597: Add Row-level filtering benchmark (#478)
b89aabd is described below

commit b89aabde88ca7d7fea6eb775d57c49c4089a1c74
Author: Panagiotis Garefalakis <pg...@cloudera.com>
AuthorDate: Wed Aug 5 21:57:00 2020 +0300

    ORC-597: Add Row-level filtering benchmark (#478)
    
    ### What changes were proposed in this pull request?
    
    Extends the ORC benchmarks to support row-level filtering on the existing datasets (taxi, sales, github).
    
    ### Why are the changes needed?
    
    To compare row-level filtering performance across ORC releases.
    
    ### How was this patch tested?
    
    Ran the new benchmarks.
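    
    For example (a sketch; this assumes the benchmark runner forwards standard
    JMH options such as "-p" parameter overrides to the JMH Runner):
    
        % java -jar hive/target/orc-benchmarks-hive-*-uber.jar row-filter data \
            -p filter_percentage=0.1 -p projected_columns=4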
---
 java/bench/README.md                               |   4 +
 java/bench/hive/pom.xml                            |   4 +
 .../bench/hive/RowFilterProjectionBenchmark.java   | 219 +++++++++++++++++++++
 .../hive/rowfilter/BooleanRowFilterBenchmark.java  | 172 ++++++++++++++++
 .../hive/rowfilter/DecimalRowFilterBenchmark.java  | 173 ++++++++++++++++
 .../hive/rowfilter/DoubleRowFilterBenchmark.java   | 172 ++++++++++++++++
 .../hive/rowfilter/StringRowFilterBenchmark.java   | 172 ++++++++++++++++
 .../rowfilter/TimestampRowFilterBenchmark.java     | 172 ++++++++++++++++
 java/bench/pom.xml                                 |   2 +-
 9 files changed, 1089 insertions(+), 1 deletion(-)

diff --git a/java/bench/README.md b/java/bench/README.md
index f49404d..c0ee992 100644
--- a/java/bench/README.md
+++ b/java/bench/README.md
@@ -41,6 +41,10 @@ To run decimal/decimal64 benchmark:
 
 ```% java -jar hive/target/orc-benchmarks-hive-*-uber.jar decimal data```
 
+To run row-filter benchmark:
+
+```% java -jar hive/target/orc-benchmarks-hive-*-uber.jar row-filter data```
+
 To run spark benchmark:
 
 ```% java -jar spark/target/orc-benchmarks-spark-*.jar spark data```
diff --git a/java/bench/hive/pom.xml b/java/bench/hive/pom.xml
index 7bdf612..e8ed1bf 100644
--- a/java/bench/hive/pom.xml
+++ b/java/bench/hive/pom.xml
@@ -114,6 +114,10 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>8</source>
+          <target>8</target>
+        </configuration>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
new file mode 100644
index 0000000..d125606
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/RowFilterProjectionBenchmark.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive;
+
+import com.google.auto.service.AutoService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.TrackingLocalFileSystem;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.OrcBenchmark;
+import org.apache.orc.bench.core.ReadCounters;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+
+import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@State(Scope.Thread)
+@AutoService(OrcBenchmark.class)
+public class RowFilterProjectionBenchmark implements OrcBenchmark {
+
+  private static final Path root = Utilities.getBenchmarkRoot();
+
+  @Param({"taxi"})
+  public String dataset;
+
+  @Param({"none"})
+  public String compression;
+
+  @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+  public String filter_percentage;
+
+  @Param({"all", "2", "4", "8", "16"})
+  public String projected_columns;
+
+  @Override
+  public String getName() {
+    return "row-filter";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Benchmark column projection with row-level filtering";
+  }
+
+  @Override
+  public void run(String[] args) throws Exception {
+    new Runner(Utilities.parseOptions(args, getClass())).run();
+  }
+
+  static Set<Integer> filterValues = null;
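+  // Picks ~(percentage * 1024) distinct row positions out of a 1024-row
+  // batch; customIntRowFilter keeps only these positions.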
+  public static void generateRandomSet(double percentage) throws IllegalArgumentException {
+    if (percentage > 1.0) {
+      throw new IllegalArgumentException("Filter percentage must be <= 1.0 but was " + percentage);
+    }
+    filterValues = new HashSet<>();
+    Random randomGenerator = new Random();
+    while (filterValues.size() < (1024 * percentage)) {
+      filterValues.add(randomGenerator.nextInt(1024));
+    }
+  }
+
+  public static void customIntRowFilter(VectorizedRowBatch batch) {
+    int newSize = 0;
+    for (int row = 0; row < batch.size; ++row) {
+      // Keep only the row positions present in the pre-generated filter set
+      if (filterValues.contains(row)) {
+        batch.selected[newSize++] = row;
+      }
+    }
+    batch.selectedInUse = true;
+    batch.size = newSize;
+  }
+
+  @Benchmark
+  public void orcRowFilter(ReadCounters counters) throws Exception {
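+    // TrackingLocalFileSystem records read ops and bytes read, so the
+    // benchmark can report I/O alongside record counts.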
+    Configuration conf = new Configuration();
+    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
+    fs.initialize(new URI("file:///"), conf);
+    FileSystem.Statistics statistics = fs.getLocalStatistics();
+    statistics.reset();
+    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs);
+    Path path = Utilities.getVariant(root, dataset, "orc", compression);
+    Reader reader = OrcFile.createReader(path, options);
+    TypeDescription schema = reader.getSchema();
+    // select an ID column to apply filter on
+    String filter_column;
+    if ("taxi".equals(dataset)) {
+      filter_column = "vendor_id";
+    } else if ("sales".equals(dataset)) {
+      filter_column = "sales_id";
+    } else if ("github".equals(dataset)) {
+      filter_column = "id";
+    } else {
+      throw new IllegalArgumentException("Unknown data set " + dataset);
+    }
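+    // include[] is indexed by column id (0 is the root struct); project the
+    // filter column plus up to columns_len other columns.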
+    boolean[] include = new boolean[schema.getMaximumId() + 1];
+    int columns_len = schema.getMaximumId();
+    if (!"all".equals(projected_columns)) {
+      columns_len = Integer.parseInt(projected_columns);
+    }
+    // select the remaining columns to project
+    List<TypeDescription> children = schema.getChildren();
+    boolean foundFilterCol = false;
+    for (int c = children.get(0).getId(); c < schema.getMaximumId() + 1; ++c) {
+      if (c - 1 < schema.getFieldNames().size() &&
+          schema.getFieldNames().get(c - 1).equals(filter_column)) {
+        foundFilterCol = true;
+        include[c] = true;
+      } else if (columns_len > 0) {
+        include[c] = true;
+        columns_len--;
+      }
+      if (foundFilterCol && (columns_len == 0)) break;
+    }
+    generateRandomSet(Double.parseDouble(filter_percentage));
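+    // Attach the row-level filter: the callback runs on the filter column
+    // first, so the remaining projected columns only need to be decoded for
+    // the selected rows.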
+    RecordReader rows =
+        reader.rows(reader.options()
+          .include(include)
+          .setRowFilter(new String[]{filter_column}, RowFilterProjectionBenchmark::customIntRowFilter));
+
+    VectorizedRowBatch batch = schema.createRowBatch();
+    while (rows.nextBatch(batch)) {
+      counters.addRecords(batch.size);
+    }
+    rows.close();
+    counters.addBytes(statistics.getReadOps(), statistics.getBytesRead());
+    counters.addInvocation();
+  }
+
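+  // Baseline: identical projection, but no row-level filter is pushed down.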
+  @Benchmark
+  public void orcNoFilter(ReadCounters counters) throws Exception {
+    Configuration conf = new Configuration();
+    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
+    fs.initialize(new URI("file:///"), conf);
+    FileSystem.Statistics statistics = fs.getLocalStatistics();
+    statistics.reset();
+    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs);
+    Path path = Utilities.getVariant(root, dataset, "orc", compression);
+    Reader reader = OrcFile.createReader(path, options);
+    TypeDescription schema = reader.getSchema();
+    // select an ID column to apply filter on
+    String filter_column;
+    if ("taxi".equals(dataset)) {
+      filter_column = "vendor_id";
+    } else if ("sales".equals(dataset)) {
+      filter_column = "sales_id";
+    } else if ("github".equals(dataset)) {
+      filter_column = "id";
+    } else {
+      throw new IllegalArgumentException("Unknown data set " + dataset);
+    }
+    boolean[] include = new boolean[schema.getMaximumId() + 1];
+    int columns_len = schema.getMaximumId();
+    if (!"all".equals(projected_columns)) {
+      columns_len = Integer.parseInt(projected_columns);
+    }
+    // select the remaining columns to project
+    List<TypeDescription> children = schema.getChildren();
+    boolean foundFilterCol = false;
+    for (int c = children.get(0).getId(); c < schema.getMaximumId() + 1; ++c) {
+      if (c - 1 < schema.getFieldNames().size() &&
+          schema.getFieldNames().get(c - 1).equals(filter_column)) {
+        foundFilterCol = true;
+        include[c] = true;
+      } else if (columns_len > 0) {
+        include[c] = true;
+        columns_len--;
+      }
+      if (foundFilterCol && (columns_len == 0)) break;
+    }
+    RecordReader rows = reader.rows(reader.options().include(include));
+
+    VectorizedRowBatch batch = schema.createRowBatch();
+    while (rows.nextBatch(batch)) {
+      counters.addRecords(batch.size);
+    }
+    rows.close();
+    counters.addBytes(statistics.getReadOps(), statistics.getBytesRead());
+    counters.addInvocation();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
new file mode 100644
index 0000000..b7a0e69
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/BooleanRowFilterBenchmark.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class BooleanRowFilterBenchmark extends org.openjdk.jmh.Main {
+
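+  // Benchmark data is expected under <working dir>/data/generated/, as
+  // produced by the benchmark suite's data generation step.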
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    @Param({"ORIGINAL"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"BOOLEAN"})
+    public TypeDescription.Category benchType;
+
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "sales_id";
+
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/sales/orc.none");
+      schema = Utilities.loadSchema("sales.schema");
+      batch = schema.createRowBatch(version, 1024);
+      include = new boolean[schema.getMaximumId() + 1];
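+      // Flat schema: top-level column ids start at 1, so the field-name
+      // index for a child column is id - 1.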
+      for (TypeDescription child: schema.getChildren()) {
+        if (schema.getFieldNames().get(child.getId()-1).equals(filter_column)) {
+          System.out.println("Apply filter on column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+        } else if (child.getCategory() == benchType) {
+          System.out.println("Project column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+          if (--filterColsNum == 0) break;
+        }
+      }
+      if (filterColsNum != 0) {
+        System.err.println("Dataset does not contain type: "+ benchType);
+        System.exit(-1);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path,
+          OrcFile.readerOptions(conf).filesystem(fs));
+      // just read the Boolean columns
+      readerOptions = reader.options().include(include);
+    }
+
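+    // Boolean mask over the 1024-row batch: roughly (percentage * 1024)
+    // positions are selected; customIntRowFilter applies the mask.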
+    static boolean[] filterValues = null;
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      if (percentage > 1.0) {
+        throw new IllegalArgumentException("Filter percentage must be <= 1.0 but was " + percentage);
+      }
+      filterValues = new boolean[1024];
+      int count = 0;
+      Random randomGenerator = new Random();
+      while (count < (1024 * percentage)) {
+        int randVal = randomGenerator.nextInt(1024);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows =
+        state.reader.rows(state.readerOptions
+            .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter));
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows = state.reader.rows(state.readerOptions);
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  /*
+   * Run this test:
+   *  java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.BooleanRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(BooleanRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
new file mode 100644
index 0000000..a43c4c5
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DecimalRowFilterBenchmark.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class DecimalRowFilterBenchmark extends org.openjdk.jmh.Main {
+
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    // try both DecimalColumnVector and Decimal64
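+    // USE_DECIMAL64 reads short decimals as Decimal64ColumnVector rather
+    // than DecimalColumnVector.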
+    @Param({"ORIGINAL", "USE_DECIMAL64"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"DECIMAL"})
+    public TypeDescription.Category benchType;
+
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "vendor_id";
+
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/taxi/orc.none");
+      schema = Utilities.loadSchema("taxi.schema");
+      batch = schema.createRowBatch(version, 1024);
+      include = new boolean[schema.getMaximumId() + 1];
+      for (TypeDescription child: schema.getChildren()) {
+        if (schema.getFieldNames().get(child.getId()-1).equals(filter_column)) {
+          System.out.println("Apply filter on column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+        } else if (child.getCategory() == benchType) {
+          System.out.println("Project column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+          if (--filterColsNum == 0) break;
+        }
+      }
+      if (filterColsNum != 0) {
+        System.err.println("Dataset does not contain type: "+ benchType);
+        System.exit(-1);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path,
+          OrcFile.readerOptions(conf).filesystem(fs));
+      // just read the Decimal columns
+      readerOptions = reader.options().include(include);
+    }
+
+    static boolean[] filterValues = null;
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      if (percentage > 1.0) {
+        throw new IllegalArgumentException("Filter percentage must be <= 1.0 but was " + percentage);
+      }
+      filterValues = new boolean[1024];
+      int count = 0;
+      Random randomGenerator = new Random();
+      while (count < (1024 * percentage)) {
+        int randVal = randomGenerator.nextInt(1024);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows =
+        state.reader.rows(state.readerOptions
+            .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter));
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows = state.reader.rows(state.readerOptions);
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  /*
+   * Run this test:
+   *  java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.DecimalRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(DecimalRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
new file mode 100644
index 0000000..d5afb48
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/DoubleRowFilterBenchmark.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class DoubleRowFilterBenchmark extends org.openjdk.jmh.Main {
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    @Param({"ORIGINAL"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"DOUBLE"})
+    public TypeDescription.Category benchType;
+
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "vendor_id";
+
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/taxi/orc.none");
+      schema = Utilities.loadSchema("taxi.schema");
+      batch = schema.createRowBatch(version, 1024);
+      include = new boolean[schema.getMaximumId() + 1];
+      for (TypeDescription child: schema.getChildren()) {
+        if (schema.getFieldNames().get(child.getId()-1).equals(filter_column)) {
+          System.out.println("Apply filter on column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+        } else if (child.getCategory() == benchType) {
+          System.out.println("Project column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+          if (--filterColsNum == 0) break;
+        }
+      }
+      if (filterColsNum != 0) {
+        System.err.println("Dataset does not contain type: "+ benchType);
+        System.exit(-1);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path,
+          OrcFile.readerOptions(conf).filesystem(fs));
+      // just read the Double columns
+      readerOptions = reader.options().include(include);
+    }
+
+    static boolean[] filterValues = null;
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      if (percentage > 1.0) {
+        throw new IllegalArgumentException("Filter percentage must be <= 1.0 but was " + percentage);
+      }
+      filterValues = new boolean[1024];
+      int count = 0;
+      Random randomGenerator = new Random();
+      while (count < (1024 * percentage)) {
+        int randVal = randomGenerator.nextInt(1024);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows =
+        state.reader.rows(state.readerOptions
+            .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter));
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows = state.reader.rows(state.readerOptions);
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  /*
+   * Run this test:
+   *  java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.DoubleRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(DoubleRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
new file mode 100644
index 0000000..33447cd
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/StringRowFilterBenchmark.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class StringRowFilterBenchmark extends org.openjdk.jmh.Main {
+
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    @Param({"ORIGINAL"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"STRING"})
+    public TypeDescription.Category benchType;
+
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "sales_id";
+
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/sales/orc.none");
+      schema = Utilities.loadSchema("sales.schema");
+      batch = schema.createRowBatch(version, 1024);
+      include = new boolean[schema.getMaximumId() + 1];
+      for (TypeDescription child: schema.getChildren()) {
+        if (schema.getFieldNames().get(child.getId()-1).equals(filter_column)) {
+          System.out.println("Apply filter on column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+        } else if (child.getCategory() == benchType) {
+          System.out.println("Project column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+          if (--filterColsNum == 0) break;
+        }
+      }
+      if (filterColsNum != 0) {
+        System.err.println("Dataset does not contain type: "+ benchType);
+        System.exit(-1);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path,
+          OrcFile.readerOptions(conf).filesystem(fs));
+      // just read the String columns
+      readerOptions = reader.options().include(include);
+    }
+
+    static boolean[] filterValues = null;
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      if (percentage > 1.0) {
+        throw new IllegalArgumentException("Filter percentage must be <= 1.0 but was " + percentage);
+      }
+      filterValues = new boolean[1024];
+      int count = 0;
+      Random randomGenerator = new Random();
+      while (count < (1024 * percentage)) {
+        int randVal = randomGenerator.nextInt(1024);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows =
+        state.reader.rows(state.readerOptions
+            .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter));
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows = state.reader.rows(state.readerOptions);
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  /*
+   * Run this test:
+   *  java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.StringRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(StringRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
new file mode 100644
index 0000000..f4438a0
--- /dev/null
+++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/rowfilter/TimestampRowFilterBenchmark.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.hive.rowfilter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.core.Utilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TimestampRowFilterBenchmark extends org.openjdk.jmh.Main {
+  private static final Path root = new Path(System.getProperty("user.dir"));
+
+  @State(Scope.Thread)
+  public static class InputState {
+
+    @Param({"ORIGINAL"})
+    public TypeDescription.RowBatchVersion version;
+
+    @Param({"TIMESTAMP"})
+    public TypeDescription.Category benchType;
+
+    @Param({"0.01", "0.1", "0.2", "0.4", "0.6", "0.8", "1."})
+    public String filterPerc;
+
+    @Param({"2"})
+    public int filterColsNum;
+
+    Configuration conf = new Configuration();
+    FileSystem fs;
+    TypeDescription schema;
+    VectorizedRowBatch batch;
+    Path path;
+    boolean[] include;
+    Reader reader;
+    Reader.Options readerOptions;
+    String filter_column = "vendor_id";
+
+    @Setup
+    public void setup() throws IOException {
+      fs = FileSystem.getLocal(conf).getRaw();
+      path = new Path(root, "data/generated/taxi/orc.none");
+      schema = Utilities.loadSchema("taxi.schema");
+      batch = schema.createRowBatch(version, 1024);
+      include = new boolean[schema.getMaximumId() + 1];
+      for (TypeDescription child: schema.getChildren()) {
+        if (schema.getFieldNames().get(child.getId()-1).equals(filter_column)) {
+          System.out.println("Apply filter on column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+        } else if (child.getCategory() == benchType) {
+          System.out.println("Project column: " + schema.getFieldNames().get(child.getId()-1));
+          include[child.getId()] = true;
+          if (--filterColsNum == 0) break;
+        }
+      }
+      if (filterColsNum != 0) {
+        System.err.println("Dataset does not contain type: "+ benchType);
+        System.exit(-1);
+      }
+      generateRandomSet(Double.parseDouble(filterPerc));
+      reader = OrcFile.createReader(path,
+          OrcFile.readerOptions(conf).filesystem(fs));
+      // just read the Timestamp columns
+      readerOptions = reader.options().include(include);
+    }
+
+    static boolean[] filterValues = null;
+    public static boolean[] generateRandomSet(double percentage) throws IllegalArgumentException {
+      if (percentage > 1.0) {
+        throw new IllegalArgumentException("Filter percentage must be <= 1.0 but was " + percentage);
+      }
+      filterValues = new boolean[1024];
+      int count = 0;
+      Random randomGenerator = new Random();
+      while (count < (1024 * percentage)) {
+        int randVal = randomGenerator.nextInt(1024);
+        if (!filterValues[randVal]) {
+          filterValues[randVal] = true;
+          count++;
+        }
+      }
+      return filterValues;
+    }
+
+    public static void customIntRowFilter(VectorizedRowBatch batch) {
+      int newSize = 0;
+      for (int row = 0; row < batch.size; ++row) {
+        if (filterValues[row]) {
+          batch.selected[newSize++] = row;
+        }
+      }
+      batch.selectedInUse = true;
+      batch.size = newSize;
+    }
+  }
+
+  @Benchmark
+  public void readOrcRowFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows =
+        state.reader.rows(state.readerOptions
+            .setRowFilter(new String[]{state.filter_column}, InputState::customIntRowFilter));
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  @Benchmark
+  public void readOrcNoFilter(Blackhole blackhole, InputState state) throws Exception {
+    RecordReader rows = state.reader.rows(state.readerOptions);
+    while (rows.nextBatch(state.batch)) {
+      blackhole.consume(state.batch);
+    }
+    rows.close();
+  }
+
+  /*
+   * Run this test:
+   *  java -cp hive/target/orc-benchmarks-hive-*-uber.jar org.apache.orc.bench.hive.rowfilter.TimestampRowFilterBenchmark
+   */
+  public static void main(String[] args) throws RunnerException {
+    new Runner(new OptionsBuilder()
+        .include(TimestampRowFilterBenchmark.class.getSimpleName())
+        .forks(1)
+        .build()).run();
+  }
+}
\ No newline at end of file
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index 57de560..4ca7b01 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -45,7 +45,7 @@
     <parquet.version>1.8.3</parquet.version>
     <slf4j.version>1.7.25</slf4j.version>
     <spark.version>2.4.0</spark.version>
-    <storage-api.version>2.7.1</storage-api.version>
+    <storage-api.version>2.7.2</storage-api.version>
     <zookeeper.version>3.4.6</zookeeper.version>
   </properties>