Posted to commits@spark.apache.org by we...@apache.org on 2018/01/09 13:49:29 UTC
spark git commit: [SPARK-16060][SQL] Support Vectorized ORC Reader
Repository: spark
Updated Branches:
refs/heads/master 6a4206ff0 -> f44ba910f
[SPARK-16060][SQL] Support Vectorized ORC Reader
## What changes were proposed in this pull request?
This PR adds an ORC columnar-batch reader to the native `OrcFileFormat`. Because it reads ORC `RowBatch`es and copies them directly into Spark `ColumnarBatch`es, it is faster than the current row-based implementation. This replaces the prior PR, #17924.
It also adds `OrcReadBenchmark` to show the performance improvement.
## How was this patch tested?
Pass the existing test cases.
Author: Dongjoon Hyun <do...@apache.org>
Closes #19943 from dongjoon-hyun/SPARK-16060.
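As a quick usage sketch (not part of the commit), the new flag can be toggled per session. The path below is illustrative, and the native ORC implementation (`spark.sql.orc.impl=native`, the default shown in the SQLConf context below) is assumed:

```scala
// Illustrative sketch: toggling the vectorized ORC reader flag added in this patch.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("OrcVectorizedSketch").getOrCreate()

// Write some ORC data with atomic-typed columns; the path is an example.
spark.range(1024 * 1024)
  .selectExpr("id", "CAST(id AS STRING) AS s")
  .write.mode("overwrite").orc("/tmp/orc-vectorized-sketch")

// Vectorized columnar-batch reader (the default after this patch).
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.read.orc("/tmp/orc-vectorized-sketch").selectExpr("sum(id)").show()

// Row-at-a-time reader, for comparison.
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")
spark.read.orc("/tmp/orc-vectorized-sketch").selectExpr("sum(id)").show()
```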
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f44ba910
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f44ba910
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f44ba910
Branch: refs/heads/master
Commit: f44ba910f58083458e1133502e193a9d6f2bf766
Parents: 6a4206f
Author: Dongjoon Hyun <do...@apache.org>
Authored: Tue Jan 9 21:48:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jan 9 21:48:14 2018 +0800
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 7 +
.../datasources/orc/OrcColumnarBatchReader.java | 523 +++++++++++++++++++
.../datasources/orc/OrcFileFormat.scala | 75 ++-
.../execution/datasources/orc/OrcUtils.scala | 7 +-
.../spark/sql/hive/orc/OrcReadBenchmark.scala | 435 +++++++++++++++
5 files changed, 1022 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5c61f10..74949db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -386,6 +386,11 @@ object SQLConf {
.checkValues(Set("hive", "native"))
.createWithDefault("native")
+ val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader")
+ .doc("Enables vectorized orc decoding.")
+ .booleanConf
+ .createWithDefault(true)
+
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
@@ -1183,6 +1188,8 @@ class SQLConf extends Serializable with Logging {
def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
+ def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)
+
def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
new file mode 100644
index 0000000..5c28d0e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcInputFormat;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.*;
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+
+/**
+ * To support vectorization in whole-stage codegen, this reader returns ColumnarBatch.
+ * After creating an instance, `initialize` and `initBatch` should be called in that order.
+ */
+public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
+
+ /**
+ * The default batch size. We use this single value for both ORC and Spark consistently
+ * because their defaults differ, as shown below.
+ *
+ * - ORC's VectorizedRowBatch.DEFAULT_SIZE = 1024
+ * - Spark's ColumnarBatch.DEFAULT_BATCH_SIZE = 4 * 1024
+ */
+ public static final int DEFAULT_SIZE = 4 * 1024;
+
+ // ORC File Reader
+ private Reader reader;
+
+ // Vectorized ORC Row Batch
+ private VectorizedRowBatch batch;
+
+ /**
+ * The column IDs of the physical ORC file schema that are required by this reader.
+ * -1 means the required column doesn't exist in the ORC file.
+ */
+ private int[] requestedColIds;
+
+ // Record reader from ORC row batch.
+ private org.apache.orc.RecordReader recordReader;
+
+ private StructField[] requiredFields;
+
+ // The result columnar batch for vectorized execution by whole-stage codegen.
+ private ColumnarBatch columnarBatch;
+
+ // Writable column vectors of the result columnar batch.
+ private WritableColumnVector[] columnVectors;
+
+ /**
+ * The memory mode of the columnarBatch
+ */
+ private final MemoryMode MEMORY_MODE;
+
+ public OrcColumnarBatchReader(boolean useOffHeap) {
+ MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+ }
+
+
+ @Override
+ public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public ColumnarBatch getCurrentValue() throws IOException, InterruptedException {
+ return columnarBatch;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return recordReader.getProgress();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return nextBatch();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (columnarBatch != null) {
+ columnarBatch.close();
+ columnarBatch = null;
+ }
+ if (recordReader != null) {
+ recordReader.close();
+ recordReader = null;
+ }
+ }
+
+ /**
+ * Initialize ORC file reader and batch record reader.
+ * Please note that `initBatch` needs to be called after this.
+ */
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ FileSplit fileSplit = (FileSplit)inputSplit;
+ Configuration conf = taskAttemptContext.getConfiguration();
+ reader = OrcFile.createReader(
+ fileSplit.getPath(),
+ OrcFile.readerOptions(conf)
+ .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+ .filesystem(fileSplit.getPath().getFileSystem(conf)));
+
+ Reader.Options options =
+ OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength());
+ recordReader = reader.rows(options);
+ }
+
+ /**
+ * Initialize the columnar batch by setting the required schema and partition information.
+ * With this information, this creates a ColumnarBatch with the full schema.
+ */
+ public void initBatch(
+ TypeDescription orcSchema,
+ int[] requestedColIds,
+ StructField[] requiredFields,
+ StructType partitionSchema,
+ InternalRow partitionValues) {
+ batch = orcSchema.createRowBatch(DEFAULT_SIZE);
+ assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
+
+ this.requiredFields = requiredFields;
+ this.requestedColIds = requestedColIds;
+ assert(requiredFields.length == requestedColIds.length);
+
+ StructType resultSchema = new StructType(requiredFields);
+ for (StructField f : partitionSchema.fields()) {
+ resultSchema = resultSchema.add(f);
+ }
+
+ int capacity = DEFAULT_SIZE;
+ if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
+ columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
+ } else {
+ columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
+ }
+ columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
+
+ if (partitionValues.numFields() > 0) {
+ int partitionIdx = requiredFields.length;
+ for (int i = 0; i < partitionValues.numFields(); i++) {
+ ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
+ columnVectors[i + partitionIdx].setIsConstant();
+ }
+ }
+
+ // Initialize the missing columns once.
+ for (int i = 0; i < requiredFields.length; i++) {
+ if (requestedColIds[i] == -1) {
+ columnVectors[i].putNulls(0, columnarBatch.capacity());
+ columnVectors[i].setIsConstant();
+ }
+ }
+ }
+
+ /**
+ * Return true if more data exists in the next batch. If so, prepare the next batch
+ * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
+ */
+ private boolean nextBatch() throws IOException {
+ for (WritableColumnVector vector : columnVectors) {
+ vector.reset();
+ }
+ columnarBatch.setNumRows(0);
+
+ recordReader.nextBatch(batch);
+ int batchSize = batch.size;
+ if (batchSize == 0) {
+ return false;
+ }
+ columnarBatch.setNumRows(batchSize);
+ for (int i = 0; i < requiredFields.length; i++) {
+ StructField field = requiredFields[i];
+ WritableColumnVector toColumn = columnVectors[i];
+
+ if (requestedColIds[i] >= 0) {
+ ColumnVector fromColumn = batch.cols[requestedColIds[i]];
+
+ if (fromColumn.isRepeating) {
+ putRepeatingValues(batchSize, field, fromColumn, toColumn);
+ } else if (fromColumn.noNulls) {
+ putNonNullValues(batchSize, field, fromColumn, toColumn);
+ } else {
+ putValues(batchSize, field, fromColumn, toColumn);
+ }
+ }
+ }
+ return true;
+ }
+
+ private void putRepeatingValues(
+ int batchSize,
+ StructField field,
+ ColumnVector fromColumn,
+ WritableColumnVector toColumn) {
+ if (fromColumn.isNull[0]) {
+ toColumn.putNulls(0, batchSize);
+ } else {
+ DataType type = field.dataType();
+ if (type instanceof BooleanType) {
+ toColumn.putBooleans(0, batchSize, ((LongColumnVector)fromColumn).vector[0] == 1);
+ } else if (type instanceof ByteType) {
+ toColumn.putBytes(0, batchSize, (byte)((LongColumnVector)fromColumn).vector[0]);
+ } else if (type instanceof ShortType) {
+ toColumn.putShorts(0, batchSize, (short)((LongColumnVector)fromColumn).vector[0]);
+ } else if (type instanceof IntegerType || type instanceof DateType) {
+ toColumn.putInts(0, batchSize, (int)((LongColumnVector)fromColumn).vector[0]);
+ } else if (type instanceof LongType) {
+ toColumn.putLongs(0, batchSize, ((LongColumnVector)fromColumn).vector[0]);
+ } else if (type instanceof TimestampType) {
+ toColumn.putLongs(0, batchSize,
+ fromTimestampColumnVector((TimestampColumnVector)fromColumn, 0));
+ } else if (type instanceof FloatType) {
+ toColumn.putFloats(0, batchSize, (float)((DoubleColumnVector)fromColumn).vector[0]);
+ } else if (type instanceof DoubleType) {
+ toColumn.putDoubles(0, batchSize, ((DoubleColumnVector)fromColumn).vector[0]);
+ } else if (type instanceof StringType || type instanceof BinaryType) {
+ BytesColumnVector data = (BytesColumnVector)fromColumn;
+ WritableColumnVector arrayData = toColumn.getChildColumn(0);
+ int size = data.vector[0].length;
+ arrayData.reserve(size);
+ arrayData.putBytes(0, size, data.vector[0], 0);
+ for (int index = 0; index < batchSize; index++) {
+ toColumn.putArray(index, 0, size);
+ }
+ } else if (type instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType)type;
+ putDecimalWritables(
+ toColumn,
+ batchSize,
+ decimalType.precision(),
+ decimalType.scale(),
+ ((DecimalColumnVector)fromColumn).vector[0]);
+ } else {
+ throw new UnsupportedOperationException("Unsupported Data Type: " + type);
+ }
+ }
+ }
+
+ private void putNonNullValues(
+ int batchSize,
+ StructField field,
+ ColumnVector fromColumn,
+ WritableColumnVector toColumn) {
+ DataType type = field.dataType();
+ if (type instanceof BooleanType) {
+ long[] data = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ toColumn.putBoolean(index, data[index] == 1);
+ }
+ } else if (type instanceof ByteType) {
+ long[] data = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ toColumn.putByte(index, (byte)data[index]);
+ }
+ } else if (type instanceof ShortType) {
+ long[] data = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ toColumn.putShort(index, (short)data[index]);
+ }
+ } else if (type instanceof IntegerType || type instanceof DateType) {
+ long[] data = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ toColumn.putInt(index, (int)data[index]);
+ }
+ } else if (type instanceof LongType) {
+ toColumn.putLongs(0, batchSize, ((LongColumnVector)fromColumn).vector, 0);
+ } else if (type instanceof TimestampType) {
+ TimestampColumnVector data = ((TimestampColumnVector)fromColumn);
+ for (int index = 0; index < batchSize; index++) {
+ toColumn.putLong(index, fromTimestampColumnVector(data, index));
+ }
+ } else if (type instanceof FloatType) {
+ double[] data = ((DoubleColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ toColumn.putFloat(index, (float)data[index]);
+ }
+ } else if (type instanceof DoubleType) {
+ toColumn.putDoubles(0, batchSize, ((DoubleColumnVector)fromColumn).vector, 0);
+ } else if (type instanceof StringType || type instanceof BinaryType) {
+ BytesColumnVector data = ((BytesColumnVector)fromColumn);
+ WritableColumnVector arrayData = toColumn.getChildColumn(0);
+ int totalNumBytes = IntStream.of(data.length).sum();
+ arrayData.reserve(totalNumBytes);
+ for (int index = 0, pos = 0; index < batchSize; pos += data.length[index], index++) {
+ arrayData.putBytes(pos, data.length[index], data.vector[index], data.start[index]);
+ toColumn.putArray(index, pos, data.length[index]);
+ }
+ } else if (type instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType)type;
+ DecimalColumnVector data = ((DecimalColumnVector)fromColumn);
+ if (decimalType.precision() > Decimal.MAX_LONG_DIGITS()) {
+ WritableColumnVector arrayData = toColumn.getChildColumn(0);
+ arrayData.reserve(batchSize * 16);
+ }
+ for (int index = 0; index < batchSize; index++) {
+ putDecimalWritable(
+ toColumn,
+ index,
+ decimalType.precision(),
+ decimalType.scale(),
+ data.vector[index]);
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported Data Type: " + type);
+ }
+ }
+
+ private void putValues(
+ int batchSize,
+ StructField field,
+ ColumnVector fromColumn,
+ WritableColumnVector toColumn) {
+ DataType type = field.dataType();
+ if (type instanceof BooleanType) {
+ long[] vector = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ toColumn.putBoolean(index, vector[index] == 1);
+ }
+ }
+ } else if (type instanceof ByteType) {
+ long[] vector = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ toColumn.putByte(index, (byte)vector[index]);
+ }
+ }
+ } else if (type instanceof ShortType) {
+ long[] vector = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ toColumn.putShort(index, (short)vector[index]);
+ }
+ }
+ } else if (type instanceof IntegerType || type instanceof DateType) {
+ long[] vector = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ toColumn.putInt(index, (int)vector[index]);
+ }
+ }
+ } else if (type instanceof LongType) {
+ long[] vector = ((LongColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ toColumn.putLong(index, vector[index]);
+ }
+ }
+ } else if (type instanceof TimestampType) {
+ TimestampColumnVector vector = ((TimestampColumnVector)fromColumn);
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ toColumn.putLong(index, fromTimestampColumnVector(vector, index));
+ }
+ }
+ } else if (type instanceof FloatType) {
+ double[] vector = ((DoubleColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ toColumn.putFloat(index, (float)vector[index]);
+ }
+ }
+ } else if (type instanceof DoubleType) {
+ double[] vector = ((DoubleColumnVector)fromColumn).vector;
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ toColumn.putDouble(index, vector[index]);
+ }
+ }
+ } else if (type instanceof StringType || type instanceof BinaryType) {
+ BytesColumnVector vector = (BytesColumnVector)fromColumn;
+ WritableColumnVector arrayData = toColumn.getChildColumn(0);
+ int totalNumBytes = IntStream.of(vector.length).sum();
+ arrayData.reserve(totalNumBytes);
+ for (int index = 0, pos = 0; index < batchSize; pos += vector.length[index], index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ arrayData.putBytes(pos, vector.length[index], vector.vector[index], vector.start[index]);
+ toColumn.putArray(index, pos, vector.length[index]);
+ }
+ }
+ } else if (type instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType)type;
+ HiveDecimalWritable[] vector = ((DecimalColumnVector)fromColumn).vector;
+ if (decimalType.precision() > Decimal.MAX_LONG_DIGITS()) {
+ WritableColumnVector arrayData = toColumn.getChildColumn(0);
+ arrayData.reserve(batchSize * 16);
+ }
+ for (int index = 0; index < batchSize; index++) {
+ if (fromColumn.isNull[index]) {
+ toColumn.putNull(index);
+ } else {
+ putDecimalWritable(
+ toColumn,
+ index,
+ decimalType.precision(),
+ decimalType.scale(),
+ vector[index]);
+ }
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported Data Type: " + type);
+ }
+ }
+
+ /**
+ * Returns the number of micros since epoch from an element of TimestampColumnVector.
+ */
+ private static long fromTimestampColumnVector(TimestampColumnVector vector, int index) {
+ return vector.time[index] * 1000L + vector.nanos[index] / 1000L;
+ }
+
+ /**
+ * Put a `HiveDecimalWritable` to a `WritableColumnVector`.
+ */
+ private static void putDecimalWritable(
+ WritableColumnVector toColumn,
+ int index,
+ int precision,
+ int scale,
+ HiveDecimalWritable decimalWritable) {
+ HiveDecimal decimal = decimalWritable.getHiveDecimal();
+ Decimal value =
+ Decimal.apply(decimal.bigDecimalValue(), decimal.precision(), decimal.scale());
+ value.changePrecision(precision, scale);
+
+ if (precision <= Decimal.MAX_INT_DIGITS()) {
+ toColumn.putInt(index, (int) value.toUnscaledLong());
+ } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+ toColumn.putLong(index, value.toUnscaledLong());
+ } else {
+ byte[] bytes = value.toJavaBigDecimal().unscaledValue().toByteArray();
+ WritableColumnVector arrayData = toColumn.getChildColumn(0);
+ arrayData.putBytes(index * 16, bytes.length, bytes, 0);
+ toColumn.putArray(index, index * 16, bytes.length);
+ }
+ }
+
+ /**
+ * Put `HiveDecimalWritable`s to a `WritableColumnVector`.
+ */
+ private static void putDecimalWritables(
+ WritableColumnVector toColumn,
+ int size,
+ int precision,
+ int scale,
+ HiveDecimalWritable decimalWritable) {
+ HiveDecimal decimal = decimalWritable.getHiveDecimal();
+ Decimal value =
+ Decimal.apply(decimal.bigDecimalValue(), decimal.precision(), decimal.scale());
+ value.changePrecision(precision, scale);
+
+ if (precision <= Decimal.MAX_INT_DIGITS()) {
+ toColumn.putInts(0, size, (int) value.toUnscaledLong());
+ } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+ toColumn.putLongs(0, size, value.toUnscaledLong());
+ } else {
+ byte[] bytes = value.toJavaBigDecimal().unscaledValue().toByteArray();
+ WritableColumnVector arrayData = toColumn.getChildColumn(0);
+ arrayData.reserve(bytes.length);
+ arrayData.putBytes(0, bytes.length, bytes, 0);
+ for (int index = 0; index < size; index++) {
+ toColumn.putArray(index, 0, bytes.length);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index f7471cd..b8bacfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -118,6 +118,13 @@ class OrcFileFormat
}
}
+ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val conf = sparkSession.sessionState.conf
+ conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
+ schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
override def isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
@@ -139,6 +146,11 @@ class OrcFileFormat
}
}
+ val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
+ val sqlConf = sparkSession.sessionState.conf
+ val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+ val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
+
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -146,8 +158,14 @@ class OrcFileFormat
(file: PartitionedFile) => {
val conf = broadcastedConf.value.value
+ val filePath = new Path(new URI(file.filePath))
+
+ val fs = filePath.getFileSystem(conf)
+ val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
+ val reader = OrcFile.createReader(filePath, readerOptions)
+
val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
- isCaseSensitive, dataSchema, requiredSchema, new Path(new URI(file.filePath)), conf)
+ isCaseSensitive, dataSchema, requiredSchema, reader, conf)
if (requestedColIdsOrEmptyFile.isEmpty) {
Iterator.empty
@@ -155,29 +173,46 @@ class OrcFileFormat
val requestedColIds = requestedColIdsOrEmptyFile.get
assert(requestedColIds.length == requiredSchema.length,
"[BUG] requested column IDs do not match required schema")
- conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute,
+ val taskConf = new Configuration(conf)
+ taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute,
requestedColIds.filter(_ != -1).sorted.mkString(","))
- val fileSplit =
- new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+ val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
- val taskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- val orcRecordReader = new OrcInputFormat[OrcStruct]
- .createRecordReader(fileSplit, taskAttemptContext)
- val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
- Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
-
- val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
- val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
- val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds)
-
- if (partitionSchema.length == 0) {
- iter.map(value => unsafeProjection(deserializer.deserialize(value)))
+ val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+ val taskContext = Option(TaskContext.get())
+ if (enableVectorizedReader) {
+ val batchReader =
+ new OrcColumnarBatchReader(enableOffHeapColumnVector && taskContext.isDefined)
+ batchReader.initialize(fileSplit, taskAttemptContext)
+ batchReader.initBatch(
+ reader.getSchema,
+ requestedColIds,
+ requiredSchema.fields,
+ partitionSchema,
+ file.partitionValues)
+
+ val iter = new RecordReaderIterator(batchReader)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+ iter.asInstanceOf[Iterator[InternalRow]]
} else {
- val joinedRow = new JoinedRow()
- iter.map(value =>
- unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues)))
+ val orcRecordReader = new OrcInputFormat[OrcStruct]
+ .createRecordReader(fileSplit, taskAttemptContext)
+ val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds)
+
+ if (partitionSchema.length == 0) {
+ iter.map(value => unsafeProjection(deserializer.deserialize(value)))
+ } else {
+ val joinedRow = new JoinedRow()
+ iter.map(value =>
+ unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues)))
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index b03ee06..13a2399 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, TypeDescription}
+import org.apache.orc.{OrcFile, Reader, TypeDescription}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -80,11 +80,8 @@ object OrcUtils extends Logging {
isCaseSensitive: Boolean,
dataSchema: StructType,
requiredSchema: StructType,
- file: Path,
+ reader: Reader,
conf: Configuration): Option[Array[Int]] = {
- val fs = file.getFileSystem(conf)
- val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val reader = OrcFile.createReader(file, readerOptions)
val orcFieldNames = reader.getSchema.getFieldNames.asScala
if (orcFieldNames.isEmpty) {
// SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
http://git-wip-us.apache.org/repos/asf/spark/blob/f44ba910/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
new file mode 100644
index 0000000..37ed846
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure ORC read performance.
+ *
+ * This is in `sql/hive` module in order to compare `sql/core` and `sql/hive` ORC data sources.
+ */
+// scalastyle:off line.size.limit
+object OrcReadBenchmark {
+ val conf = new SparkConf()
+ conf.set("orc.compression", "snappy")
+
+ private val spark = SparkSession.builder()
+ .master("local[1]")
+ .appName("OrcReadBenchmark")
+ .config(conf)
+ .getOrCreate()
+
+ // Set default configs. Individual cases will change them if necessary.
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+ def withTempPath(f: File => Unit): Unit = {
+ val path = Utils.createTempDir()
+ path.delete()
+ try f(path) finally Utils.deleteRecursively(path)
+ }
+
+ def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+ try f finally tableNames.foreach(spark.catalog.dropTempView)
+ }
+
+ def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ val (keys, values) = pairs.unzip
+ val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+ (keys, values).zipped.foreach(spark.conf.set)
+ try f finally {
+ keys.zip(currentValues).foreach {
+ case (key, Some(value)) => spark.conf.set(key, value)
+ case (key, None) => spark.conf.unset(key)
+ }
+ }
+ }
+
+ private val NATIVE_ORC_FORMAT = classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat].getCanonicalName
+ private val HIVE_ORC_FORMAT = classOf[org.apache.spark.sql.hive.orc.OrcFileFormat].getCanonicalName
+
+ private def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
+ val dirORC = dir.getCanonicalPath
+
+ if (partition.isDefined) {
+ df.write.partitionBy(partition.get).orc(dirORC)
+ } else {
+ df.write.orc(dirORC)
+ }
+
+ spark.read.format(NATIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("nativeOrcTable")
+ spark.read.format(HIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("hiveOrcTable")
+ }
+
+ def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
+ val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values)
+
+ withTempPath { dir =>
+ withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+ import spark.implicits._
+ spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1")
+
+ prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1"))
+
+ sqlBenchmark.addCase("Native ORC MR") { _ =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+ spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+ }
+ }
+
+ sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+ spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+ }
+
+ sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+ spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
+ }
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+ Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+ SQL Single TINYINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1192 / 1221 13.2 75.8 1.0X
+ Native ORC Vectorized 161 / 170 97.5 10.3 7.4X
+ Hive built-in ORC 1399 / 1413 11.2 89.0 0.9X
+
+ SQL Single SMALLINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1287 / 1333 12.2 81.8 1.0X
+ Native ORC Vectorized 164 / 172 95.6 10.5 7.8X
+ Hive built-in ORC 1629 / 1650 9.7 103.6 0.8X
+
+ SQL Single INT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1304 / 1388 12.1 82.9 1.0X
+ Native ORC Vectorized 227 / 240 69.3 14.4 5.7X
+ Hive built-in ORC 1866 / 1867 8.4 118.6 0.7X
+
+ SQL Single BIGINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1331 / 1357 11.8 84.6 1.0X
+ Native ORC Vectorized 289 / 297 54.4 18.4 4.6X
+ Hive built-in ORC 1922 / 1929 8.2 122.2 0.7X
+
+ SQL Single FLOAT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1410 / 1428 11.2 89.7 1.0X
+ Native ORC Vectorized 328 / 335 48.0 20.8 4.3X
+ Hive built-in ORC 1929 / 2012 8.2 122.6 0.7X
+
+ SQL Single DOUBLE Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1467 / 1485 10.7 93.3 1.0X
+ Native ORC Vectorized 402 / 411 39.1 25.6 3.6X
+ Hive built-in ORC 2023 / 2042 7.8 128.6 0.7X
+ */
+ sqlBenchmark.run()
+ }
+ }
+ }
+
+ def intStringScanBenchmark(values: Int): Unit = {
+ val benchmark = new Benchmark("Int and String Scan", values)
+
+ withTempPath { dir =>
+ withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+ import spark.implicits._
+ spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1")
+
+ prepareTable(
+ dir,
+ spark.sql("SELECT CAST(value AS INT) AS c1, CAST(value as STRING) AS c2 FROM t1"))
+
+ benchmark.addCase("Native ORC MR") { _ =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+ spark.sql("SELECT sum(c1), sum(length(c2)) FROM nativeOrcTable").collect()
+ }
+ }
+
+ benchmark.addCase("Native ORC Vectorized") { _ =>
+ spark.sql("SELECT sum(c1), sum(length(c2)) FROM nativeOrcTable").collect()
+ }
+
+ benchmark.addCase("Hive built-in ORC") { _ =>
+ spark.sql("SELECT sum(c1), sum(length(c2)) FROM hiveOrcTable").collect()
+ }
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+ Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+ Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 2729 / 2744 3.8 260.2 1.0X
+ Native ORC Vectorized 1318 / 1344 8.0 125.7 2.1X
+ Hive built-in ORC 3731 / 3782 2.8 355.8 0.7X
+ */
+ benchmark.run()
+ }
+ }
+ }
+
+ def partitionTableScanBenchmark(values: Int): Unit = {
+ val benchmark = new Benchmark("Partitioned Table", values)
+
+ withTempPath { dir =>
+ withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+ import spark.implicits._
+ spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1")
+
+ prepareTable(dir, spark.sql("SELECT value % 2 AS p, value AS id FROM t1"), Some("p"))
+
+ benchmark.addCase("Read data column - Native ORC MR") { _ =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+ spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+ }
+ }
+
+ benchmark.addCase("Read data column - Native ORC Vectorized") { _ =>
+ spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+ }
+
+ benchmark.addCase("Read data column - Hive built-in ORC") { _ =>
+ spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
+ }
+
+ benchmark.addCase("Read partition column - Native ORC MR") { _ =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+ spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
+ }
+ }
+
+ benchmark.addCase("Read partition column - Native ORC Vectorized") { _ =>
+ spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
+ }
+
+ benchmark.addCase("Read partition column - Hive built-in ORC") { _ =>
+ spark.sql("SELECT sum(p) FROM hiveOrcTable").collect()
+ }
+
+ benchmark.addCase("Read both columns - Native ORC MR") { _ =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+ spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
+ }
+ }
+
+ benchmark.addCase("Read both columns - Native ORC Vectorized") { _ =>
+ spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
+ }
+
+ benchmark.addCase("Read both columns - Hive built-in ORC") { _ =>
+ spark.sql("SELECT sum(p), sum(id) FROM hiveOrcTable").collect()
+ }
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+ Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+ Partitioned Table: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Read data column - Native ORC MR 1531 / 1536 10.3 97.4 1.0X
+ Read data column - Native ORC Vectorized 295 / 298 53.3 18.8 5.2X
+ Read data column - Hive built-in ORC 2125 / 2126 7.4 135.1 0.7X
+ Read partition column - Native ORC MR 1049 / 1062 15.0 66.7 1.5X
+ Read partition column - Native ORC Vectorized 54 / 57 290.1 3.4 28.2X
+ Read partition column - Hive built-in ORC 1282 / 1291 12.3 81.5 1.2X
+ Read both columns - Native ORC MR 1594 / 1598 9.9 101.3 1.0X
+ Read both columns - Native ORC Vectorized 332 / 336 47.4 21.1 4.6X
+ Read both columns - Hive built-in ORC 2145 / 2187 7.3 136.4 0.7X
+ */
+ benchmark.run()
+ }
+ }
+ }
+
+ def repeatedStringScanBenchmark(values: Int): Unit = {
+ val benchmark = new Benchmark("Repeated String", values)
+
+ withTempPath { dir =>
+ withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+ spark.range(values).createOrReplaceTempView("t1")
+
+ prepareTable(dir, spark.sql("SELECT CAST((id % 200) + 10000 as STRING) AS c1 FROM t1"))
+
+ benchmark.addCase("Native ORC MR") { _ =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+ spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
+ }
+ }
+
+ benchmark.addCase("Native ORC Vectorized") { _ =>
+ spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
+ }
+
+ benchmark.addCase("Hive built-in ORC") { _ =>
+ spark.sql("SELECT sum(length(c1)) FROM hiveOrcTable").collect()
+ }
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+ Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+ Repeated String: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1325 / 1328 7.9 126.4 1.0X
+ Native ORC Vectorized 320 / 330 32.8 30.5 4.1X
+ Hive built-in ORC 1971 / 1972 5.3 188.0 0.7X
+ */
+ benchmark.run()
+ }
+ }
+ }
+
+ def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
+ withTempPath { dir =>
+ withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+ spark.range(values).createOrReplaceTempView("t1")
+
+ prepareTable(
+ dir,
+ spark.sql(
+ s"SELECT IF(RAND(1) < $fractionOfNulls, NULL, CAST(id as STRING)) AS c1, " +
+ s"IF(RAND(2) < $fractionOfNulls, NULL, CAST(id as STRING)) AS c2 FROM t1"))
+
+ val benchmark = new Benchmark(s"String with Nulls Scan ($fractionOfNulls%)", values)
+
+ benchmark.addCase("Native ORC MR") { _ =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+ spark.sql("SELECT SUM(LENGTH(c2)) FROM nativeOrcTable " +
+ "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
+ }
+ }
+
+ benchmark.addCase("Native ORC Vectorized") { _ =>
+ spark.sql("SELECT SUM(LENGTH(c2)) FROM nativeOrcTable " +
+ "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
+ }
+
+ benchmark.addCase("Hive built-in ORC") { _ =>
+ spark.sql("SELECT SUM(LENGTH(c2)) FROM hiveOrcTable " +
+ "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
+ }
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+ Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+ String with Nulls Scan (0.0%): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 2553 / 2554 4.1 243.4 1.0X
+ Native ORC Vectorized 953 / 954 11.0 90.9 2.7X
+ Hive built-in ORC 3875 / 3898 2.7 369.6 0.7X
+
+ String with Nulls Scan (0.5%): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 2389 / 2408 4.4 227.8 1.0X
+ Native ORC Vectorized 1208 / 1209 8.7 115.2 2.0X
+ Hive built-in ORC 2940 / 2952 3.6 280.4 0.8X
+
+ String with Nulls Scan (0.95%): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1295 / 1311 8.1 123.5 1.0X
+ Native ORC Vectorized 449 / 457 23.4 42.8 2.9X
+ Hive built-in ORC 1649 / 1660 6.4 157.3 0.8X
+ */
+ benchmark.run()
+ }
+ }
+ }
+
+ def columnsBenchmark(values: Int, width: Int): Unit = {
+ val sqlBenchmark = new Benchmark(s"SQL Single Column Scan from $width columns", values)
+
+ withTempPath { dir =>
+ withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+ import spark.implicits._
+ val middle = width / 2
+ val selectExpr = (1 to width).map(i => s"value as c$i")
+ spark.range(values).map(_ => Random.nextLong).toDF()
+ .selectExpr(selectExpr: _*).createOrReplaceTempView("t1")
+
+ prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+ sqlBenchmark.addCase("Native ORC MR") { _ =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
+ spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
+ }
+ }
+
+ sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+ spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
+ }
+
+ sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+ spark.sql(s"SELECT sum(c$middle) FROM hiveOrcTable").collect()
+ }
+
+ /*
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+ Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+ SQL Single Column Scan from 100 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 1103 / 1124 1.0 1052.0 1.0X
+ Native ORC Vectorized 92 / 100 11.4 87.9 12.0X
+ Hive built-in ORC 383 / 390 2.7 365.4 2.9X
+
+ SQL Single Column Scan from 200 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 2245 / 2250 0.5 2141.0 1.0X
+ Native ORC Vectorized 157 / 165 6.7 150.2 14.3X
+ Hive built-in ORC 587 / 593 1.8 559.4 3.8X
+
+ SQL Single Column Scan from 300 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Native ORC MR 3343 / 3350 0.3 3188.3 1.0X
+ Native ORC Vectorized 265 / 280 3.9 253.2 12.6X
+ Hive built-in ORC 828 / 842 1.3 789.8 4.0X
+ */
+ sqlBenchmark.run()
+ }
+ }
+ }
+
+ def main(args: Array[String]): Unit = {
+ Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType).foreach { dataType =>
+ numericScanBenchmark(1024 * 1024 * 15, dataType)
+ }
+ intStringScanBenchmark(1024 * 1024 * 10)
+ partitionTableScanBenchmark(1024 * 1024 * 15)
+ repeatedStringScanBenchmark(1024 * 1024 * 10)
+ for (fractionOfNulls <- List(0.0, 0.50, 0.95)) {
+ stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls)
+ }
+ columnsBenchmark(1024 * 1024 * 1, 100)
+ columnsBenchmark(1024 * 1024 * 1, 200)
+ columnsBenchmark(1024 * 1024 * 1, 300)
+ }
+}
+// scalastyle:on line.size.limit
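For reference, a minimal sketch of running the benchmark above from a Scala session on the Spark test classpath (assuming the class is compiled and available; `Benchmark.run()` prints the result tables shown in the comments):

```scala
// Runs every case defined in OrcReadBenchmark.main and prints the benchmark tables to stdout.
org.apache.spark.sql.hive.orc.OrcReadBenchmark.main(Array.empty[String])
```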