You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2020/07/27 13:16:58 UTC
[spark] branch master updated: [SPARK-30794][CORE] Stage Level
scheduling: Add ability to set off heap memory
This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 998086c [SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory
998086c is described below
commit 998086c9a179692b2687bc9a104dbbb35f5a44e2
Author: Warren Zhu <zh...@microsoft.com>
AuthorDate: Mon Jul 27 08:16:13 2020 -0500
[SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory
### What changes were proposed in this pull request?
Support setting off-heap memory in `ExecutorResourceRequests`.
### Why are the changes needed?
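To let stage-level scheduling (ResourceProfiles) request off-heap memory per executor, as it already can for heap, overhead and PySpark memory.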
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UT in `ResourceProfileSuite` and `DAGSchedulerSuite`
Closes #28972 from warrenzhu25/30794.
Authored-by: Warren Zhu <zh...@microsoft.com>
Signed-off-by: Thomas Graves <tg...@apache.org>
---
.../spark/resource/ExecutorResourceRequests.scala | 14 ++++++
.../apache/spark/resource/ResourceProfile.scala | 8 +++-
.../spark/resource/ResourceProfileSuite.scala | 50 +++++++++++++++++++---
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 5 ++-
python/pyspark/resource/requests.py | 9 ++++
python/pyspark/resource/tests/test_resources.py | 5 ++-
.../apache/spark/deploy/yarn/YarnAllocator.scala | 3 +-
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 23 ++++++++--
8 files changed, 102 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index 9da6ffb..654afa0 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -55,6 +55,20 @@ class ExecutorResourceRequests() extends Serializable {
}
/**
+ * Specify off heap memory. The value specified will be converted to MiB.
+ * This value only takes effect when MEMORY_OFFHEAP_ENABLED is true.
+ *
+ * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+ * Default unit is MiB if not specified.
+ * @return this ExecutorResourceRequests instance, so calls can be chained.
+ */
+ def offHeapMemory(amount: String): this.type = {
+ val amountMiB = JavaUtils.byteStringAsMb(amount)
+ val req = new ExecutorResourceRequest(OFFHEAP_MEM, amountMiB)
+ // Stored under the OFFHEAP_MEM key, so a later call overrides an earlier one.
+ _executorResources.put(OFFHEAP_MEM, req)
+ this
+ }
+
+ /**
* Specify overhead memory. The value specified will be converted to MiB.
*
* @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index f56ea69..8a37670 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -243,13 +243,15 @@ object ResourceProfile extends Logging {
// task resources
val CPUS = "cpus"
// Executor resources
+ // When adding a new executor resource, also add it to allSupportedExecutorResources below
val CORES = "cores"
val MEMORY = "memory"
+ val OFFHEAP_MEM = "offHeap"
val OVERHEAD_MEM = "memoryOverhead"
val PYSPARK_MEM = "pyspark.memory"
// all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
- val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM)
+ val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
val UNKNOWN_RESOURCE_PROFILE_ID = -1
val DEFAULT_RESOURCE_PROFILE_ID = 0
@@ -295,6 +297,10 @@ object ResourceProfile extends Logging {
ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
+ if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
+ // Explicitly append the "b" (bytes) suffix, because offHeapMemory treats a unitless value as MiB
+ ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString + "b")
+ }
val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
execReq.foreach { req =>
val name = req.id.resourceName
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index 29d3ef1..d0479ca 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.resource
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
import org.apache.spark.resource.TestResourceIDs._
@@ -55,6 +55,8 @@ class ResourceProfileSuite extends SparkFunSuite {
"pyspark memory empty if not specified")
assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None,
"overhead memory empty if not specified")
+ assert(rprof.executorResources.get(ResourceProfile.OFFHEAP_MEM) == None,
+ "offHeap memory empty if not specified")
assert(rprof.taskResources.size === 1,
"Task resources should just contain cpus by default")
assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
@@ -69,14 +71,16 @@ class ResourceProfileSuite extends SparkFunSuite {
conf.set(EXECUTOR_MEMORY_OVERHEAD.key, "1g")
conf.set(EXECUTOR_MEMORY.key, "4g")
conf.set(EXECUTOR_CORES.key, "4")
+ conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
+ conf.set(MEMORY_OFFHEAP_SIZE.key, "3m")
conf.set(TASK_GPU_ID.amountConf, "1")
conf.set(EXECUTOR_GPU_ID.amountConf, "1")
conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, "nameOfScript")
val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val execResources = rprof.executorResources
- assert(execResources.size === 5, s"Executor resources should contain cores, pyspark " +
- s"memory, memory overhead, memory, and gpu $execResources")
+ assert(execResources.size === 6, s"Executor resources should contain cores, pyspark " +
+ s"memory, memory overhead, memory, offHeap memory and gpu $execResources")
assert(execResources.contains("gpu"), "Executor resources should have gpu")
assert(rprof.executorResources(ResourceProfile.CORES).amount === 4,
"Executor resources should have 4 core")
@@ -88,6 +92,8 @@ class ResourceProfileSuite extends SparkFunSuite {
"pyspark memory empty if not specified")
assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount == 1024,
"overhead memory empty if not specified")
+ assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount == 3,
+ "Executor resources should have 3 offHeap memory")
assert(rprof.taskResources.size === 2,
"Task resources should just contain cpus and gpu")
assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
@@ -172,14 +178,14 @@ class ResourceProfileSuite extends SparkFunSuite {
val ereqs = new ExecutorResourceRequests()
ereqs.cores(2).memory("4096")
- ereqs.memoryOverhead("2048").pysparkMemory("1024")
+ ereqs.memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
val treqs = new TaskResourceRequests()
treqs.cpus(1)
rprof.require(treqs)
rprof.require(ereqs)
- assert(rprof.executorResources.size === 5)
+ assert(rprof.executorResources.size === 6)
assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
"Executor resources should have 2 cores")
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
@@ -188,6 +194,8 @@ class ResourceProfileSuite extends SparkFunSuite {
"Executor resources should have 2048 overhead memory")
assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
"Executor resources should have 1024 pyspark memory")
+ assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 3072,
+ "Executor resources should have 3072 offHeap memory")
assert(rprof.taskResources.size === 2)
assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
@@ -217,7 +225,7 @@ class ResourceProfileSuite extends SparkFunSuite {
val rprof = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests()
ereqs.memory("4g")
- ereqs.memoryOverhead("2000m").pysparkMemory("512000k")
+ ereqs.memoryOverhead("2000m").pysparkMemory("512000k").offHeapMemory("1g")
rprof.require(ereqs)
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
@@ -226,6 +234,8 @@ class ResourceProfileSuite extends SparkFunSuite {
"Executor resources should have 2000 overhead memory")
assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
"Executor resources should have 512 pyspark memory")
+ assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 1024,
+ "Executor resources should have 1024 offHeap memory")
}
test("Test TaskResourceRequest fractional") {
@@ -256,4 +266,32 @@ class ResourceProfileSuite extends SparkFunSuite {
}.getMessage()
assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
}
+
+ test("ResourceProfile has correct custom executor resources") {
+ val rprof = new ResourceProfileBuilder()
+ val eReq = new ExecutorResourceRequests()
+ .cores(2).memory("4096")
+ .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
+ .resource("gpu", 2)
+ rprof.require(eReq)
+
+ // Update this count whenever a new resource type is added to allSupportedExecutorResources
+ assert(ResourceProfile.allSupportedExecutorResources.size === 5,
+ "Executor resources should have 5 supported resources")
+ // Of the requests above, only gpu is not in allSupportedExecutorResources, so it is the
+ // single custom resource.
+ assert(ResourceProfile.getCustomExecutorResources(rprof.build).size === 1,
+ "Executor resources should have 1 custom resource")
+ }
+
+ test("ResourceProfile has correct custom task resources") {
+ val rprof = new ResourceProfileBuilder()
+ val taskReq = new TaskResourceRequests()
+ .resource("gpu", 1)
+ val eReq = new ExecutorResourceRequests()
+ .cores(2).memory("4096")
+ .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
+ rprof.require(taskReq).require(eReq)
+
+ // gpu is the only task resource requested explicitly, so it is the only custom one.
+ assert(ResourceProfile.getCustomTaskResources(rprof.build).size === 1,
+ "Task resources should have 1 custom resource")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2b38aa1..45af0d0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3286,7 +3286,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(mergedRp.taskResources.get(GPU).get.amount == 1)
val ereqs5 = new ExecutorResourceRequests().cores(1).memory("3g")
- .memoryOverhead("1g").pysparkMemory("2g").resource(GPU, 1, "disc")
+ .memoryOverhead("1g").pysparkMemory("2g").offHeapMemory("4g").resource(GPU, 1, "disc")
val treqs5 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
val rp5 = new ResourceProfile(ereqs5.requests, treqs5.requests)
val ereqs6 = new ExecutorResourceRequests().cores(8).resource(FPGA, 2, "fdisc")
@@ -3296,7 +3296,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(mergedRp.getTaskCpus.get == 2)
assert(mergedRp.getExecutorCores.get == 8)
- assert(mergedRp.executorResources.size == 6)
+ assert(mergedRp.executorResources.size == 7)
assert(mergedRp.taskResources.size == 3)
assert(mergedRp.executorResources.get(GPU).get.amount == 1)
assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
@@ -3307,6 +3307,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(mergedRp.executorResources.get(ResourceProfile.MEMORY).get.amount == 3072)
assert(mergedRp.executorResources.get(ResourceProfile.PYSPARK_MEM).get.amount == 2048)
assert(mergedRp.executorResources.get(ResourceProfile.OVERHEAD_MEM).get.amount == 1024)
+ assert(mergedRp.executorResources.get(ResourceProfile.OFFHEAP_MEM).get.amount == 4096)
val ereqs7 = new ExecutorResourceRequests().cores(1).memory("3g")
.resource(GPU, 4, "disc")
diff --git a/python/pyspark/resource/requests.py b/python/pyspark/resource/requests.py
index 56ad6e8..6149108 100644
--- a/python/pyspark/resource/requests.py
+++ b/python/pyspark/resource/requests.py
@@ -91,6 +91,7 @@ class ExecutorResourceRequests(object):
_MEMORY = "memory"
_OVERHEAD_MEM = "memoryOverhead"
_PYSPARK_MEM = "pyspark.memory"
+ _OFFHEAP_MEM = "offHeap"
def __init__(self, _jvm=None, _requests=None):
from pyspark import SparkContext
@@ -139,6 +140,14 @@ class ExecutorResourceRequests(object):
ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount))
return self
+ def offheapMemory(self, amount):
+ """
+ Specify off heap memory for executors.
+
+ If a JVM-side ExecutorResourceRequests exists, the call is forwarded to its
+ ``offHeapMemory``. Otherwise the amount is parsed with ``_parse_memory``
+ (e.g. "3g" -> 3072) and stored locally under the "offHeap" key.
+
+ :param amount: amount of memory as a JVM memory string (e.g. 512m, 2g).
+ :return: this ExecutorResourceRequests, so calls can be chained.
+ """
+ # NOTE(review): spelled "offheapMemory" here but "offHeapMemory" on the Scala side.
+ if self._java_executor_resource_requests is not None:
+ self._java_executor_resource_requests.offHeapMemory(amount)
+ else:
+ self._executor_resources[self._OFFHEAP_MEM] = \
+ ExecutorResourceRequest(self._OFFHEAP_MEM, _parse_memory(amount))
+ return self
+
def cores(self, amount):
if self._java_executor_resource_requests is not None:
self._java_executor_resource_requests.cores(amount)
diff --git a/python/pyspark/resource/tests/test_resources.py b/python/pyspark/resource/tests/test_resources.py
index 9eb5a35..09c0d3c 100644
--- a/python/pyspark/resource/tests/test_resources.py
+++ b/python/pyspark/resource/tests/test_resources.py
@@ -25,15 +25,16 @@ class ResourceProfileTests(unittest.TestCase):
def test_profile_before_sc(self):
rpb = ResourceProfileBuilder()
ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
- ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com")
+ ereqs.pysparkMemory("2g").offheapMemory("3g").resource("gpu", 2, "testGpus", "nvidia.com")
treqs = TaskResourceRequests().cpus(2).resource("gpu", 2)
def assert_request_contents(exec_reqs, task_reqs):
- self.assertEqual(len(exec_reqs), 5)
+ self.assertEqual(len(exec_reqs), 6)
self.assertEqual(exec_reqs["cores"].amount, 2)
self.assertEqual(exec_reqs["memory"].amount, 6144)
self.assertEqual(exec_reqs["memoryOverhead"].amount, 1024)
self.assertEqual(exec_reqs["pyspark.memory"].amount, 2048)
+ self.assertEqual(exec_reqs["offHeap"].amount, 3072)
self.assertEqual(exec_reqs["gpu"].amount, 2)
self.assertEqual(exec_reqs["gpu"].discoveryScript, "testGpus")
self.assertEqual(exec_reqs["gpu"].resourceName, "gpu")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index dc09323..adbbbc0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -308,7 +308,6 @@ private[yarn] class YarnAllocator(
if (!rpIdToYarnResource.contains(rp.id)) {
// Start with the application or default settings
var heapMem = executorMemory.toLong
- // Note we currently don't support off heap memory in ResourceProfile - SPARK-30794
var offHeapMem = executorOffHeapMemory.toLong
var overheadMem = memoryOverhead.toLong
var pysparkMem = pysparkWorkerMemory.toLong
@@ -326,6 +325,8 @@ private[yarn] class YarnAllocator(
overheadMem = execReq.amount
case ResourceProfile.PYSPARK_MEM =>
pysparkMem = execReq.amount
+ case ResourceProfile.OFFHEAP_MEM =>
+ offHeapMem = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf, execReq)
case ResourceProfile.CORES =>
cores = execReq.amount.toInt
case "gpu" =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 9d6b776..fe8990b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
+import org.apache.spark.resource.ExecutorResourceRequest
import org.apache.spark.util.Utils
object YarnSparkHadoopUtil {
@@ -187,11 +188,27 @@ object YarnSparkHadoopUtil {
* Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
*/
def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
+ // Convert MEMORY_OFFHEAP_SIZE to MiB up front; checkOffHeapEnabled returns 0 instead
+ // when off heap memory is disabled.
+ val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
+ // checkOffHeapEnabled returns a Long; narrow it to this overload's Int return type.
+ checkOffHeapEnabled(sparkConf, sizeInMB).toInt
+ }
+
+ /**
+ * Get the off heap memory size (in MiB) requested by an [[ExecutorResourceRequest]].
+ * Returns 0 if MEMORY_OFFHEAP_ENABLED is false in `sparkConf`, regardless of the
+ * requested amount.
+ *
+ * @param sparkConf the configuration used to check MEMORY_OFFHEAP_ENABLED
+ * @param execRequest the off heap memory request, e.g. from a ResourceProfile
+ * @return the requested amount, or 0 when off heap memory is disabled
+ * @throws IllegalArgumentException if off heap memory is enabled and the amount is not > 0
+ */
+ def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+ execRequest: ExecutorResourceRequest): Long = {
+ checkOffHeapEnabled(sparkConf, execRequest.amount)
+ }
+
+ /**
+ * Return offHeapSize if MEMORY_OFFHEAP_ENABLED is true (requiring it to be > 0); otherwise return 0.
+ */
+ def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = {
if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
- val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
- require(sizeInMB > 0,
+ require(offHeapSize > 0,
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
- sizeInMB
+ offHeapSize
} else {
0
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org