Posted to commits@spark.apache.org by we...@apache.org on 2017/11/20 11:40:59 UTC
spark git commit: [SPARK-20101][SQL] Use OffHeapColumnVector when "spark.sql.columnVector.offheap.enable" is set to "true"
Repository: spark
Updated Branches:
refs/heads/master 57c5514de -> 3c3eebc87
[SPARK-20101][SQL] Use OffHeapColumnVector when "spark.sql.columnVector.offheap.enable" is set to "true"
This PR enables the use of ``OffHeapColumnVector`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``. Although ``ColumnVector`` has two implementations, ``OnHeapColumnVector`` and ``OffHeapColumnVector``, only ``OnHeapColumnVector`` was ever used before this change.
This PR implements the following (a usage sketch appears after this list):
- Pass ``OffHeapColumnVector`` to ``ColumnarBatch.allocate()`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``
- Free all off-heap memory regions in ``OffHeapColumnVector.close()``
- Ensure that ``OffHeapColumnVector.close()`` is always called
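A minimal sketch of enabling the new flag, assuming a running ``SparkSession`` named ``spark``; the Parquet path is hypothetical and this snippet is illustrative, not part of the commit:

    // Hedged example: the flag is an internal SQLConf entry, default false.
    spark.conf.set("spark.sql.columnVector.offheap.enable", "true")
    // Subsequent vectorized Parquet scans and cached-table scans now allocate
    // OffHeapColumnVector instead of OnHeapColumnVector.
    spark.read.parquet("/tmp/example.parquet").count()  // path is hypothetical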
Tested with existing tests.
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Closes #17436 from kiszk/SPARK-20101.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c3eebc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c3eebc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c3eebc8
Branch: refs/heads/master
Commit: 3c3eebc8734e36e61f4627e2c517fbbe342b3b42
Parents: 57c5514
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Mon Nov 20 12:40:16 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Nov 20 12:40:26 2017 +0100
----------------------------------------------------------------------
.../main/scala/org/apache/spark/SparkConf.scala | 2 +-
.../apache/spark/internal/config/package.scala | 16 ++++++++++++++
.../org/apache/spark/memory/MemoryManager.scala | 7 ++++---
.../spark/memory/StaticMemoryManagerSuite.scala | 3 ++-
.../memory/UnifiedMemoryManagerSuite.scala | 7 ++++---
.../storage/BlockManagerReplicationSuite.scala | 3 ++-
.../spark/storage/BlockManagerSuite.scala | 2 +-
.../org/apache/spark/ui/UISeleniumSuite.scala | 3 ++-
.../org/apache/spark/sql/internal/SQLConf.scala | 9 ++++++++
.../parquet/VectorizedParquetRecordReader.java | 12 +++++++----
.../sql/execution/DataSourceScanExec.scala | 3 ++-
.../columnar/InMemoryTableScanExec.scala | 17 +++++++++++++--
.../sql/execution/datasources/FileFormat.scala | 4 +++-
.../datasources/parquet/ParquetFileFormat.scala | 22 ++++++++++++++------
.../sql/execution/joins/HashedRelation.scala | 7 ++++---
.../UnsafeFixedWidthAggregationMapSuite.scala | 3 ++-
.../execution/UnsafeKVExternalSorterSuite.scala | 6 +++---
.../benchmark/AggregateBenchmark.scala | 7 ++++---
.../parquet/ParquetEncodingSuite.scala | 6 +++---
.../datasources/parquet/ParquetIOSuite.scala | 11 +++++-----
.../parquet/ParquetReadBenchmark.scala | 8 ++++---
.../execution/joins/HashedRelationSuite.scala | 9 ++++----
22 files changed, 117 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 57b3744..ee726df 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -655,7 +655,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
- "spark.memory.offHeap.enabled" -> Seq(
+ MEMORY_OFFHEAP_ENABLED.key -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6")),
"spark.rpc.message.maxSize" -> Seq(
AlternateConfig("spark.akka.frameSize", "1.6")),
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 84315f5..7be4d6b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -80,6 +80,22 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
+ private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
+ .doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
+ "If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")
+ .withAlternative("spark.unsafe.offHeap")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val MEMORY_OFFHEAP_SIZE = ConfigBuilder("spark.memory.offHeap.size")
+ .doc("The absolute amount of memory in bytes which can be used for off-heap allocation. " +
+ "This setting has no impact on heap memory usage, so if your executors' total memory " +
+ "consumption must fit within some hard limit then be sure to shrink your JVM heap size " +
+ "accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true.")
+ .bytesConf(ByteUnit.BYTE)
+ .checkValue(_ >= 0, "The off-heap memory size must not be negative")
+ .createWithDefault(0)
+
private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
.booleanConf.createWithDefault(false)
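As the doc strings above state, these two core configs work together: off-heap allocation only takes effect when the enabled flag is true and the size is positive (``MemoryManager`` enforces this with a ``require``, as the next hunk shows). A hedged sketch of a valid configuration, not taken from the commit:

    import org.apache.spark.SparkConf

    // Sketch only: both settings are needed for Tungsten off-heap mode.
    val conf = new SparkConf()
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "1g")  // must be > 0 when enabled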
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 82442cf..0641adc 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.memory.MemoryStore
import org.apache.spark.unsafe.Platform
@@ -54,7 +55,7 @@ private[spark] abstract class MemoryManager(
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
- protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
+ protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
@@ -194,8 +195,8 @@ private[spark] abstract class MemoryManager(
* sun.misc.Unsafe.
*/
final val tungstenMemoryMode: MemoryMode = {
- if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
- require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
+ if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
+ require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
require(Platform.unaligned(),
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 4e31fb5..0f32fe4 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.memory
import org.mockito.Mockito.when
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.storage.TestBlockId
import org.apache.spark.storage.memory.MemoryStore
@@ -48,7 +49,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
conf.clone
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
+ .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString),
maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxOnHeapStorageMemory = 0,
numCores = 1)
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 02b04cd..d56cfc1 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.memory
import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
import org.apache.spark.storage.TestBlockId
import org.apache.spark.storage.memory.MemoryStore
@@ -43,7 +44,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString)
+ .set(MEMORY_OFFHEAP_SIZE.key, maxOffHeapExecutionMemory.toString)
.set("spark.memory.storageFraction", storageFraction.toString)
UnifiedMemoryManager(conf, numCores = 1)
}
@@ -305,9 +306,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
test("not enough free memory in the storage pool --OFF_HEAP") {
val conf = new SparkConf()
- .set("spark.memory.offHeap.size", "1000")
+ .set(MEMORY_OFFHEAP_SIZE.key, "1000")
.set("spark.testing.memory", "1000")
- .set("spark.memory.offHeap.enabled", "true")
+ .set(MEMORY_OFFHEAP_ENABLED.key, "true")
val taskAttemptId = 0L
val mm = UnifiedMemoryManager(conf, numCores = 1)
val ms = makeMemoryStore(mm)
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index c2101ba..3962bdc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -69,7 +70,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
- conf.set("spark.memory.offHeap.size", maxMem.toString)
+ conf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f3e8a2e..629eed4 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -90,7 +90,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
testConf: Option[SparkConf] = None): BlockManager = {
val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
bmConf.set("spark.testing.memory", maxMem.toString)
- bmConf.set("spark.memory.offHeap.size", maxMem.toString)
+ bmConf.set(MEMORY_OFFHEAP_SIZE.key, maxMem.toString)
val serializer = new KryoSerializer(bmConf)
val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(bmConf))
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 6a6c378..df5f0b5 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark._
import org.apache.spark.LocalSparkContext._
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.deploy.history.HistoryServerSuite
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
@@ -104,7 +105,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
.set("spark.ui.enabled", "true")
.set("spark.ui.port", "0")
.set("spark.ui.killEnabled", killEnabled.toString)
- .set("spark.memory.offHeap.size", "64m")
+ .set(MEMORY_OFFHEAP_SIZE.key, "64m")
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3452a1e..8485ed4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -140,6 +140,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val COLUMN_VECTOR_OFFHEAP_ENABLED =
+ buildConf("spark.sql.columnVector.offheap.enable")
+ .internal()
+ .doc("When true, use OffHeapColumnVector in ColumnarBatch.")
+ .booleanConf
+ .createWithDefault(false)
+
val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
.internal()
.doc("When true, prefer sort merge join over shuffle hash join.")
@@ -1210,6 +1217,8 @@ class SQLConf extends Serializable with Logging {
def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
+ def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)
+
def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)
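Callers elsewhere in this patch read the new entry through the accessor just added. A hedged sketch of that pattern, assuming a ``SparkSession`` and the vectorized column classes are in scope:

    // Sketch: choosing the vector implementation from the session conf,
    // mirroring the vectorTypes overrides later in this diff.
    val sqlConf = sparkSession.sessionState.conf
    val vectorClass =
      if (sqlConf.offHeapColumnVectorEnabled) classOf[OffHeapColumnVector].getName
      else classOf[OnHeapColumnVector].getName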
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index e827229..669d71e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -101,9 +101,13 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
private boolean returnColumnarBatch;
/**
- * The default config on whether columnarBatch should be offheap.
+ * The memory mode of the columnarBatch
*/
- private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+ private final MemoryMode MEMORY_MODE;
+
+ public VectorizedParquetRecordReader(boolean useOffHeap) {
+ MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+ }
/**
* Implementation of RecordReader API.
@@ -204,11 +208,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
}
public void initBatch() {
- initBatch(DEFAULT_MEMORY_MODE, null, null);
+ initBatch(MEMORY_MODE, null, null);
}
public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
- initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
+ initBatch(MEMORY_MODE, partitionColumns, partitionValues);
}
/**
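With this constructor change, every caller now decides the memory mode explicitly. The test suites later in this diff follow the pattern sketched here (``file`` stands in for a Parquet file path):

    // true selects MemoryMode.OFF_HEAP, false selects MemoryMode.ON_HEAP.
    val reader = new VectorizedParquetRecordReader(
      sqlContext.conf.offHeapColumnVectorEnabled)
    reader.initialize(file, null)  // a null column list projects all columns
    val batch = reader.resultBatch()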
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index a607ec0..a477c23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -177,7 +177,8 @@ case class FileSourceScanExec(
override def vectorTypes: Option[Seq[String]] =
relation.fileFormat.vectorTypes(
requiredSchema = requiredSchema,
- partitionSchema = relation.partitionSchema)
+ partitionSchema = relation.partitionSchema,
+ relation.sparkSession.sessionState.conf)
@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 2ae3f35..3e73393 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.columnar
+import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -37,7 +38,13 @@ case class InMemoryTableScanExec(
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
override def vectorTypes: Option[Seq[String]] =
- Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))
+ Option(Seq.fill(attributes.length)(
+ if (!conf.offHeapColumnVectorEnabled) {
+ classOf[OnHeapColumnVector].getName
+ } else {
+ classOf[OffHeapColumnVector].getName
+ }
+ ))
/**
* If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
@@ -62,7 +69,12 @@ case class InMemoryTableScanExec(
private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
val rowCount = cachedColumnarBatch.numRows
- val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+ val taskContext = Option(TaskContext.get())
+ val columnVectors = if (!conf.offHeapColumnVectorEnabled || taskContext.isEmpty) {
+ OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+ } else {
+ OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+ }
val columnarBatch = new ColumnarBatch(
columnarBatchSchema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount)
columnarBatch.setNumRows(rowCount)
@@ -73,6 +85,7 @@ case class InMemoryTableScanExec(
columnarBatch.column(i).asInstanceOf[WritableColumnVector],
columnarBatchSchema.fields(i).dataType, rowCount)
}
+ taskContext.foreach(_.addTaskCompletionListener(_ => columnarBatch.close()))
columnarBatch
}
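The completion listener is the key lifecycle guarantee here: off-heap buffers are invisible to the garbage collector, so ``close()`` must run even when a batch is not fully consumed. A hedged sketch of the pattern, assuming Spark internals on the classpath and execution inside a task:

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.execution.vectorized.ColumnarBatch

    // Free the batch's off-heap memory when the enclosing task finishes.
    def registerCleanup(batch: ColumnarBatch): Unit =
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => batch.close()))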
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index e5a7aee..d3874b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -70,7 +71,8 @@ trait FileFormat {
*/
def vectorTypes(
requiredSchema: StructType,
- partitionSchema: StructType): Option[Seq[String]] = {
+ partitionSchema: StructType,
+ sqlConf: SQLConf): Option[Seq[String]] = {
None
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 044b1a8..2b10649 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -274,9 +274,15 @@ class ParquetFileFormat
override def vectorTypes(
requiredSchema: StructType,
- partitionSchema: StructType): Option[Seq[String]] = {
+ partitionSchema: StructType,
+ sqlConf: SQLConf): Option[Seq[String]] = {
Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
- classOf[OnHeapColumnVector].getName))
+ if (!sqlConf.offHeapColumnVectorEnabled) {
+ classOf[OnHeapColumnVector].getName
+ } else {
+ classOf[OffHeapColumnVector].getName
+ }
+ ))
}
override def isSplitable(
@@ -332,8 +338,10 @@ class ParquetFileFormat
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+ val sqlConf = sparkSession.sessionState.conf
+ val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
- sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
+ sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableRecordFilter: Boolean =
sparkSession.sessionState.conf.parquetRecordFilterEnabled
@@ -364,8 +372,10 @@ class ParquetFileFormat
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
+ val taskContext = Option(TaskContext.get())
val parquetReader = if (enableVectorizedReader) {
- val vectorizedReader = new VectorizedParquetRecordReader()
+ val vectorizedReader =
+ new VectorizedParquetRecordReader(enableOffHeapColumnVector && taskContext.isDefined)
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
@@ -387,7 +397,7 @@ class ParquetFileFormat
}
val iter = new RecordReaderIterator(parquetReader)
- Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+ taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
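Note the guard ``enableOffHeapColumnVector && taskContext.isDefined``: off-heap vectors are requested only when a ``TaskContext`` exists, because only then can a completion listener reclaim the memory; outside a running task the reader falls back to on-heap, since nothing would free the buffers. Sketched in isolation:

    // Sketch: fall back to on-heap allocation when no task context exists.
    val useOffHeap = enableOffHeapColumnVector && taskContext.isDefined
    val vectorizedReader = new VectorizedParquetRecordReader(useOffHeap)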
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index b2dcbe5..d98cf85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -23,6 +23,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.{SparkConf, SparkEnv, SparkException}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
import org.apache.spark.memory.{MemoryConsumer, StaticMemoryManager, TaskMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -99,7 +100,7 @@ private[execution] object HashedRelation {
val mm = Option(taskMemoryManager).getOrElse {
new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -232,7 +233,7 @@ private[joins] class UnsafeHashedRelation(
// so that tests compile:
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -403,7 +404,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
this(
new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index d194f58..232c1be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -63,7 +64,7 @@ class UnsafeFixedWidthAggregationMapSuite
}
test(name) {
- val conf = new SparkConf().set("spark.memory.offHeap.enabled", "false")
+ val conf = new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false")
memoryManager = new TestMemoryManager(conf)
taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 359525f..604502f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import scala.util.Random
import org.apache.spark._
-import org.apache.spark.internal.config
+import org.apache.spark.internal.config._
import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -112,7 +112,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
pageSize: Long,
spill: Boolean): Unit = {
val memoryManager =
- new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"))
+ new TestMemoryManager(new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"))
val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
@@ -125,7 +125,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
val sorter = new UnsafeKVExternalSorter(
keySchema, valueSchema, SparkEnv.get.blockManager, SparkEnv.get.serializerManager,
- pageSize, config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
+ pageSize, SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
// Insert the keys and values into the sorter
inputData.foreach { case (k, v) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index a834b7c..8f4ee85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark
import java.util.HashMap
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
@@ -538,7 +539,7 @@ class AggregateBenchmark extends BenchmarkBase {
value.setInt(0, 555)
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -569,8 +570,8 @@ class AggregateBenchmark extends BenchmarkBase {
benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { iter =>
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", s"${heap == "off"}")
- .set("spark.memory.offHeap.size", "102400000"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
+ .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
Long.MaxValue,
Long.MaxValue,
1),
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 0079930..edb1290 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -40,7 +40,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
@@ -65,7 +65,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
data.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
@@ -94,7 +94,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
reader.initialize(file, null /* set columns to null to project all columns */)
val column = reader.resultBatch().column(0)
assert(reader.nextBatch())
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 633cfde..44a8b25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -653,7 +653,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
{
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
try {
reader.initialize(file, null)
val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -670,7 +670,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Project just one column
{
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
try {
reader.initialize(file, ("_2" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String)]
@@ -686,7 +686,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Project columns in opposite order
{
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
try {
reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -703,7 +703,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Empty projection
{
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
try {
reader.initialize(file, List[String]().asJava)
var result = 0
@@ -742,7 +742,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("pcol", dt) :: Nil)
- val vectorizedReader = new VectorizedParquetRecordReader
+ val vectorizedReader =
+ new VectorizedParquetRecordReader(sqlContext.conf.offHeapColumnVectorEnabled)
val partitionValues = new GenericInternalRow(Array(v))
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index de7a579..86a3c71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -75,6 +75,7 @@ object ParquetReadBenchmark {
withTempPath { dir =>
withTempTable("t1", "tempTable") {
+ val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
spark.range(values).createOrReplaceTempView("t1")
spark.sql("select cast(id as INT) as id from t1")
.write.parquet(dir.getCanonicalPath)
@@ -95,7 +96,7 @@ object ParquetReadBenchmark {
parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -118,7 +119,7 @@ object ParquetReadBenchmark {
parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -260,6 +261,7 @@ object ParquetReadBenchmark {
def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
withTempPath { dir =>
withTempTable("t1", "tempTable") {
+ val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
spark.range(values).createOrReplaceTempView("t1")
spark.sql(s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " +
s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1")
@@ -277,7 +279,7 @@ object ParquetReadBenchmark {
benchmark.addCase("PR Vectorized") { num =>
var sum = 0
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new VectorizedParquetRecordReader
+ val reader = new VectorizedParquetRecordReader(enableOffHeapColumnVector)
try {
reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
val batch = reader.resultBatch()
http://git-wip-us.apache.org/repos/asf/spark/blob/3c3eebc8/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index ede63fe..51f8c33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
import scala.util.Random
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.InternalRow
@@ -36,7 +37,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
val mm = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -85,7 +86,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
test("test serialization empty hash map") {
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -157,7 +158,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
test("LongToUnsafeRowMap with very wide range") {
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
@@ -202,7 +203,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
test("LongToUnsafeRowMap with random keys") {
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),