Posted to commits@spark.apache.org by we...@apache.org on 2018/01/09 13:49:29 UTC

spark git commit: [SPARK-16060][SQL] Support Vectorized ORC Reader

Repository: spark
Updated Branches:
  refs/heads/master 6a4206ff0 -> f44ba910f


[SPARK-16060][SQL] Support Vectorized ORC Reader

## What changes were proposed in this pull request?

This PR adds an ORC columnar-batch reader to the native `OrcFileFormat`. Because it reads ORC `RowBatch`es and exposes them directly as Spark `ColumnarBatch`es, it is faster than the current row-based implementation. This supersedes the prior PR, #17924.
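
For example, the new reader can be toggled per session via the configuration key added in this PR (a minimal sketch; assumes a running `SparkSession` named `spark`; the setting defaults to `true`):

```scala
// Fall back to the row-based reader of the native ORC data source.
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")

// Restore the vectorized reader (the default).
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
```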

Also, this PR adds `OrcReadBenchmark` to show the performance improvement.

## How was this patch tested?

Pass the existing test cases.

Author: Dongjoon Hyun <do...@apache.org>

Closes #19943 from dongjoon-hyun/SPARK-16060.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f44ba910
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f44ba910
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f44ba910

Branch: refs/heads/master
Commit: f44ba910f58083458e1133502e193a9d6f2bf766
Parents: 6a4206f
Author: Dongjoon Hyun <do...@apache.org>
Authored: Tue Jan 9 21:48:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jan 9 21:48:14 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   7 +
 .../datasources/orc/OrcColumnarBatchReader.java | 523 +++++++++++++++++++
 .../datasources/orc/OrcFileFormat.scala         |  75 ++-
 .../execution/datasources/orc/OrcUtils.scala    |   7 +-
 .../spark/sql/hive/orc/OrcReadBenchmark.scala   | 435 +++++++++++++++
 5 files changed, 1022 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5c61f10..74949db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -386,6 +386,11 @@ object SQLConf {
     .checkValues(Set("hive", "native"))
     .createWithDefault("native")
 
+  val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader")
+    .doc("Enables vectorized orc decoding.")
+    .booleanConf
+    .createWithDefault(true)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -1183,6 +1188,8 @@ class SQLConf extends Serializable with Logging {
 
   def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
 
+  def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)
+
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
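
A minimal sketch of how internal callers consult the new flag (assuming a `SparkSession` named `spark`; this mirrors the gating in the `OrcFileFormat.supportBatch` hunk further below):

```scala
val conf = spark.sessionState.conf
// Batch reading requires both the ORC flag and whole-stage codegen to be on.
val useBatch = conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled
```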

http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
new file mode 100644
index 0000000..5c28d0e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcInputFormat;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.*;
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+
+/**
+ * To support vectorization in whole-stage codegen, this reader returns `ColumnarBatch`es.
+ * After creation, `initialize` and `initBatch` should be called sequentially.
+ */
+public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
+
+  /**
+   * The default batch size. We use this single value for both ORC and Spark for
+   * consistency, because their default batch sizes differ as follows.
+   *
+   * - ORC's VectorizedRowBatch.DEFAULT_SIZE = 1024
+   * - Spark's ColumnarBatch.DEFAULT_BATCH_SIZE = 4 * 1024
+   */
+  public static final int DEFAULT_SIZE = 4 * 1024;
+
+  // ORC File Reader
+  private Reader reader;
+
+  // Vectorized ORC Row Batch
+  private VectorizedRowBatch batch;
+
+  /**
+   * The column IDs of the physical ORC file schema that are required by this reader.
+   * -1 means the required column doesn't exist in the ORC file.
+   */
+  private int[] requestedColIds;
+
+  // Record reader from ORC row batch.
+  private org.apache.orc.RecordReader recordReader;
+
+  private StructField[] requiredFields;
+
+  // The result columnar batch for vectorized execution by whole-stage codegen.
+  private ColumnarBatch columnarBatch;
+
+  // Writable column vectors of the result columnar batch.
+  private WritableColumnVector[] columnVectors;
+
+  /**
+   * The memory mode of `columnarBatch`.
+   */
+  private final MemoryMode MEMORY_MODE;
+
+  public OrcColumnarBatchReader(boolean useOffHeap) {
+    MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+  }
+
+
+  @Override
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
+  public ColumnarBatch getCurrentValue() throws IOException, InterruptedException {
+    return columnarBatch;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return recordReader.getProgress();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return nextBatch();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (columnarBatch != null) {
+      columnarBatch.close();
+      columnarBatch = null;
+    }
+    if (recordReader != null) {
+      recordReader.close();
+      recordReader = null;
+    }
+  }
+
+  /**
+   * Initialize the ORC file reader and batch record reader.
+   * Please note that `initBatch` needs to be called after this.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    FileSplit fileSplit = (FileSplit)inputSplit;
+    Configuration conf = taskAttemptContext.getConfiguration();
+    reader = OrcFile.createReader(
+      fileSplit.getPath(),
+      OrcFile.readerOptions(conf)
+        .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+        .filesystem(fileSplit.getPath().getFileSystem(conf)));
+
+    Reader.Options options =
+      OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength());
+    recordReader = reader.rows(options);
+  }
+
+  /**
+   * Initialize the columnar batch by setting the required schema and partition information.
+   * With this information, this creates a `ColumnarBatch` with the full schema.
+   */
+  public void initBatch(
+      TypeDescription orcSchema,
+      int[] requestedColIds,
+      StructField[] requiredFields,
+      StructType partitionSchema,
+      InternalRow partitionValues) {
+    batch = orcSchema.createRowBatch(DEFAULT_SIZE);
+    assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
+
+    this.requiredFields = requiredFields;
+    this.requestedColIds = requestedColIds;
+    assert(requiredFields.length == requestedColIds.length);
+
+    StructType resultSchema = new StructType(requiredFields);
+    for (StructField f : partitionSchema.fields()) {
+      resultSchema = resultSchema.add(f);
+    }
+
+    int capacity = DEFAULT_SIZE;
+    if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
+    }
+    columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
+
+    if (partitionValues.numFields() > 0) {
+      int partitionIdx = requiredFields.length;
+      for (int i = 0; i < partitionValues.numFields(); i++) {
+        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
+        columnVectors[i + partitionIdx].setIsConstant();
+      }
+    }
+
+    // Initialize the missing columns once.
+    for (int i = 0; i < requiredFields.length; i++) {
+      if (requestedColIds[i] == -1) {
+        columnVectors[i].putNulls(0, columnarBatch.capacity());
+        columnVectors[i].setIsConstant();
+      }
+    }
+  }
+
+  /**
+   * Return true if more data exists in the next batch. If so, prepare the next batch
+   * by copying from ORC `VectorizedRowBatch` columns to Spark `ColumnarBatch` columns.
+   */
+  private boolean nextBatch() throws IOException {
+    for (WritableColumnVector vector : columnVectors) {
+      vector.reset();
+    }
+    columnarBatch.setNumRows(0);
+
+    recordReader.nextBatch(batch);
+    int batchSize = batch.size;
+    if (batchSize == 0) {
+      return false;
+    }
+    columnarBatch.setNumRows(batchSize);
+    for (int i = 0; i < requiredFields.length; i++) {
+      StructField field = requiredFields[i];
+      WritableColumnVector toColumn = columnVectors[i];
+
+      if (requestedColIds[i] >= 0) {
+        ColumnVector fromColumn = batch.cols[requestedColIds[i]];
+
+        if (fromColumn.isRepeating) {
+          putRepeatingValues(batchSize, field, fromColumn, toColumn);
+        } else if (fromColumn.noNulls) {
+          putNonNullValues(batchSize, field, fromColumn, toColumn);
+        } else {
+          putValues(batchSize, field, fromColumn, toColumn);
+        }
+      }
+    }
+    return true;
+  }
+
+  private void putRepeatingValues(
+      int batchSize,
+      StructField field,
+      ColumnVector fromColumn,
+      WritableColumnVector toColumn) {
+    if (fromColumn.isNull[0]) {
+      toColumn.putNulls(0, batchSize);
+    } else {
+      DataType type = field.dataType();
+      if (type instanceof BooleanType) {
+        toColumn.putBooleans(0, batchSize, ((LongColumnVector)fromColumn).vector[0] == 1);
+      } else if (type instanceof ByteType) {
+        toColumn.putBytes(0, batchSize, (byte)((LongColumnVector)fromColumn).vector[0]);
+      } else if (type instanceof ShortType) {
+        toColumn.putShorts(0, batchSize, (short)((LongColumnVector)fromColumn).vector[0]);
+      } else if (type instanceof IntegerType || type instanceof DateType) {
+        toColumn.putInts(0, batchSize, (int)((LongColumnVector)fromColumn).vector[0]);
+      } else if (type instanceof LongType) {
+        toColumn.putLongs(0, batchSize, ((LongColumnVector)fromColumn).vector[0]);
+      } else if (type instanceof TimestampType) {
+        toColumn.putLongs(0, batchSize,
+          fromTimestampColumnVector((TimestampColumnVector)fromColumn, 0));
+      } else if (type instanceof FloatType) {
+        toColumn.putFloats(0, batchSize, (float)((DoubleColumnVector)fromColumn).vector[0]);
+      } else if (type instanceof DoubleType) {
+        toColumn.putDoubles(0, batchSize, ((DoubleColumnVector)fromColumn).vector[0]);
+      } else if (type instanceof StringType || type instanceof BinaryType) {
+        BytesColumnVector data = (BytesColumnVector)fromColumn;
+        WritableColumnVector arrayData = toColumn.getChildColumn(0);
+        int size = data.vector[0].length;
+        arrayData.reserve(size);
+        arrayData.putBytes(0, size, data.vector[0], 0);
+        for (int index = 0; index < batchSize; index++) {
+          toColumn.putArray(index, 0, size);
+        }
+      } else if (type instanceof DecimalType) {
+        DecimalType decimalType = (DecimalType)type;
+        putDecimalWritables(
+          toColumn,
+          batchSize,
+          decimalType.precision(),
+          decimalType.scale(),
+          ((DecimalColumnVector)fromColumn).vector[0]);
+      } else {
+        throw new UnsupportedOperationException("Unsupported Data Type: " + type);
+      }
+    }
+  }
+
+  private void putNonNullValues(
+      int batchSize,
+      StructField field,
+      ColumnVector fromColumn,
+      WritableColumnVector toColumn) {
+    DataType type = field.dataType();
+    if (type instanceof BooleanType) {
+      long[] data = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        toColumn.putBoolean(index, data[index] == 1);
+      }
+    } else if (type instanceof ByteType) {
+      long[] data = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        toColumn.putByte(index, (byte)data[index]);
+      }
+    } else if (type instanceof ShortType) {
+      long[] data = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        toColumn.putShort(index, (short)data[index]);
+      }
+    } else if (type instanceof IntegerType || type instanceof DateType) {
+      long[] data = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        toColumn.putInt(index, (int)data[index]);
+      }
+    } else if (type instanceof LongType) {
+      toColumn.putLongs(0, batchSize, ((LongColumnVector)fromColumn).vector, 0);
+    } else if (type instanceof TimestampType) {
+      TimestampColumnVector data = ((TimestampColumnVector)fromColumn);
+      for (int index = 0; index < batchSize; index++) {
+        toColumn.putLong(index, fromTimestampColumnVector(data, index));
+      }
+    } else if (type instanceof FloatType) {
+      double[] data = ((DoubleColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        toColumn.putFloat(index, (float)data[index]);
+      }
+    } else if (type instanceof DoubleType) {
+      toColumn.putDoubles(0, batchSize, ((DoubleColumnVector)fromColumn).vector, 0);
+    } else if (type instanceof StringType || type instanceof BinaryType) {
+      BytesColumnVector data = ((BytesColumnVector)fromColumn);
+      WritableColumnVector arrayData = toColumn.getChildColumn(0);
+      int totalNumBytes = IntStream.of(data.length).sum();
+      arrayData.reserve(totalNumBytes);
+      for (int index = 0, pos = 0; index < batchSize; pos += data.length[index], index++) {
+        arrayData.putBytes(pos, data.length[index], data.vector[index], data.start[index]);
+        toColumn.putArray(index, pos, data.length[index]);
+      }
+    } else if (type instanceof DecimalType) {
+      DecimalType decimalType = (DecimalType)type;
+      DecimalColumnVector data = ((DecimalColumnVector)fromColumn);
+      if (decimalType.precision() > Decimal.MAX_LONG_DIGITS()) {
+        WritableColumnVector arrayData = toColumn.getChildColumn(0);
+        arrayData.reserve(batchSize * 16);
+      }
+      for (int index = 0; index < batchSize; index++) {
+        putDecimalWritable(
+          toColumn,
+          index,
+          decimalType.precision(),
+          decimalType.scale(),
+          data.vector[index]);
+      }
+    } else {
+      throw new UnsupportedOperationException("Unsupported Data Type: " + type);
+    }
+  }
+
+  private void putValues(
+      int batchSize,
+      StructField field,
+      ColumnVector fromColumn,
+      WritableColumnVector toColumn) {
+    DataType type = field.dataType();
+    if (type instanceof BooleanType) {
+      long[] vector = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          toColumn.putBoolean(index, vector[index] == 1);
+        }
+      }
+    } else if (type instanceof ByteType) {
+      long[] vector = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          toColumn.putByte(index, (byte)vector[index]);
+        }
+      }
+    } else if (type instanceof ShortType) {
+      long[] vector = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          toColumn.putShort(index, (short)vector[index]);
+        }
+      }
+    } else if (type instanceof IntegerType || type instanceof DateType) {
+      long[] vector = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          toColumn.putInt(index, (int)vector[index]);
+        }
+      }
+    } else if (type instanceof LongType) {
+      long[] vector = ((LongColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          toColumn.putLong(index, vector[index]);
+        }
+      }
+    } else if (type instanceof TimestampType) {
+      TimestampColumnVector vector = ((TimestampColumnVector)fromColumn);
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          toColumn.putLong(index, fromTimestampColumnVector(vector, index));
+        }
+      }
+    } else if (type instanceof FloatType) {
+      double[] vector = ((DoubleColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          toColumn.putFloat(index, (float)vector[index]);
+        }
+      }
+    } else if (type instanceof DoubleType) {
+      double[] vector = ((DoubleColumnVector)fromColumn).vector;
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          toColumn.putDouble(index, vector[index]);
+        }
+      }
+    } else if (type instanceof StringType || type instanceof BinaryType) {
+      BytesColumnVector vector = (BytesColumnVector)fromColumn;
+      WritableColumnVector arrayData = toColumn.getChildColumn(0);
+      int totalNumBytes = IntStream.of(vector.length).sum();
+      arrayData.reserve(totalNumBytes);
+      for (int index = 0, pos = 0; index < batchSize; pos += vector.length[index], index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          arrayData.putBytes(pos, vector.length[index], vector.vector[index], vector.start[index]);
+          toColumn.putArray(index, pos, vector.length[index]);
+        }
+      }
+    } else if (type instanceof DecimalType) {
+      DecimalType decimalType = (DecimalType)type;
+      HiveDecimalWritable[] vector = ((DecimalColumnVector)fromColumn).vector;
+      if (decimalType.precision() > Decimal.MAX_LONG_DIGITS()) {
+        WritableColumnVector arrayData = toColumn.getChildColumn(0);
+        arrayData.reserve(batchSize * 16);
+      }
+      for (int index = 0; index < batchSize; index++) {
+        if (fromColumn.isNull[index]) {
+          toColumn.putNull(index);
+        } else {
+          putDecimalWritable(
+            toColumn,
+            index,
+            decimalType.precision(),
+            decimalType.scale(),
+            vector[index]);
+        }
+      }
+    } else {
+      throw new UnsupportedOperationException("Unsupported Data Type: " + type);
+    }
+  }
+
+  /**
+   * Returns the number of micros since epoch from an element of TimestampColumnVector.
+   */
+  private static long fromTimestampColumnVector(TimestampColumnVector vector, int index) {
+    return vector.time[index] * 1000L + vector.nanos[index] / 1000L;
+  }
+
+  /**
+   * Put a `HiveDecimalWritable` into a `WritableColumnVector`.
+   */
+  private static void putDecimalWritable(
+      WritableColumnVector toColumn,
+      int index,
+      int precision,
+      int scale,
+      HiveDecimalWritable decimalWritable) {
+    HiveDecimal decimal = decimalWritable.getHiveDecimal();
+    Decimal value =
+      Decimal.apply(decimal.bigDecimalValue(), decimal.precision(), decimal.scale());
+    value.changePrecision(precision, scale);
+
+    if (precision <= Decimal.MAX_INT_DIGITS()) {
+      toColumn.putInt(index, (int) value.toUnscaledLong());
+    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+      toColumn.putLong(index, value.toUnscaledLong());
+    } else {
+      byte[] bytes = value.toJavaBigDecimal().unscaledValue().toByteArray();
+      WritableColumnVector arrayData = toColumn.getChildColumn(0);
+      arrayData.putBytes(index * 16, bytes.length, bytes, 0);
+      toColumn.putArray(index, index * 16, bytes.length);
+    }
+  }
+
+  /**
+   * Put `HiveDecimalWritable`s into a `WritableColumnVector`.
+   */
+  private static void putDecimalWritables(
+      WritableColumnVector toColumn,
+      int size,
+      int precision,
+      int scale,
+      HiveDecimalWritable decimalWritable) {
+    HiveDecimal decimal = decimalWritable.getHiveDecimal();
+    Decimal value =
+      Decimal.apply(decimal.bigDecimalValue(), decimal.precision(), decimal.scale());
+    value.changePrecision(precision, scale);
+
+    if (precision <= Decimal.MAX_INT_DIGITS()) {
+      toColumn.putInts(0, size, (int) value.toUnscaledLong());
+    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+      toColumn.putLongs(0, size, value.toUnscaledLong());
+    } else {
+      byte[] bytes = value.toJavaBigDecimal().unscaledValue().toByteArray();
+      WritableColumnVector arrayData = toColumn.getChildColumn(0);
+      arrayData.reserve(bytes.length);
+      arrayData.putBytes(0, bytes.length, bytes, 0);
+      for (int index = 0; index < size; index++) {
+        toColumn.putArray(index, 0, bytes.length);
+      }
+    }
+  }
+}
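
For orientation, here is a minimal Scala sketch of how a caller drives this reader, mirroring the `OrcFileFormat` changes below; `fileSplit`, `taskAttemptContext`, `reader`, `requestedColIds`, `requiredSchema`, `partitionSchema`, and `file` are assumed to be prepared as in that file:

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader

// Create the reader, then call initialize and initBatch sequentially,
// as required by the class-level comment above.
val batchReader = new OrcColumnarBatchReader(/* useOffHeap = */ false)
batchReader.initialize(fileSplit, taskAttemptContext)
batchReader.initBatch(
  reader.getSchema,       // physical ORC file schema
  requestedColIds,        // -1 marks required columns missing from the file
  requiredSchema.fields,
  partitionSchema,
  file.partitionValues)

// Standard Hadoop RecordReader iteration: one ColumnarBatch per step.
while (batchReader.nextKeyValue()) {
  val batch = batchReader.getCurrentValue
  // consume batch.numRows() rows ...
}
batchReader.close()
```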

http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index f7471cd..b8bacfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -118,6 +118,13 @@ class OrcFileFormat
     }
   }
 
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val conf = sparkSession.sessionState.conf
+    conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
+      schema.length <= conf.wholeStageMaxNumFields &&
+      schema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],
@@ -139,6 +146,11 @@ class OrcFileFormat
       }
     }
 
+    val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
+
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -146,8 +158,14 @@ class OrcFileFormat
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
 
+      val filePath = new Path(new URI(file.filePath))
+
+      val fs = filePath.getFileSystem(conf)
+      val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
+      val reader = OrcFile.createReader(filePath, readerOptions)
+
       val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
-        isCaseSensitive, dataSchema, requiredSchema, new Path(new URI(file.filePath)), conf)
+        isCaseSensitive, dataSchema, requiredSchema, reader, conf)
 
       if (requestedColIdsOrEmptyFile.isEmpty) {
         Iterator.empty
@@ -155,29 +173,46 @@ class OrcFileFormat
         val requestedColIds = requestedColIdsOrEmptyFile.get
         assert(requestedColIds.length == requiredSchema.length,
           "[BUG] requested column IDs do not match required schema")
-        conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute,
+        val taskConf = new Configuration(conf)
+        taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute,
           requestedColIds.filter(_ != -1).sorted.mkString(","))
 
-        val fileSplit =
-          new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+        val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
         val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-        val taskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
-        val orcRecordReader = new OrcInputFormat[OrcStruct]
-          .createRecordReader(fileSplit, taskAttemptContext)
-        val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
-        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
-
-        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-        val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds)
-
-        if (partitionSchema.length == 0) {
-          iter.map(value => unsafeProjection(deserializer.deserialize(value)))
+        val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+        val taskContext = Option(TaskContext.get())
+        if (enableVectorizedReader) {
+          val batchReader =
+            new OrcColumnarBatchReader(enableOffHeapColumnVector && taskContext.isDefined)
+          batchReader.initialize(fileSplit, taskAttemptContext)
+          batchReader.initBatch(
+            reader.getSchema,
+            requestedColIds,
+            requiredSchema.fields,
+            partitionSchema,
+            file.partitionValues)
+
+          val iter = new RecordReaderIterator(batchReader)
+          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+          iter.asInstanceOf[Iterator[InternalRow]]
         } else {
-          val joinedRow = new JoinedRow()
-          iter.map(value =>
-            unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues)))
+          val orcRecordReader = new OrcInputFormat[OrcStruct]
+            .createRecordReader(fileSplit, taskAttemptContext)
+          val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
+          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
+          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+          val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds)
+
+          if (partitionSchema.length == 0) {
+            iter.map(value => unsafeProjection(deserializer.deserialize(value)))
+          } else {
+            val joinedRow = new JoinedRow()
+            iter.map(value =>
+              unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues)))
+          }
         }
       }
     }
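
The `supportBatch` override above restricts the vectorized path to flat schemas of atomic types; a small illustration with hypothetical schemas:

```scala
import org.apache.spark.sql.types._

// Qualifies for batch reading: every field is an AtomicType.
val flat = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// Does not qualify: ArrayType is not an AtomicType, so scans of this schema
// fall back to the row-based reader even when the flag is enabled.
val nested = StructType(Seq(StructField("ids", ArrayType(LongType))))
```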

http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index b03ee06..13a2399 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, TypeDescription}
+import org.apache.orc.{OrcFile, Reader, TypeDescription}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -80,11 +80,8 @@ object OrcUtils extends Logging {
       isCaseSensitive: Boolean,
       dataSchema: StructType,
       requiredSchema: StructType,
-      file: Path,
+      reader: Reader,
       conf: Configuration): Option[Array[Int]] = {
-    val fs = file.getFileSystem(conf)
-    val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val reader = OrcFile.createReader(file, readerOptions)
     val orcFieldNames = reader.getSchema.getFieldNames.asScala
     if (orcFieldNames.isEmpty) {
       // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
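
With this signature change the caller creates the `Reader` once and passes it in, avoiding a second footer read per split; a minimal sketch of the new call pattern, with variables prepared as in `OrcFileFormat` above:

```scala
val fs = filePath.getFileSystem(conf)
val reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf).filesystem(fs))
val requestedColIdsOrEmptyFile =
  OrcUtils.requestedColumnIds(isCaseSensitive, dataSchema, requiredSchema, reader, conf)
```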

http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
new file mode 100644
index 0000000..37ed846
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure ORC read performance.
+ *
+ * This is in the `sql/hive` module in order to compare the `sql/core` and `sql/hive` ORC data sources.
+ */
+// scalastyle:off line.size.limit
+object OrcReadBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+
+  private val spark = SparkSession.builder()
+    .master("local[1]")
+    .appName("OrcReadBenchmark")
+    .config(conf)
+    .getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+    (keys, values).zipped.foreach(spark.conf.set)
+    try f finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => spark.conf.set(key, value)
+        case (key, None) => spark.conf.unset(key)
+      }
+    }
+  }
+
+  private val NATIVE_ORC_FORMAT = classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat].getCanonicalName
+  private val HIVE_ORC_FORMAT = classOf[org.apache.spark.sql.hive.orc.OrcFileFormat].getCanonicalName
+
+  private def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
+    val dirORC = dir.getCanonicalPath
+
+    if (partition.isDefined) {
+      df.write.partitionBy(partition.get).orc(dirORC)
+    } else {
+      df.write.orc(dirORC)
+    }
+
+    spark.read.format(NATIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("nativeOrcTable")
+    spark.read.format(HIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("hiveOrcTable")
+  }
+
+  def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
+    val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values)
+
+    withTempPath { dir =>
+      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+        import spark.implicits._
+        spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1")
+
+        prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1"))
+
+        sqlBenchmark.addCase("Native ORC MR") { _ =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+            spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+          spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+        }
+
+        sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+          spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
+        }
+
+        /*
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+        SQL Single TINYINT Column Scan:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1192 / 1221         13.2          75.8       1.0X
+        Native ORC Vectorized                          161 /  170         97.5          10.3       7.4X
+        Hive built-in ORC                             1399 / 1413         11.2          89.0       0.9X
+
+        SQL Single SMALLINT Column Scan:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1287 / 1333         12.2          81.8       1.0X
+        Native ORC Vectorized                          164 /  172         95.6          10.5       7.8X
+        Hive built-in ORC                             1629 / 1650          9.7         103.6       0.8X
+
+        SQL Single INT Column Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1304 / 1388         12.1          82.9       1.0X
+        Native ORC Vectorized                          227 /  240         69.3          14.4       5.7X
+        Hive built-in ORC                             1866 / 1867          8.4         118.6       0.7X
+
+        SQL Single BIGINT Column Scan:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1331 / 1357         11.8          84.6       1.0X
+        Native ORC Vectorized                          289 /  297         54.4          18.4       4.6X
+        Hive built-in ORC                             1922 / 1929          8.2         122.2       0.7X
+
+        SQL Single FLOAT Column Scan:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1410 / 1428         11.2          89.7       1.0X
+        Native ORC Vectorized                          328 /  335         48.0          20.8       4.3X
+        Hive built-in ORC                             1929 / 2012          8.2         122.6       0.7X
+
+        SQL Single DOUBLE Column Scan:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1467 / 1485         10.7          93.3       1.0X
+        Native ORC Vectorized                          402 /  411         39.1          25.6       3.6X
+        Hive built-in ORC                             2023 / 2042          7.8         128.6       0.7X
+        */
+        sqlBenchmark.run()
+      }
+    }
+  }
+
+  def intStringScanBenchmark(values: Int): Unit = {
+    val benchmark = new Benchmark("Int and String Scan", values)
+
+    withTempPath { dir =>
+      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+        import spark.implicits._
+        spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1")
+
+        prepareTable(
+          dir,
+          spark.sql("SELECT CAST(value AS INT) AS c1, CAST(value as STRING) AS c2 FROM t1"))
+
+        benchmark.addCase("Native ORC MR") { _ =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+            spark.sql("SELECT sum(c1), sum(length(c2)) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Native ORC Vectorized") { _ =>
+          spark.sql("SELECT sum(c1), sum(length(c2)) FROM nativeOrcTable").collect()
+        }
+
+        benchmark.addCase("Hive built-in ORC") { _ =>
+          spark.sql("SELECT sum(c1), sum(length(c2)) FROM hiveOrcTable").collect()
+        }
+
+        /*
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+        Int and String Scan:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 2729 / 2744          3.8         260.2       1.0X
+        Native ORC Vectorized                         1318 / 1344          8.0         125.7       2.1X
+        Hive built-in ORC                             3731 / 3782          2.8         355.8       0.7X
+        */
+        benchmark.run()
+      }
+    }
+  }
+
+  def partitionTableScanBenchmark(values: Int): Unit = {
+    val benchmark = new Benchmark("Partitioned Table", values)
+
+    withTempPath { dir =>
+      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+        import spark.implicits._
+        spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1")
+
+        prepareTable(dir, spark.sql("SELECT value % 2 AS p, value AS id FROM t1"), Some("p"))
+
+        benchmark.addCase("Read data column - Native ORC MR") { _ =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+            spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Read data column - Native ORC Vectorized") { _ =>
+          spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+        }
+
+        benchmark.addCase("Read data column - Hive built-in ORC") { _ =>
+          spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
+        }
+
+        benchmark.addCase("Read partition column - Native ORC MR") { _ =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+            spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Read partition column - Native ORC Vectorized") { _ =>
+          spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
+        }
+
+        benchmark.addCase("Read partition column - Hive built-in ORC") { _ =>
+          spark.sql("SELECT sum(p) FROM hiveOrcTable").collect()
+        }
+
+        benchmark.addCase("Read both columns - Native ORC MR") { _ =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+            spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Read both columns - Native ORC Vectorized") { _ =>
+          spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
+        }
+
+        benchmark.addCase("Read both columns - Hive built-in ORC") { _ =>
+          spark.sql("SELECT sum(p), sum(id) FROM hiveOrcTable").collect()
+        }
+
+        /*
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+        Partitioned Table:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Read data column - Native ORC MR               1531 / 1536         10.3          97.4       1.0X
+        Read data column - Native ORC Vectorized        295 /  298         53.3          18.8       5.2X
+        Read data column - Hive built-in ORC           2125 / 2126          7.4         135.1       0.7X
+        Read partition column - Native ORC MR          1049 / 1062         15.0          66.7       1.5X
+        Read partition column - Native ORC Vectorized    54 /   57        290.1           3.4      28.2X
+        Read partition column - Hive built-in ORC      1282 / 1291         12.3          81.5       1.2X
+        Read both columns - Native ORC MR              1594 / 1598          9.9         101.3       1.0X
+        Read both columns - Native ORC Vectorized       332 /  336         47.4          21.1       4.6X
+        Read both columns - Hive built-in ORC          2145 / 2187          7.3         136.4       0.7X
+        */
+        benchmark.run()
+      }
+    }
+  }
+
+  def repeatedStringScanBenchmark(values: Int): Unit = {
+    val benchmark = new Benchmark("Repeated String", values)
+
+    withTempPath { dir =>
+      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+        spark.range(values).createOrReplaceTempView("t1")
+
+        prepareTable(dir, spark.sql("SELECT CAST((id % 200) + 10000 as STRING) AS c1 FROM t1"))
+
+        benchmark.addCase("Native ORC MR") { _ =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+            spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Native ORC Vectorized") { _ =>
+          spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
+        }
+
+        benchmark.addCase("Hive built-in ORC") { _ =>
+          spark.sql("SELECT sum(length(c1)) FROM hiveOrcTable").collect()
+        }
+
+        /*
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+        Repeated String:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1325 / 1328          7.9         126.4       1.0X
+        Native ORC Vectorized                          320 /  330         32.8          30.5       4.1X
+        Hive built-in ORC                             1971 / 1972          5.3         188.0       0.7X
+        */
+        benchmark.run()
+      }
+    }
+  }
+
+  def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
+    withTempPath { dir =>
+      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+        spark.range(values).createOrReplaceTempView("t1")
+
+        prepareTable(
+          dir,
+          spark.sql(
+            s"SELECT IF(RAND(1) < $fractionOfNulls, NULL, CAST(id as STRING)) AS c1, " +
+            s"IF(RAND(2) < $fractionOfNulls, NULL, CAST(id as STRING)) AS c2 FROM t1"))
+
+        val benchmark = new Benchmark(s"String with Nulls Scan ($fractionOfNulls%)", values)
+
+        benchmark.addCase("Native ORC MR") { _ =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+            spark.sql("SELECT SUM(LENGTH(c2)) FROM nativeOrcTable " +
+              "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
+          }
+        }
+
+        benchmark.addCase("Native ORC Vectorized") { _ =>
+          spark.sql("SELECT SUM(LENGTH(c2)) FROM nativeOrcTable " +
+            "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
+        }
+
+        benchmark.addCase("Hive built-in ORC") { _ =>
+          spark.sql("SELECT SUM(LENGTH(c2)) FROM hiveOrcTable " +
+            "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
+        }
+
+        /*
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+        String with Nulls Scan (0.0%):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 2553 / 2554          4.1         243.4       1.0X
+        Native ORC Vectorized                          953 /  954         11.0          90.9       2.7X
+        Hive built-in ORC                             3875 / 3898          2.7         369.6       0.7X
+
+        String with Nulls Scan (0.5%):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 2389 / 2408          4.4         227.8       1.0X
+        Native ORC Vectorized                         1208 / 1209          8.7         115.2       2.0X
+        Hive built-in ORC                             2940 / 2952          3.6         280.4       0.8X
+
+        String with Nulls Scan (0.95%):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1295 / 1311          8.1         123.5       1.0X
+        Native ORC Vectorized                          449 /  457         23.4          42.8       2.9X
+        Hive built-in ORC                             1649 / 1660          6.4         157.3       0.8X
+        */
+        benchmark.run()
+      }
+    }
+  }
+
+  def columnsBenchmark(values: Int, width: Int): Unit = {
+    val sqlBenchmark = new Benchmark(s"SQL Single Column Scan from $width columns", values)
+
+    withTempPath { dir =>
+      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+        import spark.implicits._
+        val middle = width / 2
+        val selectExpr = (1 to width).map(i => s"value as c$i")
+        spark.range(values).map(_ => Random.nextLong).toDF()
+          .selectExpr(selectExpr: _*).createOrReplaceTempView("t1")
+
+        prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+        sqlBenchmark.addCase("Native ORC MR") { _ =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+            spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
+          }
+        }
+
+        sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+          spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
+        }
+
+        sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+          spark.sql(s"SELECT sum(c$middle) FROM hiveOrcTable").collect()
+        }
+
+        /*
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+        SQL Single Column Scan from 100 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 1103 / 1124          1.0        1052.0       1.0X
+        Native ORC Vectorized                           92 /  100         11.4          87.9      12.0X
+        Hive built-in ORC                              383 /  390          2.7         365.4       2.9X
+
+        SQL Single Column Scan from 200 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 2245 / 2250          0.5        2141.0       1.0X
+        Native ORC Vectorized                          157 /  165          6.7         150.2      14.3X
+        Hive built-in ORC                              587 /  593          1.8         559.4       3.8X
+
+        SQL Single Column Scan from 300 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        ------------------------------------------------------------------------------------------------
+        Native ORC MR                                 3343 / 3350          0.3        3188.3       1.0X
+        Native ORC Vectorized                          265 /  280          3.9         253.2      12.6X
+        Hive built-in ORC                              828 /  842          1.3         789.8       4.0X
+        */
+        sqlBenchmark.run()
+      }
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType).foreach { dataType =>
+      numericScanBenchmark(1024 * 1024 * 15, dataType)
+    }
+    intStringScanBenchmark(1024 * 1024 * 10)
+    partitionTableScanBenchmark(1024 * 1024 * 15)
+    repeatedStringScanBenchmark(1024 * 1024 * 10)
+    for (fractionOfNulls <- List(0.0, 0.50, 0.95)) {
+      stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls)
+    }
+    columnsBenchmark(1024 * 1024 * 1, 100)
+    columnsBenchmark(1024 * 1024 * 1, 200)
+    columnsBenchmark(1024 * 1024 * 1, 300)
+  }
+}
+// scalastyle:on line.size.limit
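
The benchmark is driven by its `main` method; one way to invoke it from the `sql/hive` test classpath (a sketch, not the only entry point):

```scala
org.apache.spark.sql.hive.orc.OrcReadBenchmark.main(Array.empty[String])
```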


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org