Posted to commits@spark.apache.org by tg...@apache.org on 2020/07/27 13:16:58 UTC

[spark] branch master updated: [SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 998086c  [SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory
998086c is described below

commit 998086c9a179692b2687bc9a104dbbb35f5a44e2
Author: Warren Zhu <zh...@microsoft.com>
AuthorDate: Mon Jul 27 08:16:13 2020 -0500

    [SPARK-30794][CORE] Stage Level scheduling: Add ability to set off heap memory
    
    ### What changes were proposed in this pull request?
    Support setting off-heap memory in `ExecutorResourceRequests`
    
    ### Why are the changes needed?
    Support stage-level scheduling
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests in `ResourceProfileSuite` and `DAGSchedulerSuite`
    
    Closes #28972 from warrenzhu25/30794.
    
    Authored-by: Warren Zhu <zh...@microsoft.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../spark/resource/ExecutorResourceRequests.scala  | 14 ++++++
 .../apache/spark/resource/ResourceProfile.scala    |  8 +++-
 .../spark/resource/ResourceProfileSuite.scala      | 50 +++++++++++++++++++---
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  5 ++-
 python/pyspark/resource/requests.py                |  9 ++++
 python/pyspark/resource/tests/test_resources.py    |  5 ++-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  3 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    | 23 ++++++++--
 8 files changed, 102 insertions(+), 15 deletions(-)
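
For context, a minimal usage sketch of the new API (illustrative only, not part of this commit; assumes an existing RDD `rdd` and a cluster manager that supports stage-level scheduling):

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // Ask for 2 GiB of off-heap memory per executor for the stages computing this RDD.
    // The request only takes effect when spark.memory.offHeap.enabled is true.
    val ereqs = new ExecutorResourceRequests().cores(4).memory("4g").offHeapMemory("2g")
    val treqs = new TaskResourceRequests().cpus(1)
    val rprof = new ResourceProfileBuilder().require(ereqs).require(treqs).build
    rdd.withResources(rprof).collect()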

diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index 9da6ffb..654afa0 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -55,6 +55,20 @@ class ExecutorResourceRequests() extends Serializable {
   }
 
   /**
+   * Specify off-heap memory. The value specified will be converted to MiB.
+   * This value only takes effect when MEMORY_OFFHEAP_ENABLED is true.
+   *
+   * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+   *               Default unit is MiB if not specified.
+   */
+  def offHeapMemory(amount: String): this.type = {
+    val amountMiB = JavaUtils.byteStringAsMb(amount)
+    val req = new ExecutorResourceRequest(OFFHEAP_MEM, amountMiB)
+    _executorResources.put(OFFHEAP_MEM, req)
+    this
+  }
+
+  /**
    * Specify overhead memory. The value specified will be converted to MiB.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index f56ea69..8a37670 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -243,13 +243,15 @@ object ResourceProfile extends Logging {
   // task resources
   val CPUS = "cpus"
   // Executor resources
+  // Make sure to add any new executor resource to allSupportedExecutorResources below
   val CORES = "cores"
   val MEMORY = "memory"
+  val OFFHEAP_MEM = "offHeap"
   val OVERHEAD_MEM = "memoryOverhead"
   val PYSPARK_MEM = "pyspark.memory"
 
   // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
-  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM)
+  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
 
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
@@ -295,6 +297,10 @@ object ResourceProfile extends Logging {
     ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
     conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
     conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
+    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
+      // Append suffix "b": MEMORY_OFFHEAP_SIZE is in bytes, but offHeapMemory defaults to MiB
+      ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString + "b")
+    }
     val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
     execReq.foreach { req =>
       val name = req.id.resourceName
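
Illustrative sketch of what this hunk does for the default profile (getOrCreateDefaultProfile is private[spark], so this is shown for intent only, mirroring ResourceProfileSuite below):

    import org.apache.spark.SparkConf
    import org.apache.spark.resource.ResourceProfile

    val conf = new SparkConf()
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "2g") // stored internally in bytes
    val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
    // The byte value is converted and recorded under the "offHeap" key in MiB.
    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount == 2048)
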
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index 29d3ef1..d0479ca 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.resource
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
 import org.apache.spark.resource.TestResourceIDs._
 
@@ -55,6 +55,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "pyspark memory empty if not specified")
     assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None,
       "overhead memory empty if not specified")
+    assert(rprof.executorResources.get(ResourceProfile.OFFHEAP_MEM) == None,
+      "offHeap memory empty if not specified")
     assert(rprof.taskResources.size === 1,
       "Task resources should just contain cpus by default")
     assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
@@ -69,14 +71,16 @@ class ResourceProfileSuite extends SparkFunSuite {
     conf.set(EXECUTOR_MEMORY_OVERHEAD.key, "1g")
     conf.set(EXECUTOR_MEMORY.key, "4g")
     conf.set(EXECUTOR_CORES.key, "4")
+    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
+    conf.set(MEMORY_OFFHEAP_SIZE.key, "3m")
     conf.set(TASK_GPU_ID.amountConf, "1")
     conf.set(EXECUTOR_GPU_ID.amountConf, "1")
     conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, "nameOfScript")
     val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
     assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val execResources = rprof.executorResources
-    assert(execResources.size === 5, s"Executor resources should contain cores, pyspark " +
-      s"memory, memory overhead, memory, and gpu $execResources")
+    assert(execResources.size === 6, s"Executor resources should contain cores, pyspark " +
+      s"memory, memory overhead, memory, offHeap memory and gpu $execResources")
     assert(execResources.contains("gpu"), "Executor resources should have gpu")
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 4,
       "Executor resources should have 4 core")
@@ -88,6 +92,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "pyspark memory empty if not specified")
     assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount == 1024,
       "overhead memory empty if not specified")
+    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount == 3,
+      "Executor resources should have 3 offHeap memory")
     assert(rprof.taskResources.size === 2,
       "Task resources should just contain cpus and gpu")
     assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
@@ -172,14 +178,14 @@ class ResourceProfileSuite extends SparkFunSuite {
 
     val ereqs = new ExecutorResourceRequests()
     ereqs.cores(2).memory("4096")
-    ereqs.memoryOverhead("2048").pysparkMemory("1024")
+    ereqs.memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
     val treqs = new TaskResourceRequests()
     treqs.cpus(1)
 
     rprof.require(treqs)
     rprof.require(ereqs)
 
-    assert(rprof.executorResources.size === 5)
+    assert(rprof.executorResources.size === 6)
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
       "Executor resources should have 2 cores")
     assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
@@ -188,6 +194,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "Executor resources should have 2048 overhead memory")
     assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
       "Executor resources should have 1024 pyspark memory")
+    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 3072,
+      "Executor resources should have 3072 offHeap memory")
 
     assert(rprof.taskResources.size === 2)
     assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
@@ -217,7 +225,7 @@ class ResourceProfileSuite extends SparkFunSuite {
     val rprof = new ResourceProfileBuilder()
     val ereqs = new ExecutorResourceRequests()
     ereqs.memory("4g")
-    ereqs.memoryOverhead("2000m").pysparkMemory("512000k")
+    ereqs.memoryOverhead("2000m").pysparkMemory("512000k").offHeapMemory("1g")
     rprof.require(ereqs)
 
     assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
@@ -226,6 +234,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "Executor resources should have 2000 overhead memory")
     assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
       "Executor resources should have 512 pyspark memory")
+    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 1024,
+      "Executor resources should have 1024 offHeap memory")
   }
 
   test("Test TaskResourceRequest fractional") {
@@ -256,4 +266,32 @@ class ResourceProfileSuite extends SparkFunSuite {
     }.getMessage()
     assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
   }
+
+  test("ResourceProfile has correct custom executor resources") {
+    val rprof = new ResourceProfileBuilder()
+    val eReq = new ExecutorResourceRequests()
+      .cores(2).memory("4096")
+      .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
+      .resource("gpu", 2)
+    rprof.require(eReq)
+
+    // Update this if a new resource type is added
+    assert(ResourceProfile.allSupportedExecutorResources.size === 5,
+      "Executor resources should have 5 supported resources")
+    assert(ResourceProfile.getCustomExecutorResources(rprof.build).size === 1,
+      "Executor resources should have 1 custom resource")
+  }
+
+  test("ResourceProfile has correct custom task resources") {
+    val rprof = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests()
+      .resource("gpu", 1)
+    val eReq = new ExecutorResourceRequests()
+      .cores(2).memory("4096")
+      .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
+    rprof.require(taskReq).require(eReq)
+
+    assert(ResourceProfile.getCustomTaskResources(rprof.build).size === 1,
+      "Task resources should have 1 custom resource")
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2b38aa1..45af0d0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3286,7 +3286,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mergedRp.taskResources.get(GPU).get.amount == 1)
 
     val ereqs5 = new ExecutorResourceRequests().cores(1).memory("3g")
-      .memoryOverhead("1g").pysparkMemory("2g").resource(GPU, 1, "disc")
+      .memoryOverhead("1g").pysparkMemory("2g").offHeapMemory("4g").resource(GPU, 1, "disc")
     val treqs5 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
     val rp5 = new ResourceProfile(ereqs5.requests, treqs5.requests)
     val ereqs6 = new ExecutorResourceRequests().cores(8).resource(FPGA, 2, "fdisc")
@@ -3296,7 +3296,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     assert(mergedRp.getTaskCpus.get == 2)
     assert(mergedRp.getExecutorCores.get == 8)
-    assert(mergedRp.executorResources.size == 6)
+    assert(mergedRp.executorResources.size == 7)
     assert(mergedRp.taskResources.size == 3)
     assert(mergedRp.executorResources.get(GPU).get.amount == 1)
     assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
@@ -3307,6 +3307,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mergedRp.executorResources.get(ResourceProfile.MEMORY).get.amount == 3072)
     assert(mergedRp.executorResources.get(ResourceProfile.PYSPARK_MEM).get.amount == 2048)
     assert(mergedRp.executorResources.get(ResourceProfile.OVERHEAD_MEM).get.amount == 1024)
+    assert(mergedRp.executorResources.get(ResourceProfile.OFFHEAP_MEM).get.amount == 4096)
 
     val ereqs7 = new ExecutorResourceRequests().cores(1).memory("3g")
       .resource(GPU, 4, "disc")
diff --git a/python/pyspark/resource/requests.py b/python/pyspark/resource/requests.py
index 56ad6e8..6149108 100644
--- a/python/pyspark/resource/requests.py
+++ b/python/pyspark/resource/requests.py
@@ -91,6 +91,7 @@ class ExecutorResourceRequests(object):
     _MEMORY = "memory"
     _OVERHEAD_MEM = "memoryOverhead"
     _PYSPARK_MEM = "pyspark.memory"
+    _OFFHEAP_MEM = "offHeap"
 
     def __init__(self, _jvm=None, _requests=None):
         from pyspark import SparkContext
@@ -139,6 +140,14 @@ class ExecutorResourceRequests(object):
                 ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount))
         return self
 
+    def offheapMemory(self, amount):
+        if self._java_executor_resource_requests is not None:
+            self._java_executor_resource_requests.offHeapMemory(amount)
+        else:
+            self._executor_resources[self._OFFHEAP_MEM] = \
+                ExecutorResourceRequest(self._OFFHEAP_MEM, _parse_memory(amount))
+        return self
+
     def cores(self, amount):
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.cores(amount)
diff --git a/python/pyspark/resource/tests/test_resources.py b/python/pyspark/resource/tests/test_resources.py
index 9eb5a35..09c0d3c 100644
--- a/python/pyspark/resource/tests/test_resources.py
+++ b/python/pyspark/resource/tests/test_resources.py
@@ -25,15 +25,16 @@ class ResourceProfileTests(unittest.TestCase):
     def test_profile_before_sc(self):
         rpb = ResourceProfileBuilder()
         ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
-        ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com")
+        ereqs.pysparkMemory("2g").offheapMemory("3g").resource("gpu", 2, "testGpus", "nvidia.com")
         treqs = TaskResourceRequests().cpus(2).resource("gpu", 2)
 
         def assert_request_contents(exec_reqs, task_reqs):
-            self.assertEqual(len(exec_reqs), 5)
+            self.assertEqual(len(exec_reqs), 6)
             self.assertEqual(exec_reqs["cores"].amount, 2)
             self.assertEqual(exec_reqs["memory"].amount, 6144)
             self.assertEqual(exec_reqs["memoryOverhead"].amount, 1024)
             self.assertEqual(exec_reqs["pyspark.memory"].amount, 2048)
+            self.assertEqual(exec_reqs["offHeap"].amount, 3072)
             self.assertEqual(exec_reqs["gpu"].amount, 2)
             self.assertEqual(exec_reqs["gpu"].discoveryScript, "testGpus")
             self.assertEqual(exec_reqs["gpu"].resourceName, "gpu")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index dc09323..adbbbc0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -308,7 +308,6 @@ private[yarn] class YarnAllocator(
       if (!rpIdToYarnResource.contains(rp.id)) {
         // Start with the application or default settings
         var heapMem = executorMemory.toLong
-        // Note we currently don't support off heap memory in ResourceProfile - SPARK-30794
         var offHeapMem = executorOffHeapMemory.toLong
         var overheadMem = memoryOverhead.toLong
         var pysparkMem = pysparkWorkerMemory.toLong
@@ -326,6 +325,8 @@ private[yarn] class YarnAllocator(
               overheadMem = execReq.amount
             case ResourceProfile.PYSPARK_MEM =>
               pysparkMem = execReq.amount
+            case ResourceProfile.OFFHEAP_MEM =>
+              offHeapMem = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf, execReq)
             case ResourceProfile.CORES =>
               cores = execReq.amount.toInt
             case "gpu" =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 9d6b776..fe8990b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
+import org.apache.spark.resource.ExecutorResourceRequest
 import org.apache.spark.util.Utils
 
 object YarnSparkHadoopUtil {
@@ -187,11 +188,27 @@ object YarnSparkHadoopUtil {
    * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
    */
   def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
+    val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
+    checkOffHeapEnabled(sparkConf, sizeInMB).toInt
+  }
+
+  /**
+   * Get the off-heap memory size from an [[ExecutorResourceRequest]].
+   * Returns 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+    execRequest: ExecutorResourceRequest): Long = {
+    checkOffHeapEnabled(sparkConf, execRequest.amount)
+  }
+
+  /**
+   * Returns the given size if MEMORY_OFFHEAP_ENABLED is true, otherwise 0.
+   */
+  def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = {
     if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
-      val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
-      require(sizeInMB > 0,
+      require(offHeapSize > 0,
         s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
-      sizeInMB
+      offHeapSize
     } else {
       0
     }
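
The guard's contract, restated as a self-contained sketch (illustrative; mirrors checkOffHeapEnabled above without the SparkConf dependency):

    // Returns the requested off-heap size (MiB) when off-heap memory is enabled,
    // requiring it to be positive; returns 0 when off-heap memory is disabled.
    def offHeapSizeOrZero(offHeapEnabled: Boolean, offHeapSizeMiB: Long): Long =
      if (offHeapEnabled) {
        require(offHeapSizeMiB > 0, "spark.memory.offHeap.size must be > 0 when enabled")
        offHeapSizeMiB
      } else {
        0L
      }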

