You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/01 04:56:20 UTC

spark git commit: [SPARK-23188][SQL] Make vectorized columnar reader batch size configurable

Repository: spark
Updated Branches:
  refs/heads/master b2e7677f4 -> cc41245fa


[SPARK-23188][SQL] Make vectorized columnar reader batch size configurable

## What changes were proposed in this pull request?

This PR includes the following changes:
- Make the capacity of `VectorizedParquetRecordReader` configurable;
- Make the capacity of `OrcColumnarBatchReader` configurable;
- Update the error message shown when the required capacity in a writable column vector cannot be fulfilled.

## How was this patch tested?

N/A

Author: Xingbo Jiang <xi...@databricks.com>

Closes #20361 from jiangxb1987/vectorCapacity.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc41245f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc41245f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc41245f

Branch: refs/heads/master
Commit: cc41245fa3f954f961541bf4b4275c28473042b8
Parents: b2e7677
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Thu Feb 1 12:56:07 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 1 12:56:07 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++++++++
 .../datasources/orc/OrcColumnarBatchReader.java | 22 +++++++++++---------
 .../parquet/VectorizedParquetRecordReader.java  | 20 ++++++++----------
 .../vectorized/WritableColumnVector.java        |  7 +++++--
 .../datasources/orc/OrcFileFormat.scala         |  3 ++-
 .../datasources/parquet/ParquetFileFormat.scala |  3 ++-
 .../parquet/ParquetEncodingSuite.scala          | 12 ++++++++---
 .../datasources/parquet/ParquetIOSuite.scala    | 21 +++++++++++++------
 .../parquet/ParquetReadBenchmark.scala          | 11 +++++++---
 9 files changed, 78 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7394a0d..90654e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -375,6 +375,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize")
