You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/11/20 11:40:59 UTC

spark git commit: [SPARK-20101][SQL] Use OffHeapColumnVector when "spark.sql.columnVector.offheap.enable" is set to "true"

Repository: spark
Updated Branches:
  refs/heads/master 57c5514de -> 3c3eebc87


[SPARK-20101][SQL] Use OffHeapColumnVector when "spark.sql.columnVector.offheap.enable" is set to "true"

This PR enables the use of ``OffHeapColumnVector`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``. Although ``ColumnVector`` has two implementations, ``OnHeapColumnVector`` and ``OffHeapColumnVector``, currently only ``OnHeapColumnVector`` is ever used.

This PR implements the following:
- Pass ``OffHeapColumnVector`` to ``ColumnarBatch.allocate()`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``
- Free all off-heap memory regions via ``OffHeapColumnVector.close()``
- Ensure that ``OffHeapColumnVector.close()`` is always called

Use existing tests

Author: Kazuaki Ishizaki <is...@jp.ibm.com>

Closes #17436 from kiszk/SPARK-20101.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c3eebc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c3eebc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c3eebc8

Branch: refs/heads/master
Commit: 3c3eebc8734e36e61f4627e2c517fbbe342b3b42
Parents: 57c5514
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Mon Nov 20 12:40:16 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Nov 20 12:40:26 2017 +0100

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  2 +-
 .../apache/spark/internal/config/package.scala  | 16 ++++++++++++++
 .../org/apache/spark/memory/MemoryManager.scala |  7 ++++---
 .../spark/memory/StaticMemoryManagerSuite.scala |  3 ++-
 .../memory/UnifiedMemoryManagerSuite.scala      |  7 ++++---
 .../storage/BlockManagerReplicationSuite.scala  |  3 ++-
 .../spark/storage/BlockManagerSuite.scala       |  2 +-
 .../org/apache/spark/ui/UISeleniumSuite.scala   |  3 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 ++++++++
 .../parquet/VectorizedParquetRecordReader.java  | 12 +++++++----
 .../sql/execution/DataSourceScanExec.scala      |  3 ++-
 .../columnar/InMemoryTableScanExec.scala        | 17 +++++++++++++--
 .../sql/execution/datasources/FileFormat.scala  |  4 +++-
 .../datasources/parquet/ParquetFileFormat.scala | 22 ++++++++++++++------
 .../sql/execution/joins/HashedRelation.scala    |  7 ++++---
 .../UnsafeFixedWidthAggregationMapSuite.scala   |  3 ++-
 .../execution/UnsafeKVExternalSorterSuite.scala |  6 +++---
 .../benchmark/AggregateBenchmark.scala          |  7 ++++---
 .../parquet/ParquetEncodingSuite.scala          |  6 +++---
 .../datasources/parquet/ParquetIOSuite.scala    | 11 +++++-----
 .../parquet/ParquetReadBenchmark.scala          |  8 ++++---
 .../execution/joins/HashedRelationSuite.scala   |  9 ++++----
 22 files changed, 117 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 57b3744..ee726df 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -655,7 +655,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
     "spark.yarn.max.executor.failures" -> Seq(
       AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
-    "spark.memory.offHeap.enabled" -> Seq(
+    MEMORY_OFFHEAP_ENABLED.key -> Seq(
       AlternateConfig("spark.unsafe.offHeap", "1.6")),
     "spark.rpc.message.maxSize" -> Seq(
       AlternateConfig("spark.akka.frameSize", "1.6")),

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 84315f5..7be4d6b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -80,6 +80,22 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createWithDefaultString("1g")
 
+  private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
+    .doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
+      "If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")
+    .withAlternative("spark.unsafe.offHeap")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val MEMORY_OFFHEAP_SIZE = ConfigBuilder("spark.memory.offHeap.size")
+    .doc("The absolute amount of memory in bytes which can be used for off-heap allocation. " +
+      "This setting has no impact on heap memory usage, so if your executors' total memory " +
+      "consumption must fit within some hard limit then be sure to shrink your JVM heap size " +
+      "accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true.")
+    .bytesConf(ByteUnit.BYTE)
+    .checkValue(_ >= 0, "The off-heap memory size must not be negative")
+    .createWithDefault(0)
+
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 82442cf..0641adc 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.storage.BlockId
 import org.apache.spark.storage.memory.MemoryStore
 import org.apache.spark.unsafe.Platform
@@ -54,7 +55,7 @@ private[spark] abstract class MemoryManager(
   onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
   onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
 
-  protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
+  protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
   protected[this] val offHeapStorageMemory =
     (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
 
@@ -194,8 +195,8 @@ private[spark] abstract class MemoryManager(
    * sun.misc.Unsafe.
    */
   final val tungstenMemoryMode: MemoryMode = {
-    if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
-      require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
+    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
+      require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
         "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
       require(Platform.unaligned(),
         "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 4e31fb5..0f32fe4 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.memory
 import org.mockito.Mockito.when
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
 import org.apache.spark.storage.TestBlockId
 import org.apache.spark.storage.memory.MemoryStore
 
@@ -48,7 +49,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
       conf.clone
         .set("spark.memory.fraction", "1")
         .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-        .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
+        .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString),
       maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
       maxOnHeapStorageMemory = 0,
       numCores = 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 02b04cd..d56cfc1 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.memory
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
 import org.apache.spark.storage.TestBlockId
 import org.apache.spark.storage.memory.MemoryStore
 
@@ -43,7 +44,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val conf = new SparkConf()
       .set("spark.memory.fraction", "1")
       .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-      .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString)
+      .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString)
       .set("spark.memory.storageFraction", storageFraction.toString)
     UnifiedMemoryManager(conf, numCores = 1)
   }
@@ -305,9 +306,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
 
   test("not enough free memory in the storage pool --OFF_HEAP") {
     val conf = new SparkConf()
-      .set("spark.memory.offHeap.size", "1000")
+      .set(MEMORY_OFFHEAP_SIZE.key, "1000")
       .set("spark.testing.memory", "1000")
-      .set("spark.memory.offHeap.enabled", "true")
+      .set(MEMORY_OFFHEAP_ENABLED.key, "true")
     val taskAttemptId = 0L
     val mm = UnifiedMemoryManager(conf, numCores = 1)
     val ms = makeMemoryStore(mm)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index c2101ba..3962bdc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -69,7 +70,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     conf.set("spark.testing.memory", maxMem.toString)
-    conf.set("spark.memory.offHeap.size", maxMem.toString)
+    conf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
     val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f3e8a2e..629eed4 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -90,7 +90,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       testConf: Option[SparkConf] = None): BlockManager = {
     val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
     bmConf.set("spark.testing.memory", maxMem.toString)
-    bmConf.set("spark.memory.offHeap.size", maxMem.toString)
+    bmConf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
     val serializer = new KryoSerializer(bmConf)
     val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
       Some(CryptoStreamUtils.createKey(bmConf))

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 6a6c378..df5f0b5 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.api.java.StorageLevels
 import org.apache.spark.deploy.history.HistoryServerSuite
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
 
@@ -104,7 +105,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       .set("spark.ui.enabled", "true")
       .set("spark.ui.port", "0")
       .set("spark.ui.killEnabled", killEnabled.toString)
-      .set("spark.memory.offHeap.size", "64m")
+      .set(MEMORY_OFFHEAP_SIZE.key, "64m")
     val sc = new SparkContext(conf)
     assert(sc.ui.isDefined)
     sc

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3452a1e..8485ed4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -140,6 +140,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val COLUMN_VECTOR_OFFHEAP_ENABLED =
+    buildConf("spark.sql.columnVector.offheap.enable")
+      .internal()
+      .doc("When true, use OffHeapColumnVector in ColumnarBatch.")
+      .booleanConf
+      .createWithDefault(false)
+
   val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
     .internal()
     .doc("When true, prefer sort merge join over shuffle hash join.")
@@ -1210,6 +1217,8 @@ class SQLConf extends Serializable with Logging {
 
   def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
 
+  def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)
+
   def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
 
   def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index e827229..669d71e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -101,9 +101,13 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private boolean returnColumnarBatch;
 
   /**
-   * The default config on whether columnarBatch should be offheap.
+   * The memory mode of the columnarBatch
    */
-  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+  private final MemoryMode MEMORY_MODE;
+
+  public VectorizedParquetRecordReader(boolean useOffHeap) {
+    MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+  }
 
   /**
    * Implementation of RecordReader API.
@@ -204,11 +208,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   }
 
   public void initBatch() {
-    initBatch(DEFAULT_MEMORY_MODE, null, null);
+    initBatch(MEMORY_MODE, null, null);
   }
 
   public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
-    initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
+    initBatch(MEMORY_MODE, partitionColumns, partitionValues);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index a607ec0..a477c23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -177,7 +177,8 @@ case class FileSourceScanExec(
   override def vectorTypes: Option[Seq[String]] =
     relation.fileFormat.vectorTypes(
       requiredSchema = requiredSchema,
-      partitionSchema = relation.partitionSchema)
+      partitionSchema = relation.partitionSchema,
+      relation.sparkSession.sessionState.conf)
 
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 2ae3f35..3e73393 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -37,7 +38,13 @@ case class InMemoryTableScanExec(
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
 
   override def vectorTypes: Option[Seq[String]] =
-    Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))
+    Option(Seq.fill(attributes.length)(
+      if (!conf.offHeapColumnVectorEnabled) {
+        classOf[OnHeapColumnVector].getName
+      } else {
+        classOf[OffHeapColumnVector].getName
+      }
+    ))
 
   /**
    * If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
@@ -62,7 +69,12 @@ case class InMemoryTableScanExec(
 
   private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
     val rowCount = cachedColumnarBatch.numRows
-    val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+    val taskContext = Option(TaskContext.get())
+    val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) {
+      OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+    } else {
+      OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+    }
     val columnarBatch = new ColumnarBatch(
       columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount)
     columnarBatch.setNumRows(rowCount)
@@ -73,6 +85,7 @@ case class InMemoryTableScanExec(
         columnarBatch.column(i).asInstanceOf[WritableColumnVector],
         columnarBatchSchema.fields(i).dataType, rowCount)
     }
+    taskContext.foreach(_.addTaskCompletionListener(_ => columnarBatch.close()))
     columnarBatch
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index e5a7aee..d3874b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
@@ -70,7 +71,8 @@ trait FileFormat {
    */
   def vectorTypes(
       requiredSchema: StructType,
-      partitionSchema: StructType): Option[Seq[String]] = {
+      partitionSchema: StructType,
+      sqlConf: SQLConf): Option[Seq[String]] = {
     None
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 044b1a8..2b10649 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -274,9 +274,15 @@ class ParquetFileFormat
 
   override def vectorTypes(
       requiredSchema: StructType,
-      partitionSchema: StructType): Option[Seq[String]] = {
+      partitionSchema: StructType,
+      sqlConf: SQLConf): Option[Seq[String]] = {
     Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
-      classOf[OnHeapColumnVector].getName))
+      if (!sqlConf.offHeapColumnVectorEnabled) {
+        classOf[OnHeapColumnVector].getName
+      } else {
+        classOf[OffHeapColumnVector].getName
+      }
+    ))
   }
 
   override def isSplitable(
@@ -332,8 +338,10 @@ class ParquetFileFormat
     // If true, enable using the custom RecordReader for parquet. This only works for
     // a subset of the types (no complex types).
     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
     val enableVectorizedReader: Boolean =
-      sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
+      sqlConf.parquetVectorizedReaderEnabled &&
       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
     val enableRecordFilter: Boolean =
       sparkSession.sessionState.conf.parquetRecordFilterEnabled
@@ -364,8 +372,10 @@ class ParquetFileFormat
       if (pushed.isDefined) {
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
+      val taskContext = Option(TaskContext.get())
       val parquetReader = if (enableVectorizedReader) {
-        val vectorizedReader = new VectorizedParquetRecordReader()
+        val vectorizedReader =
+          new VectorizedParquetRecordReader(enableOffHeapColumnVector && taskContext.isDefined)
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
@@ -387,7 +397,7 @@ class ParquetFileFormat
       }
 
       val iter = new RecordReaderIterator(parquetReader)
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+      taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
 
       // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
       if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index b2dcbe5..d98cf85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -23,6 +23,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
 import com.esotericsoftware.kryo.io.{Input, Output}
 
 import org.apache.spark.{SparkConf, SparkEnv, SparkException}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
 import org.apache.spark.memory.{MemoryConsumer, StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -99,7 +100,7 @@ private[execution] object HashedRelation {
     val mm = Option(taskMemoryManager).getOrElse {
       new TaskMemoryManager(
         new StaticMemoryManager(
-          new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
           Long.MaxValue,
           Long.MaxValue,
           1),
@@ -232,7 +233,7 @@ private[joins] class UnsafeHashedRelation(
     // so that tests compile:
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
         1),
@@ -403,7 +404,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     this(
       new TaskMemoryManager(
         new StaticMemoryManager(
-          new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
           Long.MaxValue,
           Long.MaxValue,
           1),

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index d194f58..232c1be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
 import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -63,7 +64,7 @@ class UnsafeFixedWidthAggregationMapSuite
     }
 
     test(name) {
-      val conf = new SparkConf().set("spark.memory.offHeap.enabled", "false")
+      val conf = new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false")
       memoryManager = new TestMemoryManager(conf)
       taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 359525f..604502f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import scala.util.Random
 
 import org.apache.spark._
-import org.apache.spark.internal.config
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -112,7 +112,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       pageSize: Long,
       spill: Boolean): Unit = {
     val memoryManager =
-      new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"))
+      new TestMemoryManager(new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"))
     val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
     TaskContext.setTaskContext(new TaskContextImpl(
       stageId = 0,
@@ -125,7 +125,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
 
     val sorter = new UnsafeKVExternalSorter(
       keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
-      pageSize, config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
+      pageSize, SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index a834b7c..8f4ee85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.util.HashMap
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
@@ -538,7 +539,7 @@ class AggregateBenchmark extends BenchmarkBase {
         value.setInt(0, 555)
         val taskMemoryManager = new TaskMemoryManager(
           new StaticMemoryManager(
-            new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+            new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
             Long.MaxValue,
             Long.MaxValue,
             1),
@@ -569,8 +570,8 @@ class AggregateBenchmark extends BenchmarkBase {
       benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { iter =>
         val taskMemoryManager = new TaskMemoryManager(
           new StaticMemoryManager(
-            new SparkConf().set("spark.memory.offHeap.enabled", s"${heap == "off"}")
-              .set("spark.memory.offHeap.size", "102400000"),
+            new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
+              .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
             Long.MaxValue,
             Long.MaxValue,
             1),

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 0079930..edb1290 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -40,7 +40,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
@@ -65,7 +65,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         data.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
@@ -94,7 +94,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
 
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         reader.initialize(file, null /* set columns to null to project all columns */)
         val column = reader.resultBatch().column(0)
         assert(reader.nextBatch())

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 633cfde..44a8b25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -653,7 +653,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
       {
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         try {
           reader.initialize(file, null)
           val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -670,7 +670,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Project just one column
       {
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         try {
           reader.initialize(file, ("_2" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String)]
@@ -686,7 +686,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Project columns in opposite order
       {
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         try {
           reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -703,7 +703,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Empty projection
       {
-        val reader = new VectorizedParquetRecordReader
+        val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         try {
           reader.initialize(file, List[String]().asJava)
           var result = 0
@@ -742,7 +742,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       dataTypes.zip(constantValues).foreach { case (dt, v) =>
         val schema = StructType(StructField("pcol", dt) :: Nil)
-        val vectorizedReader = new VectorizedParquetRecordReader
+        val vectorizedReader =
+          new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
         val partitionValues = new GenericInternalRow(Array(v))
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index de7a579..86a3c71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -75,6 +75,7 @@ object ParquetReadBenchmark {
 
     withTempPath { dir =>
       withTempTable("t1", "tempTable") {
+        val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
         spark.range(values).createOrReplaceTempView("t1")
         spark.sql("select cast(id as INT) as id from t1")
             .write.parquet(dir.getCanonicalPath)
@@ -95,7 +96,7 @@ object ParquetReadBenchmark {
         parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader
+            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -118,7 +119,7 @@ object ParquetReadBenchmark {
         parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader
+            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -260,6 +261,7 @@ object ParquetReadBenchmark {
   def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
     withTempPath { dir =>
       withTempTable("t1", "tempTable") {
+        val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
         spark.range(values).createOrReplaceTempView("t1")
         spark.sql(s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " +
           s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1")
@@ -277,7 +279,7 @@ object ParquetReadBenchmark {
         benchmark.addCase("PR Vectorized") { num =>
           var sum = 0
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader
+            val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
             try {
               reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
               val batch = reader.resultBatch()

http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index ede63fe..51f8c33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
 import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
@@ -36,7 +37,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   val mm = new TaskMemoryManager(
     new StaticMemoryManager(
-      new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+      new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
       Long.MaxValue,
       Long.MaxValue,
       1),
@@ -85,7 +86,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
   test("test serialization empty hash map") {
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
         1),
@@ -157,7 +158,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
   test("LongToUnsafeRowMap with very wide range") {
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
         1),
@@ -202,7 +203,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
   test("LongToUnsafeRowMap with random keys") {
     val taskMemoryManager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
         1),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org