Posted to commits@spark.apache.org by we...@apache.org on 2020/08/18 06:53:56 UTC

[spark] branch branch-3.0 updated: [3.0][SPARK-32518][CORE] CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all kinds of resources

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a36514e  [3.0][SPARK-32518][CORE] CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all kinds of resources
a36514e is described below

commit a36514e1fbf6920f0d18a7d8fb690822843eb2a2
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Aug 18 06:50:05 2020 +0000

    [3.0][SPARK-32518][CORE] CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all kinds of resources
    
    ### What changes were proposed in this pull request?
    
    1. Make `CoarseGrainedSchedulerBackend.maxNumConcurrentTasks()` consider all kinds of resources when calculating the max concurrent tasks
    
    2. Refactor `calculateAvailableSlots()` so that it can be used by both `CoarseGrainedSchedulerBackend` and `TaskSchedulerImpl` (see the sketch after this list)
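    
    As a rough sketch (not the exact code added by this patch; `ResourceReq` and `availableSlots` are illustrative names), the refactored calculation takes, per executor, the minimum of the task slots allowed by its CPUs and by each custom resource, and sums that over all executors:
    
    ```scala
    // Illustrative only: a simplified ResourceReq in place of Spark's ResourceRequirement.
    case class ResourceReq(resourceName: String, amount: Int)
    
    def availableSlots(
        cpusPerExecutor: Seq[Int],
        resourcesPerExecutor: Seq[Map[String, Int]],
        cpusPerTask: Int,
        taskResourceReqs: Seq[ResourceReq]): Int = {
      cpusPerExecutor.zip(resourcesPerExecutor).map { case (cpus, resources) =>
        val cpuSlots = cpus / cpusPerTask
        // Slots permitted by each custom resource; a missing resource permits 0 slots.
        val resourceSlots = taskResourceReqs.map { req =>
          resources.getOrElse(req.resourceName, 0) / req.amount
        }
        (cpuSlots +: resourceSlots).min
      }.sum
    }
    
    // One executor with 8 cores and 2 GPU addresses, tasks needing 1 CPU and 1 GPU:
    // min(8 / 1, 2 / 1) = 2 slots, whereas a CPU-only count would report 8.
    availableSlots(Seq(8), Seq(Map("gpu" -> 2)), 1, Seq(ResourceReq("gpu", 1)))
    ```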
    
    ### Why are the changes needed?
    
    Currently, `CoarseGrainedSchedulerBackend.maxNumConcurrentTasks()` only considers CPUs when calculating the max concurrent tasks. This can cause the application to hang when a barrier stage requires extra custom resources but the cluster doesn't have enough of them: without the check for other custom resources in `maxNumConcurrentTasks`, the barrier stage can be submitted to the `TaskSchedulerImpl`, but the `TaskSchedulerImpl` won't launch tasks for the barrier stage [...]
    
    If the barrier stage doesn't launch all of its tasks in one round of resource offers, the application fails and suggests that the user disable delay scheduling. However, this is a misleading suggestion, since the real root cause is insufficient resources.
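    
    Concretely (a hypothetical scenario reusing the sketch above, not code from this patch): one executor with 2 cores and no GPUs, and a 2-partition barrier stage whose tasks each need 1 CPU and 1 GPU, passes the old CPU-only check but can never actually launch:
    
    ```scala
    // Pre-patch check counted only CPUs: 2 cores / 1 CPU per task = 2 slots >= 2 tasks,
    // so the barrier stage was submitted to the TaskSchedulerImpl ...
    val oldMaxConcurrentTasks = Seq(2).map(_ / 1).sum  // 2
    
    // ... but counting every resource kind shows there are no usable slots, because
    // no executor can satisfy the per-task GPU requirement, hence the hang.
    val newMaxConcurrentTasks =
      availableSlots(Seq(2), Seq(Map.empty[String, Int]), 1, Seq(ResourceReq("gpu", 1)))  // 0
    ```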
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. If a barrier stage requires more custom resources than the cluster has, the application previously failed with a misleading suggestion to disable delay scheduling. After this PR, the application fails with an error message saying there are not enough resources.
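    
    For illustration only (a sketch, not taken from this patch or its test; it assumes a cluster with a single executor that has one GPU), a user would now hit the error roughly like this, using the standard `spark.{executor,task}.resource.gpu.amount` configs:
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    val conf = new SparkConf()
      .setAppName("barrier-gpu-demo")
      .set("spark.executor.resource.gpu.amount", "1")
      .set("spark.task.resource.gpu.amount", "1")
    val sc = new SparkContext(conf)
    
    // The 2-partition barrier stage needs 2 GPU-backed slots; with only 1 GPU in the
    // cluster, the job now fails fast with the SPARK-24819 "requires more slots than
    // the total number of slots" error instead of the delay-scheduling hint.
    sc.parallelize(1 to 10, 2)
      .barrier()
      .mapPartitions { iter => iter }
      .collect()
    ```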
    
    ### How was this patch tested?
    
    Added a unit test.
    
    Closes #29395 from Ngone51/backport-spark-32518.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 10 +++---
 .../org/apache/spark/internal/config/Tests.scala   | 15 +++++++++
 .../scheduler/BarrierJobAllocationFailed.scala     |  4 +--
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 10 +++---
 .../spark/scheduler/ExecutorResourceInfo.scala     |  1 +
 .../apache/spark/scheduler/SchedulerBackend.scala  |  3 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 36 +++++++++++++++++++++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 22 +++++++++++--
 .../spark/BarrierStageOnSubmittedSuite.scala       | 36 ++++++++++++++++++++++
 .../scala/org/apache/spark/SparkContextSuite.scala |  2 ++
 10 files changed, 124 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 65c08cf..66fe1d7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1597,7 +1597,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be concurrent launched based on the resources
+   * could be used, even if some of them are being used at the moment.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
@@ -2776,8 +2777,9 @@ object SparkContext extends Logging {
       }
       // some cluster managers don't set the EXECUTOR_CORES config by default (standalone
       // and mesos coarse grained), so we can't rely on that config for those.
-      val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
+      var shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
         (master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))
+      shouldCheckExecCores &= !sc.conf.get(SKIP_VALIDATE_CORES_TESTING)
 
       // Number of cores per executor must meet at least one task requirement.
       if (shouldCheckExecCores && execCores < taskCores) {
@@ -2833,7 +2835,7 @@ object SparkContext extends Logging {
           limitingResourceName = taskReq.resourceName
         }
       }
-      if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
+      if(!shouldCheckExecCores) {
         // if we can't rely on the executor cores config throw a warning for user
         logWarning("Please ensure that the number of slots available on your " +
           "executors is limited by the number of cores to task cpus and not another " +
@@ -2857,7 +2859,7 @@ object SparkContext extends Logging {
             s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
             s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
             s"your configuration."
-          if (Utils.isTesting) {
+          if (sc.conf.get(RESOURCES_WARNING_TESTING)) {
             throw new SparkException(message)
           } else {
             logWarning(message)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
index 232264d6..e328ed0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -61,4 +61,19 @@ private[spark] object Tests {
     .version("3.0.0")
     .intConf
     .createWithDefault(2)
+
+  val RESOURCES_WARNING_TESTING = ConfigBuilder("spark.resources.warnings.testing")
+    .version("3.0.1")
+    .booleanConf
+    .createWithDefault(false)
+
+  // This configuration is used for unit tests to allow skipping the task cpus to cores validation
+  // to allow emulating standalone mode behavior while running in local mode. Standalone mode
+  // by default doesn't specify a number of executor cores, it just uses all the ones available
+  // on the host.
+  val SKIP_VALIDATE_CORES_TESTING =
+  ConfigBuilder("spark.testing.skipValidateCores")
+    .version("3.0.1")
+    .booleanConf
+    .createWithDefault(false)
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
index 2274e68..043c6b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -60,6 +60,6 @@ private[spark] object BarrierJobAllocationFailed {
   val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
     "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " +
       "more slots than the total number of slots in the cluster currently. Please init a new " +
-      "cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
-      "slots required to run this barrier stage."
+      "cluster with more resources(e.g. CPU, GPU) or repartition the input RDD(s) to reduce " +
+      "the number of slots required to run this barrier stage."
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 51445bf..b483b52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -453,10 +453,12 @@ private[spark] class DAGScheduler(
    * submission.
    */
   private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
-    val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
-    if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
-      throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+    if (rdd.isBarrier()) {
+      val numPartitions = rdd.getNumPartitions
+      val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
+      if (numPartitions > maxNumConcurrentTasks) {
+        throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+      }
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index fd04db8..508c6ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -36,4 +36,5 @@ private[spark] class ExecutorResourceInfo(
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
   override protected def slotsPerAddress: Int = numParts
+  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 9159d2a..7b76af2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -77,7 +77,8 @@ private[spark] trait SchedulerBackend {
   def getDriverAttributes: Option[Map[String, String]] = None
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be concurrent launched based on the resources
+   * could be used, even if some of them are being used at the moment.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 97125f6..46641e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -446,7 +446,17 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
-      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
+      // we only need to calculate available slots if using barrier scheduling, otherwise the
+      // value is -1
+      val availableSlots = if (taskSet.isBarrier) {
+        val availableResourcesAmount = availableResources.map { resourceMap =>
+          // note that the addresses here have been expanded according to the numParts
+          resourceMap.map { case (name, addresses) => (name, addresses.length) }
+        }
+        calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
+      } else {
+        -1
+      }
       // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.
@@ -934,6 +944,30 @@ private[spark] object TaskSchedulerImpl {
   val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
   /**
+   * Calculate the max available task slots given the `availableCpus` and `availableResources`
+   * from a collection of executors.
+   *
+   * @param scheduler the TaskSchedulerImpl instance
+   * @param availableCpus an Array of the amount of available cpus from the executors.
+   * @param availableResources an Array of the resources map from the executors. In the resource
+   *                           map, it maps from the resource name to its amount.
+   * @return the number of max task slots
+   */
+  def calculateAvailableSlots(
+      scheduler: TaskSchedulerImpl,
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Int]]): Int = {
+    val cpusPerTask = scheduler.CPUS_PER_TASK
+    val resourcesReqsPerTask = scheduler.resourcesReqsPerTask
+    availableCpus.zip(availableResources).map { case (cpu, resources) =>
+      val cpuNum = cpu / cpusPerTask
+      resourcesReqsPerTask.map { req =>
+        resources.get(req.resourceName).map(_ / req.amount).getOrElse(0)
+      }.reduceOption(Math.min).map(_.min(cpuNum)).getOrElse(cpuNum)
+    }.sum
+  }
+
+  /**
    * Used to balance containers across hosts.
    *
    * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9d8fb8f..8b55e2c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -563,10 +563,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       !executorsPendingLossReason.contains(id)
   }
 
+  /**
+   * Get the max number of tasks that can be concurrent launched based on the resources
+   * could be used, even if some of them are being used at the moment.
+   * Note that please don't cache the value returned by this method, because the number can change
+   * due to add/remove executors.
+   *
+   * @return The max number of tasks that can be concurrent launched currently.
+   */
   override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+    val (cpus, resources) = {
+      executorDataMap
+        .filter { case (id, _) => isExecutorActive(id) }
+        .values.toArray.map { executor =>
+        (
+          executor.totalCores,
+          executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) }
+        )
+      }.unzip
+    }
+    TaskSchedulerImpl.calculateAvailableSlots(scheduler, cpus, resources)
   }
 
   // this function is for testing only
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index 435b927..7052d1a 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -19,9 +19,12 @@ package org.apache.spark
 
 import scala.concurrent.duration._
 
+import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.resource.TestResourceIDs.{EXECUTOR_GPU_ID, TASK_GPU_ID, WORKER_GPU_ID}
 import org.apache.spark.scheduler.BarrierJobAllocationFailed._
+import org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     testSubmitJob(sc, rdd,
       message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
   }
+
+  test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should " +
+    "consider all kinds of resources for the barrier stage") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")
+
+      val conf = new SparkConf()
+        .setMaster("local-cluster[1, 2, 1024]")
+        .setAppName("test-cluster")
+        .set(WORKER_GPU_ID.amountConf, "1")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "1")
+        .set(TASK_GPU_ID.amountConf, "1")
+        // disable barrier stage retry to fail the application as soon as possible
+        .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
+        // disable the check to simulate the behavior of Standalone in order to
+        // reproduce the issue.
+        .set(Tests.SKIP_VALIDATE_CORES_TESTING, true)
+      sc = new SparkContext(conf)
+      // setup an executor which will have 2 CPUs and 1 GPU
+      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+
+      val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
+        sc.parallelize(Range(1, 10), 2)
+          .barrier()
+          .mapPartitions { iter => iter }
+          .collect()
+      }
+      assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " +
+        "mode does not allow run a barrier stage that requires more slots"))
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index ce437a5..dc1c045 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -36,6 +36,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.TestUtils._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.RESOURCES_WARNING_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.resource.ResourceAllocation
 import org.apache.spark.resource.ResourceUtils._
@@ -890,6 +891,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       .setAppName("test-cluster")
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_GPU_ID.amountConf, "4")
+    conf.set(RESOURCES_WARNING_TESTING, true)
 
     var error = intercept[SparkException] {
       sc = new SparkContext(conf)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org