Posted to commits@spark.apache.org by do...@apache.org on 2019/01/10 16:42:53 UTC
[spark] branch master updated: [SPARK-26584][SQL] Remove `spark.sql.orc.copyBatchToSpark` internal conf
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 270916f [SPARK-26584][SQL] Remove `spark.sql.orc.copyBatchToSpark` internal conf
270916f is described below
commit 270916f8cd8ba01341f2a38a8376e9e4be08a2e8
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Thu Jan 10 08:42:23 2019 -0800
[SPARK-26584][SQL] Remove `spark.sql.orc.copyBatchToSpark` internal conf
## What changes were proposed in this pull request?
This PR aims to remove an internal ORC configuration in order to simplify the code path for Spark 3.0.0. It removes the configuration `spark.sql.orc.copyBatchToSpark` and the related ORC code, including tests and benchmarks.
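
For illustration, a minimal before/after sketch of the constructor change (the wrapper object is hypothetical; the conf key and signatures come from the diff below):

```scala
import org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader

object Spark26584Sketch {
  // Old (removed) path, shown for contrast:
  //   spark.conf.set("spark.sql.orc.copyBatchToSpark", "true")          // internal conf, now gone
  //   val reader = new OrcColumnarBatchReader(useOffHeap, copyToSpark, capacity)
  // New path: the reader always wraps the ORC column vectors directly,
  // so only the batch capacity remains.
  val reader = new OrcColumnarBatchReader(4096)
}
```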
## How was this patch tested?
Passes Jenkins with the reduced test coverage.
Closes #23503 from dongjoon-hyun/SPARK-26584.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 7 -
.../benchmarks/DataSourceReadBenchmark-results.txt | 17 -
.../datasources/orc/OrcColumnarBatchReader.java | 400 ++-------------------
.../execution/datasources/orc/OrcFileFormat.scala | 7 +-
.../benchmark/DataSourceReadBenchmark.scala | 50 ---
.../orc/OrcColumnarBatchReaderSuite.scala | 2 +-
sql/hive/benchmarks/OrcReadBenchmark-results.txt | 17 -
.../spark/sql/hive/orc/OrcReadBenchmark.scala | 49 ---
8 files changed, 26 insertions(+), 523 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fe445e0..9804af7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -524,13 +524,6 @@ object SQLConf {
.intConf
.createWithDefault(4096)
- val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
- .doc("Whether or not to copy the ORC columnar batch to Spark columnar batch in the " +
- "vectorized ORC reader.")
- .internal()
- .booleanConf
- .createWithDefault(false)
-
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
diff --git a/sql/core/benchmarks/DataSourceReadBenchmark-results.txt b/sql/core/benchmarks/DataSourceReadBenchmark-results.txt
index b07e8b1..f547f61 100644
--- a/sql/core/benchmarks/DataSourceReadBenchmark-results.txt
+++ b/sql/core/benchmarks/DataSourceReadBenchmark-results.txt
@@ -11,7 +11,6 @@ SQL Json 8709 / 8724 1.8 5
SQL Parquet Vectorized 166 / 187 94.8 10.5 159.0X
SQL Parquet MR 1706 / 1720 9.2 108.4 15.5X
SQL ORC Vectorized 167 / 174 94.2 10.6 157.9X
-SQL ORC Vectorized with copy 226 / 231 69.6 14.4 116.7X
SQL ORC MR 1433 / 1465 11.0 91.1 18.4X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -30,7 +29,6 @@ SQL Json 8990 / 8998 1.7 5
SQL Parquet Vectorized 209 / 221 75.1 13.3 126.5X
SQL Parquet MR 1949 / 1949 8.1 123.9 13.6X
SQL ORC Vectorized 221 / 228 71.3 14.0 120.1X
-SQL ORC Vectorized with copy 315 / 319 49.9 20.1 84.0X
SQL ORC MR 1527 / 1549 10.3 97.1 17.3X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -49,7 +47,6 @@ SQL Json 9703 / 9733 1.6 6
SQL Parquet Vectorized 176 / 182 89.2 11.2 157.0X
SQL Parquet MR 2164 / 2173 7.3 137.6 12.8X
SQL ORC Vectorized 307 / 314 51.2 19.5 90.2X
-SQL ORC Vectorized with copy 312 / 319 50.4 19.8 88.7X
SQL ORC MR 1690 / 1700 9.3 107.4 16.4X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -68,7 +65,6 @@ SQL Json 12570 / 12617 1.3 7
SQL Parquet Vectorized 270 / 308 58.2 17.2 128.9X
SQL Parquet MR 2427 / 2431 6.5 154.3 14.3X
SQL ORC Vectorized 388 / 398 40.6 24.6 89.8X
-SQL ORC Vectorized with copy 395 / 402 39.9 25.1 88.2X
SQL ORC MR 1819 / 1851 8.6 115.7 19.1X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -87,7 +83,6 @@ SQL Json 12039 / 12215 1.3 7
SQL Parquet Vectorized 170 / 177 92.4 10.8 169.0X
SQL Parquet MR 2184 / 2196 7.2 138.9 13.2X
SQL ORC Vectorized 432 / 440 36.4 27.5 66.5X
-SQL ORC Vectorized with copy 439 / 442 35.9 27.9 65.6X
SQL ORC MR 1812 / 1833 8.7 115.2 15.9X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -106,7 +101,6 @@ SQL Json 18895 / 18898 0.8 12
SQL Parquet Vectorized 267 / 276 58.9 17.0 135.6X
SQL Parquet MR 2355 / 2363 6.7 149.7 15.4X
SQL ORC Vectorized 543 / 546 29.0 34.5 66.6X
-SQL ORC Vectorized with copy 548 / 557 28.7 34.8 66.0X
SQL ORC MR 2246 / 2258 7.0 142.8 16.1X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -130,7 +124,6 @@ SQL Json 12145 / 12174 0.9 11
SQL Parquet Vectorized 2363 / 2377 4.4 225.3 8.9X
SQL Parquet MR 4555 / 4557 2.3 434.4 4.6X
SQL ORC Vectorized 2361 / 2388 4.4 225.1 9.0X
-SQL ORC Vectorized with copy 2540 / 2557 4.1 242.2 8.3X
SQL ORC MR 4186 / 4209 2.5 399.2 5.0X
@@ -147,7 +140,6 @@ SQL Json 7025 / 7025 1.5 6
SQL Parquet Vectorized 803 / 821 13.1 76.6 14.6X
SQL Parquet MR 1776 / 1790 5.9 169.4 6.6X
SQL ORC Vectorized 491 / 494 21.4 46.8 23.8X
-SQL ORC Vectorized with copy 723 / 725 14.5 68.9 16.2X
SQL ORC MR 2050 / 2063 5.1 195.5 5.7X
@@ -164,21 +156,18 @@ Data column - Json 12876 / 12882 1.2 8
Data column - Parquet Vectorized 277 / 282 56.7 17.6 111.6X
Data column - Parquet MR 3398 / 3402 4.6 216.0 9.1X
Data column - ORC Vectorized 399 / 407 39.4 25.4 77.5X
-Data column - ORC Vectorized with copy 407 / 447 38.6 25.9 76.0X
Data column - ORC MR 2583 / 2589 6.1 164.2 12.0X
Partition column - CSV 7403 / 7427 2.1 470.7 4.2X
Partition column - Json 5587 / 5625 2.8 355.2 5.5X
Partition column - Parquet Vectorized 71 / 78 222.6 4.5 438.3X
Partition column - Parquet MR 1798 / 1808 8.7 114.3 17.2X
Partition column - ORC Vectorized 72 / 75 219.0 4.6 431.2X
-Partition column - ORC Vectorized with copy 71 / 77 221.1 4.5 435.4X
Partition column - ORC MR 1772 / 1778 8.9 112.6 17.5X
Both columns - CSV 30211 / 30212 0.5 1920.7 1.0X
Both columns - Json 13382 / 13391 1.2 850.8 2.3X
Both columns - Parquet Vectorized 321 / 333 49.0 20.4 96.4X
Both columns - Parquet MR 3656 / 3661 4.3 232.4 8.5X
Both columns - ORC Vectorized 443 / 448 35.5 28.2 69.9X
-Both column - ORC Vectorized with copy 527 / 533 29.9 33.5 58.8X
Both columns - ORC MR 2626 / 2633 6.0 167.0 11.8X
@@ -196,7 +185,6 @@ SQL Parquet Vectorized 1563 / 1564 6.7 1
SQL Parquet MR 3835 / 3836 2.7 365.8 3.6X
ParquetReader Vectorized 1115 / 1118 9.4 106.4 12.5X
SQL ORC Vectorized 1172 / 1208 8.9 111.8 11.9X
-SQL ORC Vectorized with copy 1630 / 1644 6.4 155.5 8.5X
SQL ORC MR 3708 / 3711 2.8 353.6 3.8X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -209,7 +197,6 @@ SQL Parquet Vectorized 1103 / 1112 9.5 1
SQL Parquet MR 2841 / 2847 3.7 271.0 4.9X
ParquetReader Vectorized 992 / 1012 10.6 94.6 14.1X
SQL ORC Vectorized 1275 / 1349 8.2 121.6 11.0X
-SQL ORC Vectorized with copy 1631 / 1644 6.4 155.5 8.6X
SQL ORC MR 3244 / 3259 3.2 309.3 4.3X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -222,7 +209,6 @@ SQL Parquet Vectorized 238 / 242 44.1
SQL Parquet MR 1730 / 1734 6.1 165.0 6.5X
ParquetReader Vectorized 237 / 238 44.3 22.6 47.4X
SQL ORC Vectorized 459 / 462 22.8 43.8 24.4X
-SQL ORC Vectorized with copy 581 / 583 18.1 55.4 19.3X
SQL ORC MR 1767 / 1783 5.9 168.5 6.4X
@@ -239,7 +225,6 @@ SQL Json 2808 / 2843 0.4 26
SQL Parquet Vectorized 56 / 63 18.9 52.9 59.8X
SQL Parquet MR 215 / 219 4.9 205.4 15.4X
SQL ORC Vectorized 64 / 76 16.4 60.9 52.0X
-SQL ORC Vectorized with copy 64 / 67 16.3 61.3 51.7X
SQL ORC MR 314 / 316 3.3 299.6 10.6X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -251,7 +236,6 @@ SQL Json 10294 / 10325 0.1 98
SQL Parquet Vectorized 72 / 85 14.5 69.0 110.3X
SQL Parquet MR 237 / 241 4.4 226.4 33.6X
SQL ORC Vectorized 82 / 92 12.7 78.5 97.0X
-SQL ORC Vectorized with copy 82 / 88 12.7 78.5 97.0X
SQL ORC MR 900 / 909 1.2 858.5 8.9X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -263,7 +247,6 @@ SQL Json 18813 / 18827 0.1 179
SQL Parquet Vectorized 107 / 111 9.8 101.8 126.3X
SQL Parquet MR 275 / 286 3.8 262.3 49.0X
SQL ORC Vectorized 107 / 115 9.8 101.7 126.4X
-SQL ORC Vectorized with copy 107 / 115 9.8 102.3 125.8X
SQL ORC MR 1659 / 1664 0.6 1582.3 8.1X
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 7dc90df..efca96e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.orc;
import java.io.IOException;
-import java.util.stream.IntStream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
@@ -31,16 +30,11 @@ import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcInputFormat;
-import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.*;
-import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
-import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
-import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -77,21 +71,10 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
@VisibleForTesting
public ColumnarBatch columnarBatch;
- // Writable column vectors of the result columnar batch.
- private WritableColumnVector[] columnVectors;
-
- // The wrapped ORC column vectors. It should be null if `copyToSpark` is true.
+ // The wrapped ORC column vectors.
private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;
- // The memory mode of the columnarBatch
- private final MemoryMode MEMORY_MODE;
-
- // Whether or not to copy the ORC columnar batch to Spark columnar batch.
- private final boolean copyToSpark;
-
- public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark, int capacity) {
- MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
- this.copyToSpark = copyToSpark;
+ public OrcColumnarBatchReader(int capacity) {
this.capacity = capacity;
}
@@ -177,53 +160,32 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
this.requestedDataColIds = requestedDataColIds;
StructType resultSchema = new StructType(requiredFields);
- if (copyToSpark) {
- if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
- columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
- } else {
- columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
- }
-
- // Initialize the partition columns and missing columns once.
- for (int i = 0; i < requiredFields.length; i++) {
- if (requestedPartitionColIds[i] != -1) {
- ColumnVectorUtils.populate(columnVectors[i],
- partitionValues, requestedPartitionColIds[i]);
- columnVectors[i].setIsConstant();
- } else if (requestedDataColIds[i] == -1) {
- columnVectors[i].putNulls(0, capacity);
- columnVectors[i].setIsConstant();
- }
- }
- columnarBatch = new ColumnarBatch(columnVectors);
- } else {
- // Just wrap the ORC column vector instead of copying it to Spark column vector.
- orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
+ // Just wrap the ORC column vector instead of copying it to Spark column vector.
+ orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
- for (int i = 0; i < requiredFields.length; i++) {
- DataType dt = requiredFields[i].dataType();
- if (requestedPartitionColIds[i] != -1) {
- OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
- ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
- partitionCol.setIsConstant();
- orcVectorWrappers[i] = partitionCol;
+ for (int i = 0; i < requiredFields.length; i++) {
+ DataType dt = requiredFields[i].dataType();
+ if (requestedPartitionColIds[i] != -1) {
+ OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
+ ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
+ partitionCol.setIsConstant();
+ orcVectorWrappers[i] = partitionCol;
+ } else {
+ int colId = requestedDataColIds[i];
+ // Initialize the missing columns once.
+ if (colId == -1) {
+ OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+ missingCol.putNulls(0, capacity);
+ missingCol.setIsConstant();
+ orcVectorWrappers[i] = missingCol;
} else {
- int colId = requestedDataColIds[i];
- // Initialize the missing columns once.
- if (colId == -1) {
- OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
- missingCol.putNulls(0, capacity);
- missingCol.setIsConstant();
- orcVectorWrappers[i] = missingCol;
- } else {
- orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
- }
+ orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
}
}
-
- columnarBatch = new ColumnarBatch(orcVectorWrappers);
}
+
+ columnarBatch = new ColumnarBatch(orcVectorWrappers);
}
/**
@@ -238,325 +200,11 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
}
columnarBatch.setNumRows(batchSize);
- if (!copyToSpark) {
- for (int i = 0; i < requiredFields.length; i++) {
- if (requestedDataColIds[i] != -1) {
- ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
- }
- }
- return true;
- }
-
- for (WritableColumnVector vector : columnVectors) {
- vector.reset();
- }
-
for (int i = 0; i < requiredFields.length; i++) {
- StructField field = requiredFields[i];
- WritableColumnVector toColumn = columnVectors[i];
-
- if (requestedDataColIds[i] >= 0) {
- ColumnVector fromColumn = batch.cols[requestedDataColIds[i]];
-
- if (fromColumn.isRepeating) {
- putRepeatingValues(batchSize, field, fromColumn, toColumn);
- } else if (fromColumn.noNulls) {
- putNonNullValues(batchSize, field, fromColumn, toColumn);
- } else {
- putValues(batchSize, field, fromColumn, toColumn);
- }
+ if (requestedDataColIds[i] != -1) {
+ ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
}
}
return true;
}
-
- private void putRepeatingValues(
- int batchSize,
- StructField field,
- ColumnVector fromColumn,
- WritableColumnVector toColumn) {
- if (fromColumn.isNull[0]) {
- toColumn.putNulls(0, batchSize);
- } else {
- DataType type = field.dataType();
- if (type instanceof BooleanType) {
- toColumn.putBooleans(0, batchSize, ((LongColumnVector)fromColumn).vector[0] == 1);
- } else if (type instanceof ByteType) {
- toColumn.putBytes(0, batchSize, (byte)((LongColumnVector)fromColumn).vector[0]);
- } else if (type instanceof ShortType) {
- toColumn.putShorts(0, batchSize, (short)((LongColumnVector)fromColumn).vector[0]);
- } else if (type instanceof IntegerType || type instanceof DateType) {
- toColumn.putInts(0, batchSize, (int)((LongColumnVector)fromColumn).vector[0]);
- } else if (type instanceof LongType) {
- toColumn.putLongs(0, batchSize, ((LongColumnVector)fromColumn).vector[0]);
- } else if (type instanceof TimestampType) {
- toColumn.putLongs(0, batchSize,
- fromTimestampColumnVector((TimestampColumnVector)fromColumn, 0));
- } else if (type instanceof FloatType) {
- toColumn.putFloats(0, batchSize, (float)((DoubleColumnVector)fromColumn).vector[0]);
- } else if (type instanceof DoubleType) {
- toColumn.putDoubles(0, batchSize, ((DoubleColumnVector)fromColumn).vector[0]);
- } else if (type instanceof StringType || type instanceof BinaryType) {
- BytesColumnVector data = (BytesColumnVector)fromColumn;
- int size = data.vector[0].length;
- toColumn.arrayData().reserve(size);
- toColumn.arrayData().putBytes(0, size, data.vector[0], 0);
- for (int index = 0; index < batchSize; index++) {
- toColumn.putArray(index, 0, size);
- }
- } else if (type instanceof DecimalType) {
- DecimalType decimalType = (DecimalType)type;
- putDecimalWritables(
- toColumn,
- batchSize,
- decimalType.precision(),
- decimalType.scale(),
- ((DecimalColumnVector)fromColumn).vector[0]);
- } else {
- throw new UnsupportedOperationException("Unsupported Data Type: " + type);
- }
- }
- }
-
- private void putNonNullValues(
- int batchSize,
- StructField field,
- ColumnVector fromColumn,
- WritableColumnVector toColumn) {
- DataType type = field.dataType();
- if (type instanceof BooleanType) {
- long[] data = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putBoolean(index, data[index] == 1);
- }
- } else if (type instanceof ByteType) {
- long[] data = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putByte(index, (byte)data[index]);
- }
- } else if (type instanceof ShortType) {
- long[] data = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putShort(index, (short)data[index]);
- }
- } else if (type instanceof IntegerType || type instanceof DateType) {
- long[] data = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putInt(index, (int)data[index]);
- }
- } else if (type instanceof LongType) {
- toColumn.putLongs(0, batchSize, ((LongColumnVector)fromColumn).vector, 0);
- } else if (type instanceof TimestampType) {
- TimestampColumnVector data = ((TimestampColumnVector)fromColumn);
- for (int index = 0; index < batchSize; index++) {
- toColumn.putLong(index, fromTimestampColumnVector(data, index));
- }
- } else if (type instanceof FloatType) {
- double[] data = ((DoubleColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putFloat(index, (float)data[index]);
- }
- } else if (type instanceof DoubleType) {
- toColumn.putDoubles(0, batchSize, ((DoubleColumnVector)fromColumn).vector, 0);
- } else if (type instanceof StringType || type instanceof BinaryType) {
- BytesColumnVector data = ((BytesColumnVector)fromColumn);
- WritableColumnVector arrayData = toColumn.arrayData();
- int totalNumBytes = IntStream.of(data.length).sum();
- arrayData.reserve(totalNumBytes);
- for (int index = 0, pos = 0; index < batchSize; pos += data.length[index], index++) {
- arrayData.putBytes(pos, data.length[index], data.vector[index], data.start[index]);
- toColumn.putArray(index, pos, data.length[index]);
- }
- } else if (type instanceof DecimalType) {
- DecimalType decimalType = (DecimalType)type;
- DecimalColumnVector data = ((DecimalColumnVector)fromColumn);
- if (decimalType.precision() > Decimal.MAX_LONG_DIGITS()) {
- toColumn.arrayData().reserve(batchSize * 16);
- }
- for (int index = 0; index < batchSize; index++) {
- putDecimalWritable(
- toColumn,
- index,
- decimalType.precision(),
- decimalType.scale(),
- data.vector[index]);
- }
- } else {
- throw new UnsupportedOperationException("Unsupported Data Type: " + type);
- }
- }
-
- private void putValues(
- int batchSize,
- StructField field,
- ColumnVector fromColumn,
- WritableColumnVector toColumn) {
- DataType type = field.dataType();
- if (type instanceof BooleanType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putBoolean(index, vector[index] == 1);
- }
- }
- } else if (type instanceof ByteType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putByte(index, (byte)vector[index]);
- }
- }
- } else if (type instanceof ShortType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putShort(index, (short)vector[index]);
- }
- }
- } else if (type instanceof IntegerType || type instanceof DateType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putInt(index, (int)vector[index]);
- }
- }
- } else if (type instanceof LongType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putLong(index, vector[index]);
- }
- }
- } else if (type instanceof TimestampType) {
- TimestampColumnVector vector = ((TimestampColumnVector)fromColumn);
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putLong(index, fromTimestampColumnVector(vector, index));
- }
- }
- } else if (type instanceof FloatType) {
- double[] vector = ((DoubleColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putFloat(index, (float)vector[index]);
- }
- }
- } else if (type instanceof DoubleType) {
- double[] vector = ((DoubleColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putDouble(index, vector[index]);
- }
- }
- } else if (type instanceof StringType || type instanceof BinaryType) {
- BytesColumnVector vector = (BytesColumnVector)fromColumn;
- WritableColumnVector arrayData = toColumn.arrayData();
- int totalNumBytes = IntStream.of(vector.length).sum();
- arrayData.reserve(totalNumBytes);
- for (int index = 0, pos = 0; index < batchSize; pos += vector.length[index], index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- arrayData.putBytes(pos, vector.length[index], vector.vector[index], vector.start[index]);
- toColumn.putArray(index, pos, vector.length[index]);
- }
- }
- } else if (type instanceof DecimalType) {
- DecimalType decimalType = (DecimalType)type;
- HiveDecimalWritable[] vector = ((DecimalColumnVector)fromColumn).vector;
- if (decimalType.precision() > Decimal.MAX_LONG_DIGITS()) {
- toColumn.arrayData().reserve(batchSize * 16);
- }
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- putDecimalWritable(
- toColumn,
- index,
- decimalType.precision(),
- decimalType.scale(),
- vector[index]);
- }
- }
- } else {
- throw new UnsupportedOperationException("Unsupported Data Type: " + type);
- }
- }
-
- /**
- * Returns the number of micros since epoch from an element of TimestampColumnVector.
- */
- private static long fromTimestampColumnVector(TimestampColumnVector vector, int index) {
- return vector.time[index] * 1000 + (vector.nanos[index] / 1000 % 1000);
- }
-
- /**
- * Put a `HiveDecimalWritable` to a `WritableColumnVector`.
- */
- private static void putDecimalWritable(
- WritableColumnVector toColumn,
- int index,
- int precision,
- int scale,
- HiveDecimalWritable decimalWritable) {
- HiveDecimal decimal = decimalWritable.getHiveDecimal();
- Decimal value =
- Decimal.apply(decimal.bigDecimalValue(), decimal.precision(), decimal.scale());
- value.changePrecision(precision, scale);
-
- if (precision <= Decimal.MAX_INT_DIGITS()) {
- toColumn.putInt(index, (int) value.toUnscaledLong());
- } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
- toColumn.putLong(index, value.toUnscaledLong());
- } else {
- byte[] bytes = value.toJavaBigDecimal().unscaledValue().toByteArray();
- toColumn.arrayData().putBytes(index * 16, bytes.length, bytes, 0);
- toColumn.putArray(index, index * 16, bytes.length);
- }
- }
-
- /**
- * Put `HiveDecimalWritable`s to a `WritableColumnVector`.
- */
- private static void putDecimalWritables(
- WritableColumnVector toColumn,
- int size,
- int precision,
- int scale,
- HiveDecimalWritable decimalWritable) {
- HiveDecimal decimal = decimalWritable.getHiveDecimal();
- Decimal value =
- Decimal.apply(decimal.bigDecimalValue(), decimal.precision(), decimal.scale());
- value.changePrecision(precision, scale);
-
- if (precision <= Decimal.MAX_INT_DIGITS()) {
- toColumn.putInts(0, size, (int) value.toUnscaledLong());
- } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
- toColumn.putLongs(0, size, value.toUnscaledLong());
- } else {
- byte[] bytes = value.toJavaBigDecimal().unscaledValue().toByteArray();
- toColumn.arrayData().reserve(bytes.length);
- toColumn.arrayData().putBytes(0, bytes.length, bytes, 0);
- for (int index = 0; index < size; index++) {
- toColumn.putArray(index, 0, bytes.length);
- }
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index cd10ad2..14779cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -162,10 +161,8 @@ class OrcFileFormat
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
val sqlConf = sparkSession.sessionState.conf
- val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
val capacity = sqlConf.orcVectorizedReaderBatchSize
- val copyToSpark = sparkSession.sessionState.conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK)
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -197,10 +194,8 @@ class OrcFileFormat
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
- val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
- val batchReader = new OrcColumnarBatchReader(
- enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+ val batchReader = new OrcColumnarBatchReader(capacity)
// SPARK-23399 Register a task completion listener first to call `close()` in all cases.
// There is a possibility that `initialize` and `initBatch` hit some errors (like OOM)
// after opening a file.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index ecd9ead..aca7081 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -56,7 +56,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
// Set default configs. Individual cases will change them if necessary.
spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
- spark.conf.set(SQLConf.ORC_COPY_BATCH_TO_SPARK.key, "false")
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
@@ -139,12 +138,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(id) FROM orcTable").collect()
}
- sqlBenchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(id) FROM orcTable").collect()
- }
- }
-
sqlBenchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(id) FROM orcTable").collect()
@@ -261,12 +254,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(c1), sum(length(c2)) FROM orcTable").collect()
}
- benchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(c1), sum(length(c2)) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(c1), sum(length(c2)) FROM orcTable").collect()
@@ -312,12 +299,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("select sum(length(c1)) from orcTable").collect()
}
- benchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("select sum(length(c1)) from orcTable").collect()
- }
- }
-
benchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("select sum(length(c1)) from orcTable").collect()
@@ -361,12 +342,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(id) FROM orcTable").collect()
}
- benchmark.addCase("Data column - ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(id) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("Data column - ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(id) FROM orcTable").collect()
@@ -395,12 +370,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(p) FROM orcTable").collect()
}
- benchmark.addCase("Partition column - ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(p) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("Partition column - ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(p) FROM orcTable").collect()
@@ -429,12 +398,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(p), sum(id) FROM orcTable").collect()
}
- benchmark.addCase("Both column - ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(p), sum(id) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("Both columns - ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(p), sum(id) FROM orcTable").collect()
@@ -513,13 +476,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
"WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
}
- benchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT SUM(LENGTH(c2)) FROM orcTable " +
- "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
- }
- }
-
benchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT SUM(LENGTH(c2)) FROM orcTable " +
@@ -570,12 +526,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql(s"SELECT sum(c$middle) FROM orcTable").collect()
}
- benchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql(s"SELECT sum(c$middle) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql(s"SELECT sum(c$middle) FROM orcTable").collect()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
index 52abeb2..c16fcc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -43,7 +43,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with Share
requestedDataColIds: Array[Int],
requestedPartitionColIds: Array[Int],
resultFields: Array[StructField]): OrcColumnarBatchReader = {
- val reader = new OrcColumnarBatchReader(false, false, 4096)
+ val reader = new OrcColumnarBatchReader(4096)
reader.initBatch(
orcFileSchema,
resultFields,
diff --git a/sql/hive/benchmarks/OrcReadBenchmark-results.txt b/sql/hive/benchmarks/OrcReadBenchmark-results.txt
index 80c2f5e..caa78b9 100644
--- a/sql/hive/benchmarks/OrcReadBenchmark-results.txt
+++ b/sql/hive/benchmarks/OrcReadBenchmark-results.txt
@@ -8,7 +8,6 @@ SQL Single TINYINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1725 / 1759 9.1 109.7 1.0X
Native ORC Vectorized 272 / 316 57.8 17.3 6.3X
-Native ORC Vectorized with copy 239 / 254 65.7 15.2 7.2X
Hive built-in ORC 1970 / 1987 8.0 125.3 0.9X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -17,7 +16,6 @@ SQL Single SMALLINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1633 / 1672 9.6 103.8 1.0X
Native ORC Vectorized 238 / 255 66.0 15.1 6.9X
-Native ORC Vectorized with copy 235 / 253 66.8 15.0 6.9X
Hive built-in ORC 2293 / 2305 6.9 145.8 0.7X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -26,7 +24,6 @@ SQL Single INT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1677 / 1699 9.4 106.6 1.0X
Native ORC Vectorized 325 / 342 48.3 20.7 5.2X
-Native ORC Vectorized with copy 328 / 341 47.9 20.9 5.1X
Hive built-in ORC 2561 / 2569 6.1 162.8 0.7X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -35,7 +32,6 @@ SQL Single BIGINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1791 / 1795 8.8 113.9 1.0X
Native ORC Vectorized 400 / 408 39.3 25.4 4.5X
-Native ORC Vectorized with copy 410 / 417 38.4 26.1 4.4X
Hive built-in ORC 2713 / 2720 5.8 172.5 0.7X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -44,7 +40,6 @@ SQL Single FLOAT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1791 / 1805 8.8 113.8 1.0X
Native ORC Vectorized 433 / 438 36.3 27.5 4.1X
-Native ORC Vectorized with copy 441 / 447 35.7 28.0 4.1X
Hive built-in ORC 2690 / 2803 5.8 171.0 0.7X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -53,7 +48,6 @@ SQL Single DOUBLE Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1911 / 1930 8.2 121.5 1.0X
Native ORC Vectorized 543 / 552 29.0 34.5 3.5X
-Native ORC Vectorized with copy 547 / 555 28.8 34.8 3.5X
Hive built-in ORC 2967 / 3065 5.3 188.6 0.6X
@@ -67,7 +61,6 @@ Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 4160 / 4188 2.5 396.7 1.0X
Native ORC Vectorized 2405 / 2406 4.4 229.4 1.7X
-Native ORC Vectorized with copy 2588 / 2592 4.1 246.8 1.6X
Hive built-in ORC 5514 / 5562 1.9 525.9 0.8X
@@ -81,15 +74,12 @@ Partitioned Table: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Data column - Native ORC MR 1863 / 1867 8.4 118.4 1.0X
Data column - Native ORC Vectorized 411 / 418 38.2 26.2 4.5X
-Data column - Native ORC Vectorized with copy 417 / 422 37.8 26.5 4.5X
Data column - Hive built-in ORC 3297 / 3308 4.8 209.6 0.6X
Partition column - Native ORC MR 1505 / 1506 10.4 95.7 1.2X
Partition column - Native ORC Vectorized 80 / 93 195.6 5.1 23.2X
-Partition column - Native ORC Vectorized with copy 78 / 86 201.4 5.0 23.9X
Partition column - Hive built-in ORC 1960 / 1979 8.0 124.6 1.0X
Both columns - Native ORC MR 2076 / 2090 7.6 132.0 0.9X
Both columns - Native ORC Vectorized 450 / 463 34.9 28.6 4.1X
-Both column - Native ORC Vectorized with copy 532 / 538 29.6 33.8 3.5X
Both columns - Hive built-in ORC 3528 / 3548 4.5 224.3 0.5X
@@ -103,7 +93,6 @@ Repeated String: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1727 / 1733 6.1 164.7 1.0X
Native ORC Vectorized 375 / 379 28.0 35.7 4.6X
-Native ORC Vectorized with copy 552 / 556 19.0 52.6 3.1X
Hive built-in ORC 2665 / 2666 3.9 254.2 0.6X
@@ -117,7 +106,6 @@ String with Nulls Scan (0.0%): Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 3324 / 3325 3.2 317.0 1.0X
Native ORC Vectorized 1085 / 1106 9.7 103.4 3.1X
-Native ORC Vectorized with copy 1463 / 1471 7.2 139.5 2.3X
Hive built-in ORC 5272 / 5299 2.0 502.8 0.6X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -126,7 +114,6 @@ String with Nulls Scan (50.0%): Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 3045 / 3046 3.4 290.4 1.0X
Native ORC Vectorized 1248 / 1260 8.4 119.0 2.4X
-Native ORC Vectorized with copy 1609 / 1624 6.5 153.5 1.9X
Hive built-in ORC 3989 / 3999 2.6 380.4 0.8X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -135,7 +122,6 @@ String with Nulls Scan (95.0%): Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1692 / 1694 6.2 161.3 1.0X
Native ORC Vectorized 471 / 493 22.3 44.9 3.6X
-Native ORC Vectorized with copy 588 / 590 17.8 56.1 2.9X
Hive built-in ORC 2398 / 2411 4.4 228.7 0.7X
@@ -149,7 +135,6 @@ Single Column Scan from 100 columns: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1371 / 1379 0.8 1307.5 1.0X
Native ORC Vectorized 121 / 135 8.6 115.8 11.3X
-Native ORC Vectorized with copy 122 / 138 8.6 116.2 11.3X
Hive built-in ORC 521 / 561 2.0 497.1 2.6X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -158,7 +143,6 @@ Single Column Scan from 200 columns: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 2711 / 2767 0.4 2585.5 1.0X
Native ORC Vectorized 210 / 232 5.0 200.5 12.9X
-Native ORC Vectorized with copy 208 / 219 5.0 198.4 13.0X
Hive built-in ORC 764 / 775 1.4 728.3 3.5X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -167,7 +151,6 @@ Single Column Scan from 300 columns: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 3979 / 3988 0.3 3794.4 1.0X
Native ORC Vectorized 357 / 366 2.9 340.2 11.2X
-Native ORC Vectorized with copy 361 / 371 2.9 344.5 11.0X
Hive built-in ORC 1091 / 1095 1.0 1040.5 3.6X
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
index eb3cde8..c03ae14 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -96,12 +96,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
}
@@ -133,12 +127,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(c1), sum(length(c2)) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(c1), sum(length(c2)) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql("SELECT sum(c1), sum(length(c2)) FROM hiveOrcTable").collect()
}
@@ -168,12 +156,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Data column - Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Data column - Hive built-in ORC") { _ =>
spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
}
@@ -188,12 +170,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Partition column - Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Partition column - Hive built-in ORC") { _ =>
spark.sql("SELECT sum(p) FROM hiveOrcTable").collect()
}
@@ -208,12 +184,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Both column - Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Both columns - Hive built-in ORC") { _ =>
spark.sql("SELECT sum(p), sum(id) FROM hiveOrcTable").collect()
}
@@ -242,12 +212,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql("SELECT sum(length(c1)) FROM hiveOrcTable").collect()
}
@@ -284,13 +248,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
"WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT SUM(LENGTH(c2)) FROM nativeOrcTable " +
- "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql("SELECT SUM(LENGTH(c2)) FROM hiveOrcTable " +
"WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
@@ -324,12 +281,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql(s"SELECT sum(c$middle) FROM hiveOrcTable").collect()
}