+    .doc("The number of rows to include in a parquet vectorized reader batch. The number should " +
+      "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
+    .intConf
+    .createWithDefault(4096)
+
   val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
     .doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
       "`orc.compress` is specified in the table-specific options/properties, the precedence " +
@@ -398,6 +404,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.orc.columnarReaderBatchSize")
+    .doc("The number of rows to include in a orc vectorized reader batch. The number should " +
+      "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
+    .intConf
+    .createWithDefault(4096)
+
   val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
     .doc("Whether or not to copy the ORC columnar batch to Spark columnar batch in the " +
       "vectorized ORC reader.")
@@ -1250,10 +1262,14 @@ class SQLConf extends Serializable with Logging {
 
   def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)
 
+  def orcVectorizedReaderBatchSize: Int = getConf(ORC_VECTORIZED_READER_BATCH_SIZE)
+
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
 
+  def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 5e7cad4..dcebdc3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -49,8 +49,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
  * After creating, `initialize` and `initBatch` should be called sequentially.
  */
 public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
-  // TODO: make this configurable.
-  private static final int CAPACITY = 4 * 1024;
+
+  // The capacity of vectorized batch.
+  private int capacity;
 
   // Vectorized ORC Row Batch
   private VectorizedRowBatch batch;
@@ -81,9 +82,10 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // Whether or not to copy the ORC columnar batch to Spark columnar batch.
   private final boolean copyToSpark;
 
-  public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark) {
+  public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark, int capacity) {
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
     this.copyToSpark = copyToSpark;
+    this.capacity = capacity;
   }
 
 
@@ -148,7 +150,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
       StructField[] requiredFields,
       StructType partitionSchema,
       InternalRow partitionValues) {
-    batch = orcSchema.createRowBatch(CAPACITY);
+    batch = orcSchema.createRowBatch(capacity);
     assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
 
     this.requiredFields = requiredFields;
@@ -162,15 +164,15 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
 
     if (copyToSpark) {
       if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
-        columnVectors = OffHeapColumnVector.allocateColumns(CAPACITY, resultSchema);
+        columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
       } else {
-        columnVectors = OnHeapColumnVector.allocateColumns(CAPACITY, resultSchema);
+        columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
       }
 
       // Initialize the missing columns once.
       for (int i = 0; i < requiredFields.length; i++) {
         if (requestedColIds[i] == -1) {
-          columnVectors[i].putNulls(0, CAPACITY);
+          columnVectors[i].putNulls(0, capacity);
           columnVectors[i].setIsConstant();
         }
       }
@@ -193,8 +195,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
         int colId = requestedColIds[i];
         // Initialize the missing columns once.
         if (colId == -1) {
-          OnHeapColumnVector missingCol = new OnHeapColumnVector(CAPACITY, dt);
-          missingCol.putNulls(0, CAPACITY);
+          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+          missingCol.putNulls(0, capacity);
           missingCol.setIsConstant();
           orcVectorWrappers[i] = missingCol;
         } else {
@@ -206,7 +208,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
         int partitionIdx = requiredFields.length;
         for (int i = 0; i < partitionValues.numFields(); i++) {
           DataType dt = partitionSchema.fields()[i].dataType();
-          OnHeapColumnVector partitionCol = new OnHeapColumnVector(CAPACITY, dt);
+          OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
           ColumnVectorUtils.populate(partitionCol, partitionValues, i);
           partitionCol.setIsConstant();
           orcVectorWrappers[partitionIdx + i] = partitionCol;

http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index bb1b236..5934a23 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -50,8 +50,9 @@ import org.apache.spark.sql.types.StructType;
  * TODO: make this always return ColumnarBatches.
  */
 public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
-  // TODO: make this configurable.
-  private static final int CAPACITY = 4 * 1024;
+
+  // The capacity of vectorized batch.
+  private int capacity;
 
   /**
    * Batch of rows that we assemble and the current index we've returned. Every time this
@@ -115,13 +116,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private final MemoryMode MEMORY_MODE;
 
-  public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap) {
+  public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap, int capacity) {
     this.convertTz = convertTz;
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
-  }
-
-  public VectorizedParquetRecordReader(boolean useOffHeap) {
-    this(null, useOffHeap);
+    this.capacity = capacity;
   }
 
   /**
@@ -199,9 +197,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     }
 
     if (memMode == MemoryMode.OFF_HEAP) {
-      columnVectors = OffHeapColumnVector.allocateColumns(CAPACITY, batchSchema);
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
     } else {
-      columnVectors = OnHeapColumnVector.allocateColumns(CAPACITY, batchSchema);
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
     }
     columnarBatch = new ColumnarBatch(columnVectors);
     if (partitionColumns != null) {
@@ -215,7 +213,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     // Initialize missing columns with nulls.
     for (int i = 0; i < missingColumns.length; i++) {
       if (missingColumns[i]) {
-        columnVectors[i].putNulls(0, CAPACITY);
+        columnVectors[i].putNulls(0, capacity);
         columnVectors[i].setIsConstant();
       }
     }
@@ -257,7 +255,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     if (rowsReturned >= totalRowCount) return false;
     checkEndOfRowGroup();
 
-    int num = (int) Math.min((long) CAPACITY, totalCountLoadedSoFar - rowsReturned);
+    int num = (int) Math.min((long) capacity, totalCountLoadedSoFar - rowsReturned);
     for (int i = 0; i < columnReaders.length; ++i) {
       if (columnReaders[i] == null) continue;
       columnReaders[i].readBatch(num, columnVectors[i]);

http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index c2e5954..9d447cd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -98,8 +98,11 @@ public abstract class WritableColumnVector extends ColumnVector {
   private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
     String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
         "(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
-        "vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
-        " to false.";
+        "vectorized reader, or increase the vectorized reader batch size. For parquet file " +
+        "format, refer to " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + " and " +
+        SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE().key() + "; for orc file format, refer to " +
+        SQLConf.ORC_VECTORIZED_READER_ENABLED().key() + " and " +
+        SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE().key() + ".";
     throw new RuntimeException(message, cause);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2dd314d..dbf3bc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -151,6 +151,7 @@ class OrcFileFormat
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
     val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
+    val capacity = sqlConf.orcVectorizedReaderBatchSize
     val copyToSpark = sparkSession.sessionState.conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK)
 
     val broadcastedConf =
@@ -186,7 +187,7 @@ class OrcFileFormat
         val taskContext = Option(TaskContext.get())
         if (enableVectorizedReader) {
           val batchReader = new OrcColumnarBatchReader(
-            enableOffHeapColumnVector && taskContext.isDefined, copyToSpark)
+            enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
           batchReader.initialize(fileSplit, taskAttemptContext)
           batchReader.initBatch(
             reader.getSchema,

http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f53a97b..ba69f9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -350,6 +350,7 @@ class ParquetFileFormat
       sparkSession.sessionState.conf.parquetRecordFilterEnabled
     val timestampConversion: Boolean =
       sparkSession.sessionState.conf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
 
@@ -396,7 +397,7 @@ class ParquetFileFormat
       val taskContext = Option(TaskContext.get())
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
-          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined)
+          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)

http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index edb1290..db73bfa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -40,7 +40,9 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+        val conf = sqlContext.conf
+        val reader = new VectorizedParquetRecordReader(
+          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
@@ -65,7 +67,9 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         data.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+        val conf = sqlContext.conf
+        val reader = new VectorizedParquetRecordReader(
+          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
@@ -94,7 +98,9 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
 
-        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+        val conf = sqlContext.conf
+        val reader = new VectorizedParquetRecordReader(
+          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file, null /* set columns to null to project all columns */)
         val column = reader.resultBatch().column(0)
         assert(reader.nextBatch())

http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index f3ece5b..3af8093 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -653,7 +653,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
       {
-        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+        val conf = sqlContext.conf
+        val reader = new VectorizedParquetRecordReader(
+          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
           reader.initialize(file, null)
           val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -670,7 +672,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Project just one column
       {
-        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+        val conf = sqlContext.conf
+        val reader = new VectorizedParquetRecordReader(
+          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
           reader.initialize(file, ("_2" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String)]
@@ -686,7 +690,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Project columns in opposite order
       {
-        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+        val conf = sqlContext.conf
+        val reader = new VectorizedParquetRecordReader(
+          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
           reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -703,7 +709,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Empty projection
       {
-        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+        val conf = sqlContext.conf
+        val reader = new VectorizedParquetRecordReader(
+          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
           reader.initialize(file, List[String]().asJava)
           var result = 0
@@ -742,8 +750,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       dataTypes.zip(constantValues).foreach { case (dt, v) =>
         val schema = StructType(StructField("pcol", dt) :: Nil)
-        val vectorizedReader =
-          new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
+        val conf = sqlContext.conf
+        val vectorizedReader = new VectorizedParquetRecordReader(
+          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         val partitionValues = new GenericInternalRow(Array(v))
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cc41245f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 86a3c71..e43336d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -76,6 +76,7 @@ object ParquetReadBenchmark {
     withTempPath { dir =>
       withTempTable("t1", "tempTable") {
         val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
+        val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
         spark.range(values).createOrReplaceTempView("t1")
         spark.sql("select cast(id as INT) as id from t1")
             .write.parquet(dir.getCanonicalPath)
@@ -96,7 +97,8 @@ object ParquetReadBenchmark {
         parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
+            val reader = new VectorizedParquetRecordReader(
+              null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -119,7 +121,8 @@ object ParquetReadBenchmark {
         parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
+            val reader = new VectorizedParquetRecordReader(
+              null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -262,6 +265,7 @@ object ParquetReadBenchmark {
     withTempPath { dir =>
       withTempTable("t1", "tempTable") {
         val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
+        val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
         spark.range(values).createOrReplaceTempView("t1")
         spark.sql(s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " +
           s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1")
@@ -279,7 +283,8 @@ object ParquetReadBenchmark {
         benchmark.addCase("PR Vectorized") { num =>
           var sum = 0
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
+            val reader = new VectorizedParquetRecordReader(
+              null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
             try {
               reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
               val batch = reader.resultBatch()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org