Posted to commits@spark.apache.org by tg...@apache.org on 2020/01/10 14:32:51 UTC

[spark] branch master updated: [SPARK-30448][CORE] accelerator aware scheduling enforce cores as limiting resource

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d6532c7  [SPARK-30448][CORE] accelerator aware scheduling enforce cores as limiting resource
d6532c7 is described below

commit d6532c7079f22f32e90e1c69c25bdfab51c7c53e
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Fri Jan 10 08:32:28 2020 -0600

    [SPARK-30448][CORE] accelerator aware scheduling enforce cores as limiting resource
    
    ### What changes were proposed in this pull request?
    
    This PR makes sure that cores is the limiting resource when using accelerator aware scheduling, and fixes a few issues with SparkContext.checkResourcesPerTask.
    
    For the first version of accelerator aware scheduling (SPARK-27495), the SPIP had a condition that we could support dynamic allocation because we were going to have a strict requirement that we don't waste any resources. This means that the number of slots each executor has can be calculated from the number of cores and task cpus, just as is done today.
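    
    A minimal sketch of that slot calculation, using assumed example values for the two configs (illustrative only, not the Spark source):
    
    ```scala
    // Cores as the limiting resource: the number of concurrent task slots per
    // executor follows directly from spark.executor.cores and spark.task.cpus.
    val executorCores = 8                            // spark.executor.cores (assumed)
    val taskCpus = 2                                 // spark.task.cpus (assumed)
    val slotsPerExecutor = executorCores / taskCpus  // 4 tasks can run concurrently
    ```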
    
    Somewhere along the line of development we relaxed that and only warn when we are wasting resources. This breaks the dynamic allocation logic if the limiting resource is no longer the cores, because dynamic allocation uses the cores and task cpus to calculate the number of executors it needs. This means we will request fewer executors than we really need to run everything. We have to enforce that cores is always the limiting resource, so we should throw if it is not.
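    
    To see why this matters, here is a rough sketch of how the executor target is derived (not the ExecutorAllocationManager code; it assumes 100 pending tasks and the example configs above):
    
    ```scala
    // Dynamic allocation sizes its executor target from cores and task cpus only.
    val pendingTasks = 100                            // assumed workload
    val executorCores = 8                             // spark.executor.cores (assumed)
    val taskCpus = 2                                  // spark.task.cpus (assumed)
    val slotsPerExecutor = executorCores / taskCpus   // 4, assuming cores is the limit
    val executorsRequested =
      math.ceil(pendingTasks.toDouble / slotsPerExecutor).toInt  // 25
    // If a custom resource (e.g. GPUs) actually limited each executor to 1 slot,
    // 100 executors would be needed, but only 25 would be requested.
    ```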
    
    The only issue with enforcing this is on cluster managers (standalone and mesos coarse grained) where we don't know the executor cores up front by default: the spark.executor.cores config defaults to 1, but when the executor is started it gets all the cores of the Worker by default. So we have to add logic specifically to handle that; we can't enforce the requirement there, and can only warn when dynamic allocation is enabled on those cluster managers.
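    
    For example, on standalone a user can make the check enforceable by setting the executor cores explicitly. The snippet below is a hypothetical configuration (placeholder master URL and app name), not part of this patch:
    
    ```scala
    import org.apache.spark.SparkConf
    
    // Hypothetical standalone setup: spark.executor.cores is set explicitly, so cores
    // can be verified as the limiting resource (4 CPU slots == 4 GPU slots here).
    val conf = new SparkConf()
      .setMaster("spark://master:7077")               // placeholder standalone master URL
      .setAppName("gpu-app")                          // placeholder app name
      .set("spark.executor.cores", "4")
      .set("spark.task.cpus", "1")
      .set("spark.executor.resource.gpu.amount", "4")
      .set("spark.task.resource.gpu.amount", "1")
    ```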
    
    ### Why are the changes needed?
    
    There is a bug in dynamic allocation if cores is not the limiting resource, and the existing warnings are not correct.
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    A unit test was added, and the conditions were manually tested in local mode, local-cluster mode, standalone mode, and on YARN.
    
    Closes #27138 from tgravescs/SPARK-30446.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 39 +++++++++++++++++-----
 .../scala/org/apache/spark/SparkContextSuite.scala | 22 ++++++++++--
 .../CoarseGrainedSchedulerBackendSuite.scala       |  2 +-
 3 files changed, 51 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 94a0ce7..3262631 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2779,9 +2779,13 @@ object SparkContext extends Logging {
       } else {
         executorCores.get
       }
+      // some cluster managers don't set the EXECUTOR_CORES config by default (standalone
+      // and mesos coarse grained), so we can't rely on that config for those.
+      val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
+        (master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))
 
       // Number of cores per executor must meet at least one task requirement.
-      if (execCores < taskCores) {
+      if (shouldCheckExecCores && execCores < taskCores) {
         throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
           s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on $master.")
       }
@@ -2789,11 +2793,14 @@ object SparkContext extends Logging {
       // Calculate the max slots each executor can provide based on resources available on each
       // executor and resources required by each task.
       val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)
-      val executorResourcesAndAmounts =
-        parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
+      val executorResourcesAndAmounts = parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
           .map(request => (request.id.resourceName, request.amount)).toMap
-      var numSlots = execCores / taskCores
-      var limitingResourceName = "CPU"
+
+      var (numSlots, limitingResourceName) = if (shouldCheckExecCores) {
+        (execCores / taskCores, "CPU")
+      } else {
+        (-1, "")
+      }
 
       taskResourceRequirements.foreach { taskReq =>
         // Make sure the executor resources were specified through config.
@@ -2818,12 +2825,28 @@ object SparkContext extends Logging {
         // multiple executor resources.
         val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
         if (resourceNumSlots < numSlots) {
+          if (shouldCheckExecCores) {
+            throw new IllegalArgumentException("The number of slots on an executor has to be " +
+              "limited by the number of cores, otherwise you waste resources and " +
+              "dynamic allocation doesn't work properly. Your configuration has " +
+              s"core/task cpu slots = ${numSlots} and " +
+              s"${taskReq.resourceName} = ${resourceNumSlots}. " +
+              "Please adjust your configuration so that all resources require same number " +
+              "of executor slots.")
+          }
           numSlots = resourceNumSlots
           limitingResourceName = taskReq.resourceName
         }
       }
-      // There have been checks above to make sure the executor resources were specified and are
-      // large enough if any task resources were specified.
+      if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
+        // if we can't rely on the executor cores config throw a warning for user
+        logWarning("Please ensure that the number of slots available on your " +
+          "executors is limited by the number of cores to task cpus and not another " +
+          "custom resource. If cores is not the limiting resource then dynamic " +
+          "allocation will not work properly!")
+      }
+      // warn if we would waste any resources due to another resource limiting the number of
+      // slots on an executor
       taskResourceRequirements.foreach { taskReq =>
         val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
         if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) {
@@ -2832,7 +2855,7 @@ object SparkContext extends Logging {
           } else {
             s"${taskReq.amount}"
           }
-          val resourceNumSlots = Math.floor(execAmount * taskReq.numParts/taskReq.amount).toInt
+          val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
           val message = s"The configuration of resource: ${taskReq.resourceName} " +
             s"(exec = ${execAmount}, task = ${taskReqStr}, " +
             s"runnable tasks = ${resourceNumSlots}) will " +
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index fe0a80f..df9c7c5 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -898,8 +898,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 
     assert(error.contains(
       "The configuration of resource: gpu (exec = 4, task = 2, runnable tasks = 2) will result " +
-      "in wasted resources due to resource CPU limiting the number of runnable tasks per " +
-      "executor to: 1. Please adjust your configuration."))
+        "in wasted resources due to resource CPU limiting the number of runnable tasks per " +
+        "executor to: 1. Please adjust your configuration."))
+  }
+
+  test("Parse resources executor config cpus not limiting resource") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[1, 8, 1024]")
+      .setAppName("test-cluster")
+    conf.set(TASK_GPU_ID.amountConf, "2")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "4")
+
+    var error = intercept[IllegalArgumentException] {
+      sc = new SparkContext(conf)
+    }.getMessage()
+
+    assert(error.contains("The number of slots on an executor has to be " +
+      "limited by the number of cores, otherwise you waste resources and " +
+      "dynamic allocation doesn't work properly"))
   }
 
   test("test resource scheduling under local-cluster mode") {
@@ -911,7 +927,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
         """{"name": "gpu","addresses":["0", "1", "2", "3", "4", "5", "6", "7", "8"]}""")
 
       val conf = new SparkConf()
-        .setMaster("local-cluster[3, 3, 1024]")
+        .setMaster("local-cluster[3, 1, 1024]")
         .setAppName("test-cluster")
         .set(WORKER_GPU_ID.amountConf, "3")
         .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 8a16ae6..f916f63 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -187,7 +187,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     import TestUtils._
 
     val conf = new SparkConf()
-      .set(EXECUTOR_CORES, 3)
+      .set(EXECUTOR_CORES, 1)
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
       .setMaster(
       "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org