Posted to commits@spark.apache.org by we...@apache.org on 2018/02/01 04:56:20 UTC
spark git commit: [SPARK-23188][SQL] Make vectorized columnar reader batch size configurable
Repository: spark
Updated Branches:
refs/heads/master b2e7677f4 -> cc41245fa
[SPARK-23188][SQL] Make vectorized columnar reader batch size configurable
## What changes were proposed in this pull request?
This PR includes the following changes (a short configuration sketch follows the list):
- Make the capacity of `VectorizedParquetRecordReader` configurable;
- Make the capacity of `OrcColumnarBatchReader` configurable;
- Update the error message shown when the required capacity in a writable columnar vector cannot be fulfilled.
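
As a usage note (the value below is illustrative, not part of this patch), the two new keys behave like any other SQL configuration; a minimal sketch:

```scala
// Illustrative only: set the new batch sizes when building the session, or pass them
// via `spark-submit --conf spark.sql.parquet.columnarReaderBatchSize=8192`.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("columnar-batch-size-example")
  .config("spark.sql.parquet.columnarReaderBatchSize", 8192L)
  .config("spark.sql.orc.columnarReaderBatchSize", 8192L)
  .getOrCreate()
```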
## How was this patch tested?
N/A (existing Parquet test suites and ParquetReadBenchmark are updated to the new constructor signature).
Author: Xingbo Jiang <xi...@databricks.com>
Closes #20361 from jiangxb1987/vectorCapacity.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc41245f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc41245f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc41245f
Branch: refs/heads/master
Commit: cc41245fa3f954f961541bf4b4275c28473042b8
Parents: b2e7677
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Thu Feb 1 12:56:07 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 1 12:56:07 2018 +0800
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++++++++
.../datasources/orc/OrcColumnarBatchReader.java | 22 +++++++++++---------
.../parquet/VectorizedParquetRecordReader.java | 20 ++++++++----------
.../vectorized/WritableColumnVector.java | 7 +++++--
.../datasources/orc/OrcFileFormat.scala | 3 ++-
.../datasources/parquet/ParquetFileFormat.scala | 3 ++-
.../parquet/ParquetEncodingSuite.scala | 12 ++++++++---
.../datasources/parquet/ParquetIOSuite.scala | 21 +++++++++++++------
.../parquet/ParquetReadBenchmark.scala | 11 +++++++---
9 files changed, 78 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7394a0d..90654e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -375,6 +375,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize")
+ .doc("The number of rows to include in a parquet vectorized reader batch. The number should " +
+ "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
+ .intConf
+ .createWithDefault(4096)
+
val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
"`orc.compress` is specified in the table-specific options/properties, the precedence " +
@@ -398,6 +404,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val ORC_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.orc.columnarReaderBatchSize")
+ .doc("The number of rows to include in a orc vectorized reader batch. The number should " +
+ "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
+ .intConf
+ .createWithDefault(4096)
+
val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
.doc("Whether or not to copy the ORC columnar batch to Spark columnar batch in the " +
"vectorized ORC reader.")
@@ -1250,10 +1262,14 @@ class SQLConf extends Serializable with Logging {
def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)
+ def orcVectorizedReaderBatchSize: Int = getConf(ORC_VECTORIZED_READER_BATCH_SIZE)
+
def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
+ def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)
+
def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
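
A minimal sketch of reading the new values back through the getters added above, assuming an active `SparkSession` named `spark`; this mirrors how the file formats and the benchmark in this patch pick up the capacity:

```scala
// Both getters were added in this patch and return the 4096 default when the keys are unset.
val sqlConf = spark.sessionState.conf
val parquetCapacity: Int = sqlConf.parquetVectorizedReaderBatchSize
val orcCapacity: Int = sqlConf.orcVectorizedReaderBatchSize
```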
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 5e7cad4..dcebdc3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -49,8 +49,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
* After creating, `initialize` and `initBatch` should be called sequentially.
*/
public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
- // TODO: make this configurable.
- private static final int CAPACITY = 4 * 1024;
+
+ // The capacity of vectorized batch.
+ private int capacity;
// Vectorized ORC Row Batch
private VectorizedRowBatch batch;
@@ -81,9 +82,10 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
// Whether or not to copy the ORC columnar batch to Spark columnar batch.
private final boolean copyToSpark;
- public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark) {
+ public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark, int capacity) {
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
this.copyToSpark = copyToSpark;
+ this.capacity = capacity;
}
@@ -148,7 +150,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
StructField[] requiredFields,
StructType partitionSchema,
InternalRow partitionValues) {
- batch = orcSchema.createRowBatch(CAPACITY);
+ batch = orcSchema.createRowBatch(capacity);
assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
this.requiredFields = requiredFields;
@@ -162,15 +164,15 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
if (copyToSpark) {
if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
- columnVectors = OffHeapColumnVector.allocateColumns(CAPACITY, resultSchema);
+ columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
} else {
- columnVectors = OnHeapColumnVector.allocateColumns(CAPACITY, resultSchema);
+ columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
}
// Initialize the missing columns once.
for (int i = 0; i < requiredFields.length; i++) {
if (requestedColIds[i] == -1) {
- columnVectors[i].putNulls(0, CAPACITY);
+ columnVectors[i].putNulls(0, capacity);
columnVectors[i].setIsConstant();
}
}
@@ -193,8 +195,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
int colId = requestedColIds[i];
// Initialize the missing columns once.
if (colId == -1) {
- OnHeapColumnVector missingCol = new OnHeapColumnVector(CAPACITY, dt);
- missingCol.putNulls(0, CAPACITY);
+ OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+ missingCol.putNulls(0, capacity);
missingCol.setIsConstant();
orcVectorWrappers[i] = missingCol;
} else {
@@ -206,7 +208,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
int partitionIdx = requiredFields.length;
for (int i = 0; i < partitionValues.numFields(); i++) {
DataType dt = partitionSchema.fields()[i].dataType();
- OnHeapColumnVector partitionCol = new OnHeapColumnVector(CAPACITY, dt);
+ OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
ColumnVectorUtils.populate(partitionCol, partitionValues, i);
partitionCol.setIsConstant();
orcVectorWrappers[partitionIdx + i] = partitionCol;
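
As a rough end-to-end sketch of the ORC side (the path, row count, and batch size below are illustrative, not taken from the patch), the capacity flows from `spark.sql.orc.columnarReaderBatchSize` into `createRowBatch` and the column vector allocations above. Note that `OrcColumnarBatchReader` is only used by the native ORC implementation:

```scala
// Illustrative values and path; assumes an active SparkSession named `spark`.
val orcDir = "/tmp/spark-23188-orc-example"          // hypothetical location
spark.range(0L, 100000L)
  .selectExpr("id", "cast(id as string) as s")
  .write.mode("overwrite").orc(orcDir)

spark.conf.set("spark.sql.orc.impl", "native")       // vectorized ORC scans use the native reader
spark.conf.set("spark.sql.orc.columnarReaderBatchSize", 2048L)
spark.read.orc(orcDir).count()                       // each ColumnarBatch now holds at most 2048 rows
```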
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index bb1b236..5934a23 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -50,8 +50,9 @@ import org.apache.spark.sql.types.StructType;
* TODO: make this always return ColumnarBatches.
*/
public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
- // TODO: make this configurable.
- private static final int CAPACITY = 4 * 1024;
+
+ // The capacity of vectorized batch.
+ private int capacity;
/**
* Batch of rows that we assemble and the current index we've returned. Every time this
@@ -115,13 +116,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private final MemoryMode MEMORY_MODE;
- public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap) {
+ public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap, int capacity) {
this.convertTz = convertTz;
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
- }
-
- public VectorizedParquetRecordReader(boolean useOffHeap) {
- this(null, useOffHeap);
+ this.capacity = capacity;
}
/**
@@ -199,9 +197,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
}
if (memMode == MemoryMode.OFF_HEAP) {
- columnVectors = OffHeapColumnVector.allocateColumns(CAPACITY, batchSchema);
+ columnVectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
} else {
- columnVectors = OnHeapColumnVector.allocateColumns(CAPACITY, batchSchema);
+ columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
}
columnarBatch = new ColumnarBatch(columnVectors);
if (partitionColumns != null) {
@@ -215,7 +213,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
// Initialize missing columns with nulls.
for (int i = 0; i < missingColumns.length; i++) {
if (missingColumns[i]) {
- columnVectors[i].putNulls(0, CAPACITY);
+ columnVectors[i].putNulls(0, capacity);
columnVectors[i].setIsConstant();
}
}
@@ -257,7 +255,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
if (rowsReturned >= totalRowCount) return false;
checkEndOfRowGroup();
- int num = (int) Math.min((long) CAPACITY, totalCountLoadedSoFar - rowsReturned);
+ int num = (int) Math.min((long) capacity, totalCountLoadedSoFar - rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
if (columnReaders[i] == null) continue;
columnReaders[i].readBatch(num, columnVectors[i]);
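
A minimal sketch of driving the reader directly with an explicit capacity, mirroring the updated test suites later in this diff; `parquetFile` is a hypothetical path to a Parquet file previously written by Spark, and `spark` is an active session:

```scala
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

val conf = spark.sessionState.conf
val parquetFile: String = "/tmp/spark-23188-example.parquet/part-00000.snappy.parquet"  // hypothetical
val reader = new VectorizedParquetRecordReader(
  null,                                    // no timezone conversion
  conf.offHeapColumnVectorEnabled,         // memory mode, as in ParquetFileFormat
  conf.parquetVectorizedReaderBatchSize)   // the newly configurable capacity
try {
  reader.initialize(parquetFile, null /* null projects all columns */)
  val batch = reader.resultBatch()
  while (reader.nextBatch()) {
    assert(batch.numRows() <= conf.parquetVectorizedReaderBatchSize)
  }
} finally {
  reader.close()
}
```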
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index c2e5954..9d447cd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -98,8 +98,11 @@ public abstract class WritableColumnVector extends ColumnVector {
private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
"(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
- "vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
- " to false.";
+ "vectorized reader, or increase the vectorized reader batch size. For parquet file " +
+ "format, refer to " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + " and " +
+ SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE().key() + "; for orc file format, refer to " +
+ SQLConf.ORC_VECTORIZED_READER_ENABLED().key() + " and " +
+ SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE().key() + ".";
throw new RuntimeException(message, cause);
}
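
For completeness, a sketch of the fallback the reworked message points at (config keys as referenced in the hunk above; disabling vectorization trades scan speed for lower per-batch memory):

```scala
// Assumes an active SparkSession named `spark`.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)  // row-based Parquet reads
spark.conf.set("spark.sql.orc.enableVectorizedReader", false)      // row-based ORC reads
```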
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2dd314d..dbf3bc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -151,6 +151,7 @@ class OrcFileFormat
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
+ val capacity = sqlConf.orcVectorizedReaderBatchSize
val copyToSpark = sparkSession.sessionState.conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK)
val broadcastedConf =
@@ -186,7 +187,7 @@ class OrcFileFormat
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val batchReader = new OrcColumnarBatchReader(
- enableOffHeapColumnVector && taskContext.isDefined, copyToSpark)
+ enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
batchReader.initialize(fileSplit, taskAttemptContext)
batchReader.initBatch(
reader.getSchema,
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f53a97b..ba69f9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -350,6 +350,7 @@ class ParquetFileFormat
sparkSession.sessionState.conf.parquetRecordFilterEnabled
val timestampConversion: Boolean =
sparkSession.sessionState.conf.isParquetINT96TimestampConversion
+ val capacity = sqlConf.parquetVectorizedReaderBatchSize
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
@@ -396,7 +397,7 @@ class ParquetFileFormat
val taskContext = Option(TaskContext.get())
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
- convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined)
+ convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index edb1290..db73bfa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -40,7 +40,9 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
- val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+ val conf = sqlContext.conf
+ val reader = new VectorizedParquetRecordReader(
+ null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
@@ -65,7 +67,9 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
data.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
- val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+ val conf = sqlContext.conf
+ val reader = new VectorizedParquetRecordReader(
+ null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
@@ -94,7 +98,9 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
- val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+ val conf = sqlContext.conf
+ val reader = new VectorizedParquetRecordReader(
+ null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
reader.initialize(file, null /* set columns to null to project all columns */)
val column = reader.resultBatch().column(0)
assert(reader.nextBatch())
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index f3ece5b..3af8093 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -653,7 +653,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
{
- val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+ val conf = sqlContext.conf
+ val reader = new VectorizedParquetRecordReader(
+ null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
try {
reader.initialize(file, null)
val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -670,7 +672,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Project just one column
{
- val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+ val conf = sqlContext.conf
+ val reader = new VectorizedParquetRecordReader(
+ null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
try {
reader.initialize(file, ("_2" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String)]
@@ -686,7 +690,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Project columns in opposite order
{
- val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+ val conf = sqlContext.conf
+ val reader = new VectorizedParquetRecordReader(
+ null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
try {
reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -703,7 +709,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Empty projection
{
- val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+ val conf = sqlContext.conf
+ val reader = new VectorizedParquetRecordReader(
+ null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
try {
reader.initialize(file, List[String]().asJava)
var result = 0
@@ -742,8 +750,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("pcol", dt) :: Nil)
- val vectorizedReader =
- new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+ val conf = sqlContext.conf
+ val vectorizedReader = new VectorizedParquetRecordReader(
+ null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
val partitionValues = new GenericInternalRow(Array(v))
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 86a3c71..e43336d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -76,6 +76,7 @@ object ParquetReadBenchmark {
withTempPath { dir =>
withTempTable("t1", "tempTable") {
val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
+ val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
spark.range(values).createOrReplaceTempView("t1")
spark.sql("select cast(id as INT) as id from t1")
.write.parquet(dir.getCanonicalPath)
@@ -96,7 +97,8 @@ object ParquetReadBenchmark {
parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
+ val reader = new VectorizedParquetRecordReader(
+ null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -119,7 +121,8 @@ object ParquetReadBenchmark {
parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
+ val reader = new VectorizedParquetRecordReader(
+ null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -262,6 +265,7 @@ object ParquetReadBenchmark {
withTempPath { dir =>
withTempTable("t1", "tempTable") {
val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
+ val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
spark.range(values).createOrReplaceTempView("t1")
spark.sql(s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " +
s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1")
@@ -279,7 +283,8 @@ object ParquetReadBenchmark {
benchmark.addCase("PR Vectorized") { num =>
var sum = 0
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
+ val reader = new VectorizedParquetRecordReader(
+ null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
val batch = reader.resultBatch()