Posted to commits@spark.apache.org by wu...@apache.org on 2022/09/30 01:05:06 UTC

[spark] branch master updated: [SPARK-39853][CORE] Support stage level task resource profile for standalone cluster when dynamic allocation disabled

This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ab49dc21e43 [SPARK-39853][CORE] Support stage level task resource profile for standalone cluster when dynamic allocation disabled
ab49dc21e43 is described below

commit ab49dc21e43822abef5067f959e474c4c8dcfdff
Author: Tengfei Huang <te...@gmail.com>
AuthorDate: Fri Sep 30 09:04:39 2022 +0800

    [SPARK-39853][CORE] Support stage level task resource profile for standalone cluster when dynamic allocation disabled
    
    ### What changes were proposed in this pull request?
    Currently, stage level scheduling works for YARN/K8s/Standalone clusters when dynamic allocation is enabled: the Spark app acquires executors with different resource profiles and assigns tasks to executors with the same resource profile id.
    
    This PR proposes to add stage level scheduling when dynamic allocation is off. In this case, the Spark app will only have executors with the default resource profile, but different `Stages` can still customize their task resource requests, which must be compatible with the executor resources of the default resource profile. All such `Stages` with different task resource requests will then reuse/share the same set of executors with the default resource profile.
    
    Specifically, this PR:
    1. Introduces a new special `ResourceProfile`, `TaskResourceProfile`, which describes per-task resource requests when dynamic allocation is off. Tasks bound to a `TaskResourceProfile` will reuse executors with the default resource profile; an `Exception` is thrown if executors with the default resource profile cannot fulfill the task resource requests.
    ```
    class TaskResourceProfile(override val taskResources: Map[String, TaskResourceRequest])
      extends ResourceProfile(
        ResourceProfile.getOrCreateDefaultProfile(SparkEnv.get.conf).executorResources,
        taskResources)
    ```
    2. `DAGScheduler` and `TaskScheduler` will schedule tasks with a customized `ResourceProfile` based on the resource profile type and resource profile id: taskSets with a `TaskResourceProfile` can be scheduled to executors with `DEFAULT_RESOURCE_PROFILE_ID`, while other taskSets can only be scheduled to executors with exactly the same resource profile id (see the sketch below).
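    
    The matching rule is condensed below from the `ResourceProfileManager.canBeScheduled` change in this diff (the validity assertion is omitted): an exact profile-id match, with one carve-out when dynamic allocation is off.
    ```
    private[spark] def canBeScheduled(taskRpId: Int, executorRpId: Int): Boolean = {
      val taskRp = resourceProfileFromId(taskRpId)
      // When dynamic allocation is disabled, tasks with a TaskResourceProfile can
      // always reuse default-profile executors; otherwise the ids must match exactly.
      taskRpId == executorRpId || (!dynamicEnabled && taskRp.isInstanceOf[TaskResourceProfile])
    }
    ```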
    
    ### Why are the changes needed?
    When dynamic allocation is disabled, users can also leverage stage level scheduling to customize task resource requests for different stages.
    
    ### Does this PR introduce _any_ user-facing change?
    Spark users can specify a `TaskResourceProfile` to customize task resource requests for different stages when dynamic allocation is off. A usage sketch follows.
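    
    A minimal sketch (assuming a standalone cluster with dynamic allocation disabled and GPU resources configured on the default executors; `rdd` stands for any RDD in the application). A task-only profile is built through the existing `ResourceProfileBuilder`, which with this change returns a `TaskResourceProfile` when no executor resources are required:
    ```
    import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
    
    val treqs = new TaskResourceRequests().cpus(2).resource("gpu", 2)
    val rp = new ResourceProfileBuilder().require(treqs).build()
    // rp is a TaskResourceProfile since no executor resources were specified;
    // tasks in this stage reserve 2 cpus and 2 gpus each on default-profile executors.
    rdd.withResources(rp).mapPartitions(iter => iter).collect()
    ```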
    
    ### How was this patch tested?
    New UTs added.
    
    Closes #37268 from ivoson/stage-schedule-dynamic-off.
    
    Lead-authored-by: Tengfei Huang <te...@gmail.com>
    Co-authored-by: Huang Tengfei <te...@gmail.com>
    Signed-off-by: Yi Wu <yi...@databricks.com>
---
 .../spark/deploy/ApplicationDescription.scala      |  3 +-
 .../spark/deploy/master/ApplicationInfo.scala      |  4 +-
 .../apache/spark/resource/ResourceProfile.scala    | 63 ++++++++++----
 .../spark/resource/ResourceProfileBuilder.scala    |  6 +-
 .../spark/resource/ResourceProfileManager.scala    | 70 +++++++++++-----
 .../org/apache/spark/resource/ResourceUtils.scala  |  6 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  9 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 19 +++--
 .../resource/ResourceProfileManagerSuite.scala     | 37 +++++++++
 .../spark/resource/ResourceProfileSuite.scala      | 65 ++++++++++++++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 19 ++++-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 97 +++++++++++++++++++++-
 docs/configuration.md                              |  4 +-
 docs/spark-standalone.md                           |  4 +-
 14 files changed, 344 insertions(+), 62 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 39c2af01846..67d0d851b60 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
 import java.net.URI
 
 import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
-import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources
 
 private[spark] case class ApplicationDescription(
     name: String,
@@ -40,7 +39,7 @@ private[spark] case class ApplicationDescription(
   def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
   def resourceReqsPerExecutor: Seq[ResourceRequirement] =
     ResourceUtils.executorResourceRequestToRequirement(
-      getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))
+      defaultProfile.getCustomExecutorResources().values.toSeq.sortBy(_.resourceName))
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index a2926ca64bc..e66933b84af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.deploy.ApplicationDescription
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceUtils}
-import org.apache.spark.resource.ResourceProfile.{getCustomExecutorResources, DEFAULT_RESOURCE_PROFILE_ID}
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
@@ -101,7 +101,7 @@ private[spark] class ApplicationInfo(
         .map(_.toInt)
         .getOrElse(defaultMemoryMbPerExecutor)
       val customResources = ResourceUtils.executorResourceRequestToRequirement(
-        getCustomExecutorResources(resourceProfile).values.toSeq.sortBy(_.resourceName))
+        resourceProfile.getCustomExecutorResources().values.toSeq.sortBy(_.resourceName))
 
       rpIdToResourceDesc(resourceProfile.id) =
         ExecutorResourceDescription(coresPerExecutor, memoryMbPerExecutor, customResources)
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 5e02c61459d..afd612433a7 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -94,6 +94,15 @@ class ResourceProfile(
     executorResources.get(ResourceProfile.MEMORY).map(_.amount)
   }
 
+  private[spark] def getCustomTaskResources(): Map[String, TaskResourceRequest] = {
+    taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
+  }
+
+  protected[spark] def getCustomExecutorResources(): Map[String, ExecutorResourceRequest] = {
+    executorResources.
+      filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
+  }
+
   /*
    * This function takes into account fractional amounts for the task resource requirement.
    * Spark only supports fractional amounts < 1 to basically allow for multiple tasks
@@ -182,8 +191,8 @@ class ResourceProfile(
     val numPartsPerResourceMap = new mutable.HashMap[String, Int]
     numPartsPerResourceMap(ResourceProfile.CORES) = 1
     val taskResourcesToCheck = new mutable.HashMap[String, TaskResourceRequest]
-    taskResourcesToCheck ++= ResourceProfile.getCustomTaskResources(this)
-    val execResourceToCheck = ResourceProfile.getCustomExecutorResources(this)
+    taskResourcesToCheck ++= this.getCustomTaskResources()
+    val execResourceToCheck = this.getCustomExecutorResources()
     execResourceToCheck.foreach { case (rName, execReq) =>
       val taskReq = taskResources.get(rName).map(_.amount).getOrElse(0.0)
       numPartsPerResourceMap(rName) = 1
@@ -242,7 +251,8 @@ class ResourceProfile(
 
   // check that the task resources and executor resources are equal, but id's could be different
   private[spark] def resourcesEqual(rp: ResourceProfile): Boolean = {
-    rp.taskResources == taskResources && rp.executorResources == executorResources
+    rp.taskResources == taskResources && rp.executorResources == executorResources &&
+      rp.getClass == this.getClass
   }
 
   override def hashCode(): Int = Seq(taskResources, executorResources).hashCode()
@@ -253,6 +263,40 @@ class ResourceProfile(
   }
 }
 
+/**
+ * Resource profile which only contains task resources, can be used for stage level task schedule
+ * when dynamic allocation is disabled, tasks will be scheduled to executors with default resource
+ * profile based on task resources described by this task resource profile.
+ * And when dynamic allocation is enabled, will require new executors for this profile based on
+ * the default executor resources requested at startup and assign tasks only on executors created
+ * with this resource profile.
+ *
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *                      name (e.g., cores, memory, CPU) to its specific request.
+ */
+@Evolving
+@Since("3.4.0")
+private[spark] class TaskResourceProfile(
+    override val taskResources: Map[String, TaskResourceRequest])
+  extends ResourceProfile(Map.empty, taskResources) {
+
+  override protected[spark] def getCustomExecutorResources()
+      : Map[String, ExecutorResourceRequest] = {
+    if (SparkEnv.get == null) {
+      // This will be called in standalone master when dynamic allocation enabled.
+      return super.getCustomExecutorResources()
+    }
+
+    val sparkConf = SparkEnv.get.conf
+    if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
+      ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+        .getCustomExecutorResources()
+    } else {
+      super.getCustomExecutorResources()
+    }
+  }
+}
+
 object ResourceProfile extends Logging {
   // task resources
   /**
@@ -393,17 +437,6 @@ object ResourceProfile extends Logging {
     }
   }
 
-  private[spark] def getCustomTaskResources(
-      rp: ResourceProfile): Map[String, TaskResourceRequest] = {
-    rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
-  }
-
-  private[spark] def getCustomExecutorResources(
-      rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
-    rp.executorResources.
-      filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
-  }
-
   /*
    * Get the number of cpus per task if its set in the profile, otherwise return the
    * cpus per task for the default profile.
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
index f6b30d32737..584ff32b447 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
@@ -93,7 +93,11 @@ class ResourceProfileBuilder() {
   }
 
   def build(): ResourceProfile = {
-    new ResourceProfile(executorResources, taskResources)
+    if (_executorResources.isEmpty) {
+      new TaskResourceProfile(taskResources)
+    } else {
+      new ResourceProfile(executorResources, taskResources)
+    }
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 489d9c3e858..3f48aaded5c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -59,35 +59,67 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
   /**
-   * If we use anything except the default profile, it's only supported on YARN and Kubernetes
-   * with dynamic allocation enabled. Throw an exception if not supported.
+   * If we use anything except the default profile, it's supported on YARN, Kubernetes and
+   * Standalone with dynamic allocation enabled, and task resource profile with dynamic allocation
+   * disabled on Standalone. Throw an exception if not supported.
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
-    val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
-      isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
-    val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
-      isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
-    // We want the exception to be thrown only when we are specifically testing for the
-    // exception or in a real application. Otherwise in all other testing scenarios we want
-    // to skip throwing the exception so that we can test in other modes to make testing easier.
-    if ((notRunningUnitTests || testExceptionThrown) &&
+    if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
+      if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
+        throw new SparkException("TaskResourceProfiles are only supported for Standalone " +
+          "cluster for now when dynamic allocation is disabled.")
+      }
+    } else {
+      val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+      val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
+        isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+      val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
+        isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
+
+      // We want the exception to be thrown only when we are specifically testing for the
+      // exception or in a real application. Otherwise in all other testing scenarios we want
+      // to skip throwing the exception so that we can test in other modes to make testing easier.
+      if ((notRunningUnitTests || testExceptionThrown) &&
         (notYarnOrK8sOrStandaloneAndNotDefaultProfile ||
           YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) {
-      throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
-        "and Standalone with dynamic allocation enabled.")
-    }
+        throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
+          "and Standalone with dynamic allocation enabled.")
+      }
 
-    if (isStandalone && rp.getExecutorCores.isEmpty &&
-      sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
-      logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " +
-        "is explicitly set, you may get more executors allocated than expected. It's recommended " +
-        "to set executor cores explicitly. Please check SPARK-30299 for more details.")
+      if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
+        sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
+        logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " +
+          "is explicitly set, you may get more executors allocated than expected. " +
+          "It's recommended to set executor cores explicitly. " +
+          "Please check SPARK-30299 for more details.")
+      }
     }
 
     true
   }
 
+  /**
+   * Check whether a task with specific taskRpId can be scheduled to executors
+   * with executorRpId.
+   *
+   * Here are the rules:
+   * 1. When dynamic allocation is disabled, only [[TaskResourceProfile]] is supported,
+   *    and tasks with [[TaskResourceProfile]] can be scheduled to executors with default
+   *    resource profile.
+   * 2. For other scenarios(when dynamic allocation is enabled), tasks can be scheduled to
+   *    executors where resource profile exactly matches.
+   */
+  private[spark] def canBeScheduled(taskRpId: Int, executorRpId: Int): Boolean = {
+    assert(resourceProfileIdToResourceProfile.contains(taskRpId) &&
+      resourceProfileIdToResourceProfile.contains(executorRpId),
+      "Tasks and executors must have valid resource profile id")
+    val taskRp = resourceProfileFromId(taskRpId)
+
+    // When dynamic allocation disabled, tasks with TaskResourceProfile can always reuse
+    // all the executors with default resource profile.
+    taskRpId == executorRpId || (!dynamicEnabled && taskRp.isInstanceOf[TaskResourceProfile])
+  }
+
   def addResourceProfile(rp: ResourceProfile): Unit = {
     isSupported(rp)
     var putNewProfile = false
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 58b37269be4..0e18ecf0e51 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -356,7 +356,7 @@ private[spark] object ResourceUtils extends Logging {
     val fileAllocated = parseAllocated(resourcesFileOpt, componentName)
     val fileAllocResMap = fileAllocated.map(a => (a.id.resourceName, a.toResourceInformation)).toMap
     // only want to look at the ResourceProfile for resources not in the resources file
-    val execReq = ResourceProfile.getCustomExecutorResources(resourceProfile)
+    val execReq = resourceProfile.getCustomExecutorResources()
     val filteredExecreq = execReq.filterNot { case (rname, _) => fileAllocResMap.contains(rname) }
     val rpAllocations = filteredExecreq.map { case (rName, execRequest) =>
       val resourceId = new ResourceID(componentName, rName)
@@ -444,8 +444,8 @@ private[spark] object ResourceUtils extends Logging {
         maxTaskPerExec = numTasksPerExecCores
       }
     }
-    val taskReq = ResourceProfile.getCustomTaskResources(rp)
-    val execReq = ResourceProfile.getCustomExecutorResources(rp)
+    val taskReq = rp.getCustomTaskResources()
+    val execReq = rp.getCustomExecutorResources()
 
     if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS)) {
       if ((taskCpus * maxTaskPerExec) < cores) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 475afd01d00..86786e64ced 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -44,7 +44,7 @@ import org.apache.spark.network.shuffle.protocol.MergeStatuses
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.resource.{ResourceProfile, TaskResourceProfile}
 import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
@@ -592,7 +592,12 @@ private[spark] class DAGScheduler(
         if (x.amount > v.amount) x else v).getOrElse(v)
       k -> larger
     }
-    new ResourceProfile(mergedExecReq, mergedTaskReq)
+
+    if (mergedExecReq.isEmpty) {
+      new TaskResourceProfile(mergedTaskReq)
+    } else {
+      new ResourceProfile(mergedExecReq, mergedTaskReq)
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d3e27a94e29..a6735f380f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -388,9 +388,10 @@ private[spark] class TaskSchedulerImpl(
       val execId = shuffledOffers(i).executorId
       val host = shuffledOffers(i).host
       val taskSetRpID = taskSet.taskSet.resourceProfileId
-      // make the resource profile id a hard requirement for now - ie only put tasksets
-      // on executors where resource profile exactly matches.
-      if (taskSetRpID == shuffledOffers(i).resourceProfileId) {
+
+      // check whether the task can be scheduled to the executor base on resource profile.
+      if (sc.resourceProfileManager
+        .canBeScheduled(taskSetRpID, shuffledOffers(i).resourceProfileId)) {
         val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, availableCpus(i),
           availableResources(i))
         taskResAssignmentsOpt.foreach { taskResAssignments =>
@@ -463,7 +464,7 @@ private[spark] class TaskSchedulerImpl(
     // check if the ResourceProfile has cpus first since that is common case
     if (availCpus < taskCpus) return None
     // only look at the resource other then cpus
-    val tsResources = ResourceProfile.getCustomTaskResources(taskSetProf)
+    val tsResources = taskSetProf.getCustomTaskResources()
     if (tsResources.isEmpty) return Some(Map.empty)
     val localTaskReqAssign = HashMap[String, ResourceInformation]()
     // we go through all resources here so that we can make sure they match and also get what the
@@ -1222,13 +1223,13 @@ private[spark] object TaskSchedulerImpl {
 
   /**
    * Calculate the max available task slots given the `availableCpus` and `availableResources`
-   * from a collection of ResourceProfiles. And only those ResourceProfiles who has the
-   * same id with the `rpId` can be used to calculate the task slots.
+   * from a collection of ResourceProfiles. And only those ResourceProfiles who can be assigned
+   * tasks with the `rpId` can be used to calculate the task slots.
    *
    * @param scheduler the TaskSchedulerImpl instance
    * @param conf SparkConf used to calculate the limiting resource and get the cpu amount per task
-   * @param rpId the target ResourceProfile id. Only those ResourceProfiles who has the same id
-   *             with it can be used to calculate the task slots.
+   * @param rpId the ResourceProfile id for the task set. Only those ResourceProfiles who can be
+   *             assigned with the tasks can be used to calculate the task slots.
    * @param availableRPIds an Array of ids of the available ResourceProfiles from the executors.
    * @param availableCpus an Array of the amount of available cpus from the executors.
    * @param availableResources an Array of the resources map from the executors. In the resource
@@ -1257,7 +1258,7 @@ private[spark] object TaskSchedulerImpl {
     val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount).get
 
     availableCpus.zip(availableResources).zip(availableRPIds)
-      .filter { case (_, id) => id == rpId }
+      .filter { case (_, id) => scheduler.sc.resourceProfileManager.canBeScheduled(rpId, id) }
       .map { case ((cpu, resources), _) =>
         val numTasksPerExecCores = cpu / cpusPerTask
         if (limitedByCpu) {
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index aa008135609..e97d5c7883a 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -116,6 +116,43 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     assert(rpmanager.isSupported(immrprof))
   }
 
+  test("isSupported task resource profiles with dynamic allocation disabled") {
+    val conf = new SparkConf().setMaster("spark://foo").set(EXECUTOR_CORES, 4)
+    conf.set(DYN_ALLOCATION_ENABLED, false)
+    conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
+
+    var rpmanager = new ResourceProfileManager(conf, listenerBus)
+    // default profile should always work
+    val defaultProf = rpmanager.defaultResourceProfile
+    assert(rpmanager.isSupported(defaultProf))
+
+    // task resource profile.
+    val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1)
+    val taskProf = new TaskResourceProfile(gpuTaskReq.requests)
+    assert(rpmanager.isSupported(taskProf))
+
+    conf.setMaster("local")
+    rpmanager = new ResourceProfileManager(conf, listenerBus)
+    val error = intercept[SparkException] {
+      rpmanager.isSupported(taskProf)
+    }.getMessage
+    assert(error === "TaskResourceProfiles are only supported for Standalone " +
+      "cluster for now when dynamic allocation is disabled.")
+  }
+
+  test("isSupported task resource profiles with dynamic allocation enabled") {
+    val conf = new SparkConf().setMaster("spark://foo").set(EXECUTOR_CORES, 4)
+    conf.set(DYN_ALLOCATION_ENABLED, true)
+    conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
+
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
+
+    // task resource profile.
+    val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1)
+    val taskProf = new TaskResourceProfile(gpuTaskReq.requests)
+    assert(rpmanager.isSupported(taskProf))
+  }
+
   test("isSupported with local mode") {
     val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index 6c36f5c8555..d07b85847e7 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.resource
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.mockito.Mockito.when
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
 import org.apache.spark.resource.TestResourceIDs._
 
-class ResourceProfileSuite extends SparkFunSuite {
+class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
 
   override def beforeAll(): Unit = {
     try {
@@ -190,6 +193,33 @@ class ResourceProfileSuite extends SparkFunSuite {
     assert(immrprof.isCoresLimitKnown == true)
   }
 
+  test("tasks and limit resource for task resource profile") {
+    val sparkConf = new SparkConf().setMaster("spark://testing")
+      .set(EXECUTOR_CORES, 2)
+      .set("spark.dynamicAllocation.enabled", "false")
+      .set("spark.executor.resource.gpu.amount", "2")
+      .set("spark.executor.resource.gpu.discoveryScript", "myscript")
+
+    withMockSparkEnv(sparkConf) {
+      val rpBuilder1 = new ResourceProfileBuilder()
+      val rp1 = rpBuilder1
+        .require(new TaskResourceRequests().resource("gpu", 1))
+        .build()
+      assert(rp1.isInstanceOf[TaskResourceProfile])
+      assert(rp1.limitingResource(sparkConf) == ResourceProfile.CPUS)
+      assert(rp1.maxTasksPerExecutor(sparkConf) == 2)
+      assert(rp1.isCoresLimitKnown)
+
+      val rpBuilder2 = new ResourceProfileBuilder()
+      val rp2 = rpBuilder2
+        .require(new TaskResourceRequests().resource("gpu", 2))
+        .build()
+      assert(rp1.isInstanceOf[TaskResourceProfile])
+      assert(rp2.limitingResource(sparkConf) == "gpu")
+      assert(rp2.maxTasksPerExecutor(sparkConf) == 1)
+      assert(rp2.isCoresLimitKnown)
+    }
+  }
 
   test("Create ResourceProfile") {
     val rprof = new ResourceProfileBuilder()
@@ -257,6 +287,22 @@ class ResourceProfileSuite extends SparkFunSuite {
     assert(rprof.resourcesEqual(rprof2), "resource profile resourcesEqual not working")
   }
 
+  test("test TaskResourceProfiles equal") {
+    val rprofBuilder = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    rprofBuilder.require(taskReq)
+    val rprof = rprofBuilder.build()
+
+    val taskReq1 = new TaskResourceRequests().resource("gpu", 1)
+    val rprof1 = new ResourceProfile(Map.empty, taskReq1.requests)
+    assert(!rprof.resourcesEqual(rprof1),
+      "resource profiles having different types should not equal")
+
+    val taskReq2 = new TaskResourceRequests().resource("gpu", 1)
+    val rprof2 = new TaskResourceProfile(taskReq2.requests)
+    assert(rprof.resourcesEqual(rprof2), "task resource profile resourcesEqual not working")
+  }
+
   test("Test ExecutorResourceRequests memory helpers") {
     val rprof = new ResourceProfileBuilder()
     val ereqs = new ExecutorResourceRequests()
@@ -314,7 +360,7 @@ class ResourceProfileSuite extends SparkFunSuite {
     // Update this if new resource type added
     assert(ResourceProfile.allSupportedExecutorResources.size === 5,
       "Executor resources should have 5 supported resources")
-    assert(ResourceProfile.getCustomExecutorResources(rprof.build).size === 1,
+    assert(rprof.build().getCustomExecutorResources().size === 1,
       "Executor resources should have 1 custom resource")
   }
 
@@ -327,7 +373,18 @@ class ResourceProfileSuite extends SparkFunSuite {
       .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
     rprof.require(taskReq).require(eReq)
 
-    assert(ResourceProfile.getCustomTaskResources(rprof.build).size === 1,
+    assert(rprof.build().getCustomTaskResources().size === 1,
       "Task resources should have 1 custom resource")
   }
+
+  private def withMockSparkEnv(conf: SparkConf)(f: => Unit): Unit = {
+    val previousEnv = SparkEnv.get
+    val mockEnv = mock[SparkEnv]
+    when(mockEnv.conf).thenReturn(conf)
+    SparkEnv.set(mockEnv)
+
+    try f finally {
+      SparkEnv.set(previousEnv)
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 847e0622213..19a9af86afb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests
 import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rdd.{DeterministicLevel, RDD}
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -3424,6 +3424,23 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(mergedRp.getExecutorCores.get == 4)
   }
 
+  test("test merge task resource profiles") {
+    conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true")
+    // Ensure the initialization of SparkEnv
+    sc
+
+    val treqs1 = new TaskResourceRequests().cpus(1)
+    val rp1 = new TaskResourceProfile(treqs1.requests)
+    val treqs2 = new TaskResourceRequests().cpus(1)
+    val rp2 = new TaskResourceProfile(treqs2.requests)
+    val treqs3 = new TaskResourceRequests().cpus(2)
+    val rp3 = new TaskResourceProfile(treqs3.requests)
+    val mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))
+
+    assert(mergedRp.isInstanceOf[TaskResourceProfile])
+    assert(mergedRp.getTaskCpus.get == 2)
+  }
+
   /**
    * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that
    * getShuffleDependenciesAndResourceProfiles correctly returns the direct shuffle dependencies
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 869a7232437..4e9e9755e85 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -33,7 +33,7 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.internal.config
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.util.{Clock, ManualClock, ThreadUtils}
@@ -1833,6 +1833,101 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
   }
 
+  test("Scheduler works with task resource profiles") {
+    val taskCpus = 1
+    val taskGpus = 1
+    val executorGpus = 4
+    val executorCpus = 4
+
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+
+    val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+    val taskSet = FakeTask.createTaskSet(3)
+    val rpTaskSet = FakeTask.createTaskSet(5, stageId = 1, stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val resources0 = Map(GPU -> ArrayBuffer("0", "1", "2", "3"))
+    val resources1 = Map(GPU -> ArrayBuffer("4", "5", "6", "7"))
+
+    val workerOffers =
+      IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0),
+        WorkerOffer("executor1", "host1", 4, None, resources1))
+
+    taskScheduler.submitTasks(taskSet)
+    taskScheduler.submitTasks(rpTaskSet)
+    // should have 3 for default profile and 2 for additional resource profile
+    var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(5 === taskDescriptions.length)
+    var has2Gpus = 0
+    var has1Gpu = 0
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      if (tDesc.resources(GPU).addresses.size == 2) {
+        has2Gpus += 1
+      }
+      if (tDesc.resources(GPU).addresses.size == 1) {
+        has1Gpu += 1
+      }
+    }
+    assert(has2Gpus == 2)
+    assert(has1Gpu == 3)
+
+    val resources3 = Map(GPU -> ArrayBuffer("8", "9", "10", "11"))
+
+    // clear the first 2 worker offers so they don't have any room and add a third
+    // for the resource profile
+    val workerOffers3 = IndexedSeq(
+      WorkerOffer("executor0", "host0", 0, None, Map.empty),
+      WorkerOffer("executor1", "host1", 0, None, Map.empty),
+      WorkerOffer("executor2", "host2", 4, None, resources3))
+    taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten
+    assert(2 === taskDescriptions.length)
+    assert(taskDescriptions.head.resources.contains(GPU))
+    assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
+  }
+
+  test("Calculate available tasks slots for task resource profiles") {
+    val taskCpus = 1
+    val taskGpus = 1
+    val executorGpus = 4
+    val executorCpus = 4
+
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+
+    val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val resources0 = Map(GPU -> ArrayBuffer("0", "1", "2", "3"))
+    val resources1 = Map(GPU -> ArrayBuffer("4", "5", "6", "7"))
+
+    val workerOffers =
+      IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0),
+        WorkerOffer("executor1", "host1", 4, None, resources1))
+    val availableResourcesAmount = workerOffers.map(_.resources).map { resourceMap =>
+        // available addresses already takes into account if there are fractional
+        // task resource requests
+        resourceMap.map { case (name, addresses) => (name, addresses.length) }
+      }
+
+    val taskSlotsForRp = TaskSchedulerImpl.calculateAvailableSlots(
+      taskScheduler, taskScheduler.conf, rp.id, workerOffers.map(_.resourceProfileId).toArray,
+      workerOffers.map(_.cores).toArray, availableResourcesAmount.toArray)
+    assert(taskSlotsForRp === 4)
+  }
+
   private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int): TaskSchedulerImpl = {
     // one task per host
     val numHosts = numTasks
diff --git a/docs/configuration.md b/docs/configuration.md
index 55e595ad301..ffd36209e2d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3243,9 +3243,9 @@ See your cluster manager specific page for requirements and details on each of -
 # Stage Level Scheduling Overview
 
 The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at th [...]
-This is only available for the RDD API in Scala, Java, and Python.  It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details.
+This is only available for the RDD API in Scala, Java, and Python.  It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at stage level, and this is supported on Standalone cluster right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standa [...]
 
-See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this feature. The current implementation acquires new executors for each `ResourceProfile`  created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one Resour [...]
+See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this feature. When dynamic allocation is disabled, tasks with different task resource requirements will share executors with `DEFAULT_RESOURCE_PROFILE`. While when dynamic allocation is enabled, the current implementation acquires new executors for each `ResourceProfile`  created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the [...]
 
 # Push-based shuffle overview
 
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 559e3bca6c9..b431752f166 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -467,7 +467,9 @@ worker during one single schedule iteration.
 
 # Stage Level Scheduling Overview
 
-Stage level scheduling is supported on Standalone when dynamic allocation is enabled. Currently, when the Master allocates executors for one application, it will schedule based on the order of the ResourceProfile ids for multiple ResourceProfiles. The ResourceProfile with smaller id will be scheduled firstly. Normally this won’t matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to kee [...]
+Stage level scheduling is supported on Standalone:
+- When dynamic allocation is disabled: It allows users to specify different task resource requirements at the stage level and will use the same executors requested at startup.
+- When dynamic allocation is enabled: Currently, when the Master allocates executors for one application, it will schedule based on the order of the ResourceProfile ids for multiple ResourceProfiles. The ResourceProfile with smaller id will be scheduled firstly. Normally this won’t matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to keep in mind. For scheduling, we will only take exe [...]
 
 ## Caveats
 

