You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2020/01/17 14:16:31 UTC

[spark] branch master updated: [SPARK-29306][CORE] Stage Level Sched: Executors need to track what ResourceProfile they are created with

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dbfa2b  [SPARK-29306][CORE] Stage Level Sched: Executors need to track what ResourceProfile they are created with
6dbfa2b is described below

commit 6dbfa2bb9c5215aab97ec3f86b3325a11a7ff4d1
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Fri Jan 17 08:15:25 2020 -0600

    [SPARK-29306][CORE] Stage Level Sched: Executors need to track what ResourceProfile they are created with
    
    ### What changes were proposed in this pull request?
    
    This is the second PR for the Stage Level Scheduling. This is adding in the necessary executor side changes:
    1) executors to know what ResourceProfile they should be using
    2) handle parsing the resource profile settings - these are not in the global configs
    3) then reporting back to the driver what resource profile it was started with.
    
    This PR adds all the piping for YARN to pass the information all the way to executors, but it just uses the default ResourceProfile (which is the global applicatino level configs).
    
    At a high level these changes include:
    1) adding a new --resourceProfileId option to the CoarseGrainedExecutorBackend
    2) Add the ResourceProfile settings to new internal confs that gets passed into the Executor
    3) Executor changes that use the resource profile id passed in to read the corresponding ResourceProfile confs and then parse those requests and discover resources as necessary
    4) Executor registers to Driver with the Resource profile id so that the ExecutorMonitor can track how many executor with each profile are running
    5) YARN side changes to show that passing the resource profile id and confs actually works. Just uses the DefaultResourceProfile for now.
    
    I also removed a check from the CoarseGrainedExecutorBackend that used to check to make sure there were task requirements before parsing any custom resource executor requests.  With the resource profiles this becomes much more expensive because we would then have to pass the task requests to each executor and the check was just a short cut and not really needed. It was much cleaner just to remove it.
    
    Note there were some changes to the ResourceProfile, ExecutorResourceRequests, and TaskResourceRequests in this PR as well because I discovered some issues with things not being immutable. That api now look like:
    
    val rpBuilder = new ResourceProfileBuilder()
    val ereq = new ExecutorResourceRequests()
    val treq = new TaskResourceRequests()
    
    ereq.cores(2).memory("6g").memoryOverhead("2g").pysparkMemory("2g").resource("gpu", 2, "/home/tgraves/getGpus")
    treq.cpus(2).resource("gpu", 2)
    
    val resourceProfile = rpBuilder.require(ereq).require(treq).build
    
    This makes is so that ResourceProfile is immutable and Spark can use it directly without worrying about the user changing it.
    
    ### Why are the changes needed?
    
    These changes are needed for the executor to report which ResourceProfile they are using so that ultimately the dynamic allocation manager can use that information to know how many with a profile are running and how many more it needs to request.  Its also needed to get the resource profile confs to the executor so that it can run the appropriate discovery script if needed.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests and manually on YARN.
    
    Closes #26682 from tgravescs/SPARK-29306.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../executor/CoarseGrainedExecutorBackend.scala    |  56 ++++----
 .../spark/resource/ExecutorResourceRequest.scala   |  39 +++--
 .../spark/resource/ExecutorResourceRequests.scala  |  37 +++--
 .../apache/spark/resource/ResourceProfile.scala    | 160 +++++++++++++--------
 .../spark/resource/ResourceProfileBuilder.scala    |  84 +++++++++++
 .../org/apache/spark/resource/ResourceUtils.scala  | 113 ++++++++++++---
 .../spark/resource/TaskResourceRequest.scala       |  20 ++-
 .../spark/resource/TaskResourceRequests.scala      |  25 ++--
 .../cluster/CoarseGrainedClusterMessage.scala      |  10 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  21 ++-
 .../spark/scheduler/cluster/ExecutorData.scala     |   7 +-
 .../spark/scheduler/cluster/ExecutorInfo.scala     |  32 +++--
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala |  60 ++++++--
 .../spark/resource/JavaResourceProfileSuite.java   |   2 +-
 .../spark/ExecutorAllocationManagerSuite.scala     |   6 +-
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |   5 +-
 .../scala/org/apache/spark/LocalSparkContext.scala |   3 +
 .../deploy/StandaloneDynamicAllocationSuite.scala  |   5 +-
 .../CoarseGrainedExecutorBackendSuite.scala        | 130 +++++++++++------
 .../spark/resource/ResourceProfileSuite.scala      | 114 ++++++++-------
 .../apache/spark/resource/ResourceUtilsSuite.scala |  37 ++++-
 .../CoarseGrainedSchedulerBackendSuite.scala       |  24 ++--
 .../scheduler/dynalloc/ExecutorMonitorSuite.scala  |  57 +++++---
 project/MimaExcludes.scala                         |   8 ++
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  |   3 +-
 .../spark/deploy/yarn/ApplicationMaster.scala      |   4 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  11 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |   4 +-
 .../YarnCoarseGrainedExecutorBackend.scala         |  13 +-
 29 files changed, 755 insertions(+), 335 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 1fe901a..f56e7c6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -35,6 +35,8 @@ import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.resource.ResourceProfile._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
@@ -51,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     cores: Int,
     userClassPath: Seq[URL],
     env: SparkEnv,
-    resourcesFileOpt: Option[String])
+    resourcesFileOpt: Option[String],
+    resourceProfile: ResourceProfile)
   extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
 
   import CoarseGrainedExecutorBackend._
@@ -80,7 +83,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
       ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
-        extractAttributes, resources))
+        extractAttributes, resources, resourceProfile.id))
     }(ThreadUtils.sameThread).onComplete {
       case Success(_) =>
         self.send(RegisteredExecutor)
@@ -91,24 +94,13 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   // visible for testing
   def parseOrFindResources(resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = {
-    // only parse the resources if a task requires them
-    val resourceInfo = if (parseResourceRequirements(env.conf, SPARK_TASK_PREFIX).nonEmpty) {
-      val resources = getOrDiscoverAllResources(env.conf, SPARK_EXECUTOR_PREFIX, resourcesFileOpt)
-      if (resources.isEmpty) {
-        throw new SparkException("User specified resources per task via: " +
-          s"$SPARK_TASK_PREFIX, but can't find any resources available on the executor.")
-      } else {
-        logResourceInfo(SPARK_EXECUTOR_PREFIX, resources)
-      }
-      resources
-    } else {
-      if (resourcesFileOpt.nonEmpty) {
-        logWarning("A resources file was specified but the application is not configured " +
-          s"to use any resources, see the configs with prefix: ${SPARK_TASK_PREFIX}")
-      }
-      Map.empty[String, ResourceInformation]
-    }
-    resourceInfo
+    logDebug(s"Resource profile id is: ${resourceProfile.id}")
+    val resources = getOrDiscoverAllResourcesForResourceProfile(
+      resourcesFileOpt,
+      SPARK_EXECUTOR_PREFIX,
+      resourceProfile)
+    logResourceInfo(SPARK_EXECUTOR_PREFIX, resources)
+    resources
   }
 
   def extractLogUrls: Map[String, String] = {
@@ -237,14 +229,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       appId: String,
       workerUrl: Option[String],
       userClassPath: mutable.ListBuffer[URL],
-      resourcesFileOpt: Option[String])
+      resourcesFileOpt: Option[String],
+      resourceProfileId: Int)
 
   def main(args: Array[String]): Unit = {
-    val createFn: (RpcEnv, Arguments, SparkEnv) =>
-      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
+    val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
+      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
         arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
-        arguments.resourcesFileOpt)
+        arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
     System.exit(0)
@@ -252,7 +245,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
   def run(
       arguments: Arguments,
-      backendCreateFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend): Unit = {
+      backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
+        CoarseGrainedExecutorBackend): Unit = {
 
     Utils.initDaemon(log)
 
@@ -284,7 +278,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         }
       }
 
-      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
+      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
       val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
       fetcher.shutdown()
 
@@ -307,7 +301,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
 
-      env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
+      env.rpcEnv.setupEndpoint("Executor",
+        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
       arguments.workerUrl.foreach { url =>
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
@@ -325,6 +320,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var appId: String = null
     var workerUrl: Option[String] = None
     val userClassPath = new mutable.ListBuffer[URL]()
+    var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
 
     var argv = args.toList
     while (!argv.isEmpty) {
@@ -357,6 +353,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         case ("--user-class-path") :: value :: tail =>
           userClassPath += new URL(value)
           argv = tail
+        case ("--resourceProfileId") :: value :: tail =>
+          resourceProfileId = value.toInt
+          argv = tail
         case Nil =>
         case tail =>
           // scalastyle:off println
@@ -380,7 +379,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }
 
     Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
-      userClassPath, resourcesFileOpt)
+      userClassPath, resourcesFileOpt, resourceProfileId)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -399,6 +398,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       |   --app-id <appid>
       |   --worker-url <workerUrl>
       |   --user-class-path <url>
+      |   --resourceProfileId <id>
       |""".stripMargin)
     // scalastyle:on println
     System.exit(1)
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
index 88ceaad..9a92091 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
@@ -17,10 +17,6 @@
 
 package org.apache.spark.resource
 
-import scala.collection.mutable
-
-import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
-
 /**
  * An Executor resource request. This is used in conjunction with the ResourceProfile to
  * programmatically specify the resources needed for an RDD that will be applied at the
@@ -28,16 +24,13 @@ import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
  *
  * This is used to specify what the resource requirements are for an Executor and how
  * Spark can find out specific details about those resources. Not all the parameters are
- * required for every resource type. The resources names supported
- * correspond to the regular Spark configs with the prefix removed. For instance overhead
- * memory in this api is memoryOverhead, which is spark.executor.memoryOverhead with
- * spark.executor removed. Resources like GPUs are resource.gpu
- * (spark configs spark.executor.resource.gpu.*). The amount, discoveryScript, and vendor
- * parameters for resources are all the same parameters a user would specify through the
+ * required for every resource type. Resources like GPUs are supported and have same limitations
+ * as using the global spark configs spark.executor.resource.gpu.*. The amount, discoveryScript,
+ * and vendor parameters for resources are all the same parameters a user would specify through the
  * configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}.
  *
  * For instance, a user wants to allocate an Executor with GPU resources on YARN. The user has
- * to specify the resource name (resource.gpu), the amount or number of GPUs per Executor,
+ * to specify the resource name (gpu), the amount or number of GPUs per Executor,
  * the discovery script would be specified so that when the Executor starts up it can
  * discovery what GPU addresses are available for it to use because YARN doesn't tell
  * Spark that, then vendor would not be used because its specific for Kubernetes.
@@ -63,15 +56,21 @@ private[spark] class ExecutorResourceRequest(
     val discoveryScript: String = "",
     val vendor: String = "") extends Serializable {
 
-  // A list of allowed Spark internal resources. Custom resources (spark.executor.resource.*)
-  // like GPUs/FPGAs are also allowed, see the check below.
-  private val allowedExecutorResources = mutable.HashSet[String](
-    ResourceProfile.MEMORY,
-    ResourceProfile.OVERHEAD_MEM,
-    ResourceProfile.PYSPARK_MEM,
-    ResourceProfile.CORES)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: ExecutorResourceRequest =>
+        that.getClass == this.getClass &&
+          that.resourceName == resourceName && that.amount == amount &&
+        that.discoveryScript == discoveryScript && that.vendor == vendor
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int =
+    Seq(resourceName, amount, discoveryScript, vendor).hashCode()
 
-  if (!allowedExecutorResources.contains(resourceName) && !resourceName.startsWith(RESOURCE_DOT)) {
-    throw new IllegalArgumentException(s"Executor resource not allowed: $resourceName")
+  override def toString(): String = {
+    s"name: $resourceName, amount: $amount, script: $discoveryScript, vendor: $vendor"
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index 6ffcc0c..d345674 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.resource
 
-import scala.collection.mutable
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.resource.ResourceProfile._
@@ -32,9 +34,9 @@ import org.apache.spark.resource.ResourceProfile._
  */
 private[spark] class ExecutorResourceRequests() extends Serializable {
 
-  private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+  private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
 
-  def requests: Map[String, ExecutorResourceRequest] = _executorResources.toMap
+  def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
 
   /**
    * Specify heap memory. The value specified will be converted to MiB.
@@ -44,8 +46,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
    */
   def memory(amount: String): this.type = {
     val amountMiB = JavaUtils.byteStringAsMb(amount)
-    val rr = new ExecutorResourceRequest(MEMORY, amountMiB)
-    _executorResources(MEMORY) = rr
+    val req = new ExecutorResourceRequest(MEMORY, amountMiB)
+    _executorResources.put(MEMORY, req)
     this
   }
 
@@ -57,8 +59,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
    */
   def memoryOverhead(amount: String): this.type = {
     val amountMiB = JavaUtils.byteStringAsMb(amount)
-    val rr = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB)
-    _executorResources(OVERHEAD_MEM) = rr
+    val req = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB)
+    _executorResources.put(OVERHEAD_MEM, req)
     this
   }
 
@@ -70,8 +72,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
    */
   def pysparkMemory(amount: String): this.type = {
     val amountMiB = JavaUtils.byteStringAsMb(amount)
-    val rr = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB)
-    _executorResources(PYSPARK_MEM) = rr
+    val req = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB)
+    _executorResources.put(PYSPARK_MEM, req)
     this
   }
 
@@ -81,15 +83,17 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
    * @param amount Number of cores to allocate per Executor.
    */
   def cores(amount: Int): this.type = {
-    val t = new ExecutorResourceRequest(CORES, amount)
-    _executorResources(CORES) = t
+    val req = new ExecutorResourceRequest(CORES, amount)
+    _executorResources.put(CORES, req)
     this
   }
 
   /**
    *  Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
    *  correspond to the regular Spark configs with the prefix removed. For instance, resources
-   *  like GPUs are resource.gpu (spark configs spark.executor.resource.gpu.*)
+   *  like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource
+   *  that the cluster manager doesn't support the result is undefined, it may error or may just
+   *  be ignored.
    *
    * @param resourceName Name of the resource.
    * @param amount amount of that resource per executor to use.
@@ -106,13 +110,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
       vendor: String = ""): this.type = {
     // a bit weird but for Java api use empty string as meaning None because empty
     // string is otherwise invalid for those paramters anyway
-    val eReq = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
-    _executorResources(resourceName) = eReq
-    this
-  }
-
-  def addRequest(ereq: ExecutorResourceRequest): this.type = {
-    _executorResources(ereq.resourceName) = ereq
+    val req = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
+    _executorResources.put(resourceName, req)
     this
   }
 
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 876a655..eb713a2 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -18,130 +18,164 @@
 package org.apache.spark.resource
 
 import java.util.{Map => JMap}
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX
+import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
 
 /**
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
  * specify executor and task requirements for an RDD that will get applied during a
  * stage. This allows the user to change the resource requirements between stages.
- *
- * This class is private now for initial development, once we have the feature in place
- * this will become public.
+ * This is meant to be immutable so user can't change it after building.
  */
 @Evolving
-private[spark] class ResourceProfile() extends Serializable {
+class ResourceProfile(
+    val executorResources: Map[String, ExecutorResourceRequest],
+    val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging {
 
-  private val _id = ResourceProfile.getNextProfileId
-  private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
-  private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+  // _id is only a var for testing purposes
+  private var _id = ResourceProfile.getNextProfileId
 
   def id: Int = _id
-  def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap
-  def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.toMap
 
   /**
    * (Java-specific) gets a Java Map of resources to TaskResourceRequest
    */
-  def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asJava
+  def taskResourcesJMap: JMap[String, TaskResourceRequest] = taskResources.asJava
 
   /**
    * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
    */
-  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asJava
+  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
+    executorResources.asJava
+  }
+
+  // Note that some cluster managers don't set the executor cores explicitly so
+  // be sure to check the Option as required
+  private[spark] def getExecutorCores: Option[Int] = {
+    executorResources.get(ResourceProfile.CORES).map(_.amount.toInt)
+  }
 
-  def reset(): Unit = {
-    _taskResources.clear()
-    _executorResources.clear()
+  private[spark] def getTaskCpus: Option[Int] = {
+    taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
   }
 
-  def require(requests: ExecutorResourceRequests): this.type = {
-    _executorResources ++= requests.requests
-    this
+  // testing only
+  private[spark] def setToDefaultProfile(): Unit = {
+    _id = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
   }
 
-  def require(requests: TaskResourceRequests): this.type = {
-    _taskResources ++= requests.requests
-    this
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: ResourceProfile =>
+        that.getClass == this.getClass && that.id == _id &&
+          that.taskResources == taskResources && that.executorResources == executorResources
+      case _ =>
+        false
+    }
   }
 
+  override def hashCode(): Int = Seq(taskResources, executorResources).hashCode()
+
   override def toString(): String = {
-    s"Profile: id = ${_id}, executor resources: ${_executorResources}, " +
-      s"task resources: ${_taskResources}"
+    s"Profile: id = ${_id}, executor resources: ${executorResources.mkString(",")}, " +
+      s"task resources: ${taskResources.mkString(",")}"
   }
 }
 
-private[spark] object ResourceProfile extends Logging {
-  val UNKNOWN_RESOURCE_PROFILE_ID = -1
-  val DEFAULT_RESOURCE_PROFILE_ID = 0
-
+object ResourceProfile extends Logging {
+  // task resources
   val CPUS = "cpus"
+  // Executor resources
   val CORES = "cores"
   val MEMORY = "memory"
   val OVERHEAD_MEM = "memoryOverhead"
   val PYSPARK_MEM = "pyspark.memory"
 
+  // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
+  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM)
+
+  val UNKNOWN_RESOURCE_PROFILE_ID = -1
+  val DEFAULT_RESOURCE_PROFILE_ID = 0
+
   private lazy val nextProfileId = new AtomicInteger(0)
+  private val DEFAULT_PROFILE_LOCK = new Object()
 
   // The default resource profile uses the application level configs.
-  // Create the default profile immediately to get ID 0, its initialized later when fetched.
-  private val defaultProfileRef: AtomicReference[ResourceProfile] =
-    new AtomicReference[ResourceProfile](new ResourceProfile())
-
-  assert(defaultProfileRef.get().id == DEFAULT_RESOURCE_PROFILE_ID,
-    s"Default Profile must have the default profile id: $DEFAULT_RESOURCE_PROFILE_ID")
-
-  def getNextProfileId: Int = nextProfileId.getAndIncrement()
-
-  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
-    val defaultProf = defaultProfileRef.get()
-    // check to see if the default profile was initialized yet
-    if (defaultProf.executorResources == Map.empty) {
-      synchronized {
-        val prof = defaultProfileRef.get()
-        if (prof.executorResources == Map.empty) {
-          addDefaultTaskResources(prof, conf)
-          addDefaultExecutorResources(prof, conf)
-        }
-        prof
+  // var so that it can be reset for testing purposes.
+  @GuardedBy("DEFAULT_PROFILE_LOCK")
+  private var defaultProfile: Option[ResourceProfile] = None
+
+  private[spark] def getNextProfileId: Int = nextProfileId.getAndIncrement()
+
+  private[spark] def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
+    DEFAULT_PROFILE_LOCK.synchronized {
+      defaultProfile match {
+        case Some(prof) => prof
+        case None =>
+          val taskResources = getDefaultTaskResources(conf)
+          val executorResources = getDefaultExecutorResources(conf)
+          val defProf = new ResourceProfile(executorResources, taskResources)
+          defProf.setToDefaultProfile
+          defaultProfile = Some(defProf)
+          logInfo("Default ResourceProfile created, executor resources: " +
+            s"${defProf.executorResources}, task resources: " +
+            s"${defProf.taskResources}")
+          defProf
       }
-    } else {
-      defaultProf
     }
   }
 
-  private def addDefaultTaskResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+  private def getDefaultTaskResources(conf: SparkConf): Map[String, TaskResourceRequest] = {
     val cpusPerTask = conf.get(CPUS_PER_TASK)
     val treqs = new TaskResourceRequests().cpus(cpusPerTask)
-    val taskReq = ResourceUtils.parseResourceRequirements(conf, SPARK_TASK_PREFIX)
-    taskReq.foreach { req =>
-      val name = s"${RESOURCE_PREFIX}.${req.resourceName}"
-      treqs.resource(name, req.amount)
-    }
-    rprof.require(treqs)
+    ResourceUtils.addTaskResourceRequests(conf, treqs)
+    treqs.requests
   }
 
-  private def addDefaultExecutorResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+  private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
     ereqs.cores(conf.get(EXECUTOR_CORES))
     ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
+    conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
+    conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
     val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
     execReq.foreach { req =>
-      val name = s"${RESOURCE_PREFIX}.${req.id.resourceName}"
+      val name = req.id.resourceName
       ereqs.resource(name, req.amount, req.discoveryScript.getOrElse(""),
         req.vendor.getOrElse(""))
     }
-    rprof.require(ereqs)
+    ereqs.requests
   }
 
-  // for testing purposes
-  def resetDefaultProfile(conf: SparkConf): Unit = getOrCreateDefaultProfile(conf).reset()
+  // for testing only
+  private[spark] def reInitDefaultProfile(conf: SparkConf): Unit = {
+    clearDefaultProfile
+    // force recreate it after clearing
+    getOrCreateDefaultProfile(conf)
+  }
+
+  // for testing only
+  private[spark] def clearDefaultProfile: Unit = {
+    DEFAULT_PROFILE_LOCK.synchronized {
+      defaultProfile = None
+    }
+  }
+
+  private[spark] def getCustomTaskResources(
+      rp: ResourceProfile): Map[String, TaskResourceRequest] = {
+    rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS))
+  }
+
+  private[spark] def getCustomExecutorResources(
+      rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
+    rp.executorResources.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k))
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
new file mode 100644
index 0000000..0d55c17
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.util.{Map => JMap}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+
+/**
+ * Resource profile builder to build a Resource profile to associate with an RDD.
+ * A ResourceProfile allows the user to specify executor and task requirements for an RDD
+ * that will get applied during a stage. This allows the user to change the resource
+ * requirements between stages.
+ */
+@Evolving
+class ResourceProfileBuilder() {
+
+  private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+  private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
+
+  def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
+  def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
+
+  /**
+   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
+   */
+  def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asScala.asJava
+
+  /**
+   * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+   */
+  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
+    _executorResources.asScala.asJava
+  }
+
+  def require(requests: ExecutorResourceRequests): this.type = {
+    _executorResources.putAll(requests.requests.asJava)
+    this
+  }
+
+  def require(requests: TaskResourceRequests): this.type = {
+    _taskResources.putAll(requests.requests.asJava)
+    this
+  }
+
+  def clearExecutorResourceRequests(): this.type = {
+    _executorResources.clear()
+    this
+  }
+
+  def clearTaskResourceRequests(): this.type = {
+    _taskResources.clear()
+    this
+  }
+
+  override def toString(): String = {
+    "Profile executor resources: " +
+      s"${_executorResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}, " +
+      s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}"
+  }
+
+  def build: ResourceProfile = {
+    new ResourceProfile(executorResources, taskResources)
+  }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index fdd5c9a..190b0cd 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -111,7 +111,7 @@ private[spark] object ResourceUtils extends Logging {
   }
 
   def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
-    sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_DOT").map { case (key, _) =>
+    sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_PREFIX.").map { case (key, _) =>
       key.substring(0, key.indexOf('.'))
     }.toSet.toSeq.map(name => ResourceID(componentName, name))
   }
@@ -124,6 +124,35 @@ private[spark] object ResourceUtils extends Logging {
       .filter(_.amount > 0)
   }
 
+  // Used to take a fraction amount from a task resource requirement and split into a real
+  // integer amount and the number of parts expected. For instance, if the amount is 0.5,
+  // the we get (1, 2) back out.
+  // Returns tuple of (amount, numParts)
+  def calculateAmountAndPartsForFraction(amount: Double): (Int, Int) = {
+    val parts = if (amount <= 0.5) {
+      Math.floor(1.0 / amount).toInt
+    } else if (amount % 1 != 0) {
+      throw new SparkException(
+        s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
+    } else {
+      1
+    }
+    (Math.ceil(amount).toInt, parts)
+  }
+
+  // Add any task resource requests from the spark conf to the TaskResourceRequests passed in
+  def addTaskResourceRequests(
+      sparkConf: SparkConf,
+      treqs: TaskResourceRequests): Unit = {
+    listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { resourceId =>
+      val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
+      val amountDouble = settings.getOrElse(AMOUNT,
+        throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}")
+      ).toDouble
+      treqs.resource(resourceId.resourceName, amountDouble)
+    }
+  }
+
   def parseResourceRequirements(sparkConf: SparkConf, componentName: String)
     : Seq[ResourceRequirement] = {
     val resourceIds = listResourceIds(sparkConf, componentName)
@@ -136,15 +165,7 @@ private[spark] object ResourceUtils extends Logging {
     }
     rnamesAndAmounts.filter { case (_, amount) => amount > 0 }.map { case (rName, amountDouble) =>
       val (amount, parts) = if (componentName.equalsIgnoreCase(SPARK_TASK_PREFIX)) {
-        val parts = if (amountDouble <= 0.5) {
-          Math.floor(1.0 / amountDouble).toInt
-        } else if (amountDouble % 1 != 0) {
-          throw new SparkException(
-            s"The resource amount ${amountDouble} must be either <= 0.5, or a whole number.")
-        } else {
-          1
-        }
-        (Math.ceil(amountDouble).toInt, parts)
+        calculateAmountAndPartsForFraction(amountDouble)
       } else if (amountDouble % 1 != 0) {
         throw new SparkException(
           s"Only tasks support fractional resources, please check your $componentName settings")
@@ -181,12 +202,18 @@ private[spark] object ResourceUtils extends Logging {
     }
   }
 
+  def parseAllocated(
+      resourcesFileOpt: Option[String],
+      componentName: String): Seq[ResourceAllocation] = {
+    resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile)
+      .filter(_.id.componentName == componentName)
+  }
+
   private def parseAllocatedOrDiscoverResources(
       sparkConf: SparkConf,
       componentName: String,
       resourcesFileOpt: Option[String]): Seq[ResourceAllocation] = {
-    val allocated = resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile)
-      .filter(_.id.componentName == componentName)
+    val allocated = parseAllocated(resourcesFileOpt, componentName)
     val otherResourceIds = listResourceIds(sparkConf, componentName).diff(allocated.map(_.id))
     val otherResources = otherResourceIds.flatMap { id =>
       val request = parseResourceRequest(sparkConf, id)
@@ -215,9 +242,24 @@ private[spark] object ResourceUtils extends Logging {
     requests.foreach(r => assertResourceAllocationMeetsRequest(allocated(r.id), r))
   }
 
+  private def assertAllResourceAllocationsMatchResourceProfile(
+      allocations: Map[String, ResourceInformation],
+      execReqs: Map[String, ExecutorResourceRequest]): Unit = {
+    execReqs.foreach { case (rName, req) =>
+      require(allocations.contains(rName) && allocations(rName).addresses.size >= req.amount,
+        s"Resource: ${rName}, with addresses: " +
+          s"${allocations(rName).addresses.mkString(",")} " +
+          s"is less than what the user requested: ${req.amount})")
+    }
+  }
+
   /**
    * Gets all allocated resource information for the input component from input resources file and
-   * discover the remaining via discovery scripts.
+   * the application level Spark configs. It first looks to see if resource were explicitly
+   * specified in the resources file (this would include specified address assignments and it only
+   * specified in certain cluster managers) and then it looks at the Spark configs to get any
+   * others not specified in the file. The resources not explicitly set in the file require a
+   * discovery script for it to run to get the addresses of the resource.
    * It also verifies the resource allocation meets required amount for each resource.
    * @return a map from resource name to resource info
    */
@@ -232,6 +274,37 @@ private[spark] object ResourceUtils extends Logging {
     resourceInfoMap
   }
 
+  /**
+   * This function is similar to getOrDiscoverallResources, except for it uses the ResourceProfile
+   * information instead of the application level configs.
+   *
+   * It first looks to see if resource were explicitly specified in the resources file
+   * (this would include specified address assignments and it only specified in certain
+   * cluster managers) and then it looks at the ResourceProfile to get
+   * any others not specified in the file. The resources not explicitly set in the file require a
+   * discovery script for it to run to get the addresses of the resource.
+   * It also verifies the resource allocation meets required amount for each resource.
+   *
+   * @return a map from resource name to resource info
+   */
+  def getOrDiscoverAllResourcesForResourceProfile(
+      resourcesFileOpt: Option[String],
+      componentName: String,
+      resourceProfile: ResourceProfile): Map[String, ResourceInformation] = {
+    val fileAllocated = parseAllocated(resourcesFileOpt, componentName)
+    val fileAllocResMap = fileAllocated.map(a => (a.id.resourceName, a.toResourceInformation)).toMap
+    // only want to look at the ResourceProfile for resources not in the resources file
+    val execReq = ResourceProfile.getCustomExecutorResources(resourceProfile)
+    val filteredExecreq = execReq.filterNot { case (rname, _) => fileAllocResMap.contains(rname) }
+    val rpAllocations = filteredExecreq.map { case (rName, execRequest) =>
+      val addrs = discoverResource(rName, Option(execRequest.discoveryScript)).addresses
+      (rName, new ResourceInformation(rName, addrs))
+    }
+    val allAllocations = fileAllocResMap ++ rpAllocations
+    assertAllResourceAllocationsMatchResourceProfile(allAllocations, execReq)
+    allAllocations
+  }
+
   def logResourceInfo(componentName: String, resources: Map[String, ResourceInformation])
     : Unit = {
     logInfo("==============================================================")
@@ -240,9 +313,9 @@ private[spark] object ResourceUtils extends Logging {
   }
 
   // visible for test
-  private[spark] def discoverResource(resourceRequest: ResourceRequest): ResourceInformation = {
-    val resourceName = resourceRequest.id.resourceName
-    val script = resourceRequest.discoveryScript
+  private[spark] def discoverResource(
+      resourceName: String,
+      script: Option[String]): ResourceInformation = {
     val result = if (script.nonEmpty) {
       val scriptFile = new File(script.get)
       // check that script exists and try to execute
@@ -264,10 +337,16 @@ private[spark] object ResourceUtils extends Logging {
     result
   }
 
+  // visible for test
+  private[spark] def discoverResource(resourceRequest: ResourceRequest): ResourceInformation = {
+    val resourceName = resourceRequest.id.resourceName
+    val script = resourceRequest.discoveryScript
+    discoverResource(resourceName, script)
+  }
+
   // known types of resources
   final val GPU: String = "gpu"
   final val FPGA: String = "fpga"
 
   final val RESOURCE_PREFIX: String = "resource"
-  final val RESOURCE_DOT: String = s"$RESOURCE_PREFIX."
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
index 22eda52..bffb0a2 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -17,10 +17,6 @@
 
 package org.apache.spark.resource
 
-import scala.collection.mutable
-
-import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
-
 /**
  * A task resource request. This is used in conjuntion with the ResourceProfile to
  * programmatically specify the resources needed for an RDD that will be applied at the
@@ -37,7 +33,19 @@ private[spark] class TaskResourceRequest(val resourceName: String, val amount: D
   assert(amount <= 0.5 || amount % 1 == 0,
     s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
 
-  if (!resourceName.equals(ResourceProfile.CPUS) && !resourceName.startsWith(RESOURCE_DOT)) {
-    throw new IllegalArgumentException(s"Task resource not allowed: $resourceName")
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: TaskResourceRequest =>
+        that.getClass == this.getClass &&
+          that.resourceName == resourceName && that.amount == amount
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int = Seq(resourceName, amount).hashCode()
+
+  override def toString(): String = {
+    s"name: $resourceName, amount: $amount"
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
index 21cbc5d..9624b51 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.resource
 
-import scala.collection.mutable
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.resource.ResourceProfile._
-import org.apache.spark.resource.ResourceUtils._
 
 /**
  * A set of task resource requests. This is used in conjuntion with the ResourceProfile to
@@ -32,9 +33,9 @@ import org.apache.spark.resource.ResourceUtils._
  */
 private[spark] class TaskResourceRequests() extends Serializable {
 
-  private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
+  private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
 
-  def requests: Map[String, TaskResourceRequest] = _taskResources.toMap
+  def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
 
   /**
    * Specify number of cpus per Task.
@@ -42,15 +43,13 @@ private[spark] class TaskResourceRequests() extends Serializable {
    * @param amount Number of cpus to allocate per Task.
    */
   def cpus(amount: Int): this.type = {
-    val t = new TaskResourceRequest(CPUS, amount)
-    _taskResources(CPUS) = t
+    val treq = new TaskResourceRequest(CPUS, amount)
+    _taskResources.put(CPUS, treq)
     this
   }
 
   /**
-   *  Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
-   *  correspond to the regular Spark configs with the prefix removed. For instance, resources
-   *  like GPUs are resource.gpu (spark configs spark.task.resource.gpu.*)
+   *  Amount of a particular custom resource(GPU, FPGA, etc) to use.
    *
    * @param resourceName Name of the resource.
    * @param amount Amount requesting as a Double to support fractional resource requests.
@@ -58,14 +57,14 @@ private[spark] class TaskResourceRequests() extends Serializable {
    *               lets you configure X number of tasks to run on a single resource,
    *               ie amount equals 0.5 translates into 2 tasks per resource address.
    */
-  def resource(rName: String, amount: Double): this.type = {
-    val t = new TaskResourceRequest(rName, amount)
-    _taskResources(rName) = t
+  def resource(resourceName: String, amount: Double): this.type = {
+    val treq = new TaskResourceRequest(resourceName, amount)
+    _taskResources.put(resourceName, treq)
     this
   }
 
   def addRequest(treq: TaskResourceRequest): this.type = {
-    _taskResources(treq.resourceName) = treq
+    _taskResources.put(treq.resourceName, treq)
     this
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 57317e7..2833908 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.nio.ByteBuffer
 
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.ExecutorLossReason
 import org.apache.spark.util.SerializableBuffer
@@ -29,12 +29,13 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 
 private[spark] object CoarseGrainedClusterMessages {
 
-  case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage
+  case class RetrieveSparkAppConfig(resourceProfileId: Int) extends CoarseGrainedClusterMessage
 
   case class SparkAppConfig(
       sparkProperties: Seq[(String, String)],
       ioEncryptionKey: Option[Array[Byte]],
-      hadoopDelegationCreds: Option[Array[Byte]])
+      hadoopDelegationCreds: Option[Array[Byte]],
+      resourceProfile: ResourceProfile)
     extends CoarseGrainedClusterMessage
 
   case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
@@ -59,7 +60,8 @@ private[spark] object CoarseGrainedClusterMessages {
       cores: Int,
       logUrls: Map[String, String],
       attributes: Map[String, String],
-      resources: Map[String, ResourceInformation])
+      resources: Map[String, ResourceInformation],
+      resourceProfileId: Int)
     extends CoarseGrainedClusterMessage
 
   case class LaunchedExecutor(executorId: String) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 031b9af..55f4005 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -33,7 +33,7 @@ import org.apache.spark.executor.ExecutorLogUrlHandler
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network._
-import org.apache.spark.resource.ResourceRequirement
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -205,7 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
       case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
-          attributes, resources) =>
+          attributes, resources, resourceProfileId) =>
         if (executorDataMap.contains(executorId)) {
           context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
         } else if (scheduler.nodeBlacklist.contains(hostname) ||
@@ -236,7 +236,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           }
           val data = new ExecutorData(executorRef, executorAddress, hostname,
             0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
-            resourcesInfo)
+            resourcesInfo, resourceProfileId)
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
@@ -270,11 +270,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)
 
-      case RetrieveSparkAppConfig =>
+      case RetrieveSparkAppConfig(resourceProfileId) =>
+        // note this will be updated in later prs to get the ResourceProfile from a
+        // ResourceProfileManager that matches the resource profile id
+        // for now just use default profile
+        val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
         val reply = SparkAppConfig(
           sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey(),
-          Option(delegationTokens.get()))
+          Option(delegationTokens.get()),
+          rp)
         context.reply(reply)
     }
 
@@ -570,6 +575,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty)
   }
 
+  // this function is for testing only
+  def getExecutorResourceProfileId(executorId: String): Int = synchronized {
+    val res = executorDataMap.get(executorId)
+    res.map(_.resourceProfileId).getOrElse(ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
+  }
+
   /**
    * Request an additional number of executors from the cluster manager.
    * @return whether the request is acknowledged.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index 17907d8..0621461 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -29,6 +29,7 @@ import org.apache.spark.scheduler.ExecutorResourceInfo
  * @param freeCores  The current number of cores available for work on the executor
  * @param totalCores The total number of cores available to the executor
  * @param resourcesInfo The information of the currently available resources on the executor
+ * @param resourceProfileId The id of the ResourceProfile being used by this executor
  */
 private[cluster] class ExecutorData(
     val executorEndpoint: RpcEndpointRef,
@@ -38,5 +39,7 @@ private[cluster] class ExecutorData(
     override val totalCores: Int,
     override val logUrlMap: Map[String, String],
     override val attributes: Map[String, String],
-    override val resourcesInfo: Map[String, ExecutorResourceInfo]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes, resourcesInfo)
+    override val resourcesInfo: Map[String, ExecutorResourceInfo],
+    override val resourceProfileId: Int
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
+  resourcesInfo, resourceProfileId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index 5a4ad6e..a97b089 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 
 /**
  * :: DeveloperApi ::
@@ -25,14 +26,15 @@ import org.apache.spark.resource.ResourceInformation
  */
 @DeveloperApi
 class ExecutorInfo(
-   val executorHost: String,
-   val totalCores: Int,
-   val logUrlMap: Map[String, String],
-   val attributes: Map[String, String],
-   val resourcesInfo: Map[String, ResourceInformation]) {
+    val executorHost: String,
+    val totalCores: Int,
+    val logUrlMap: Map[String, String],
+    val attributes: Map[String, String],
+    val resourcesInfo: Map[String, ResourceInformation],
+    val resourceProfileId: Int) {
 
   def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String]) = {
-    this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty)
+    this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
   }
 
   def this(
@@ -40,7 +42,17 @@ class ExecutorInfo(
       totalCores: Int,
       logUrlMap: Map[String, String],
       attributes: Map[String, String]) = {
-    this(executorHost, totalCores, logUrlMap, attributes, Map.empty)
+    this(executorHost, totalCores, logUrlMap, attributes, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
+  }
+
+  def this(
+      executorHost: String,
+      totalCores: Int,
+      logUrlMap: Map[String, String],
+      attributes: Map[String, String],
+      resourcesInfo: Map[String, ResourceInformation]) = {
+    this(executorHost, totalCores, logUrlMap, attributes, resourcesInfo,
+      DEFAULT_RESOURCE_PROFILE_ID)
   }
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@@ -52,12 +64,14 @@ class ExecutorInfo(
         totalCores == that.totalCores &&
         logUrlMap == that.logUrlMap &&
         attributes == that.attributes &&
-        resourcesInfo == that.resourcesInfo
+        resourcesInfo == that.resourcesInfo &&
+        resourceProfileId == that.resourceProfileId
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo)
+    val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo,
+      resourceProfileId)
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 3dfd1ea..a24f190 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.RDDBlockId
 import org.apache.spark.util.Clock
@@ -52,6 +53,7 @@ private[spark] class ExecutorMonitor(
     conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING)
 
   private val executors = new ConcurrentHashMap[String, Tracker]()
+  private val execResourceProfileCount = new ConcurrentHashMap[Int, Int]()
 
   // The following fields are an optimization to avoid having to scan all executors on every EAM
   // schedule interval to find out which ones are timed out. They keep track of when the next
@@ -92,6 +94,7 @@ private[spark] class ExecutorMonitor(
 
   def reset(): Unit = {
     executors.clear()
+    execResourceProfileCount.clear()
     nextTimeout.set(Long.MaxValue)
     timedOutExecs = Nil
   }
@@ -148,8 +151,25 @@ private[spark] class ExecutorMonitor(
 
   def executorCount: Int = executors.size()
 
+  def executorCountWithResourceProfile(id: Int): Int = {
+    execResourceProfileCount.getOrDefault(id, 0)
+  }
+
+  def getResourceProfileId(executorId: String): Int = {
+    val execTrackingInfo = executors.get(executorId)
+    if (execTrackingInfo != null) {
+      execTrackingInfo.resourceProfileId
+    } else {
+      UNKNOWN_RESOURCE_PROFILE_ID
+    }
+  }
+
   def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval }
 
+  def pendingRemovalCountPerResourceProfileId(id: Int): Int = {
+    executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size
+  }
+
   override def onJobStart(event: SparkListenerJobStart): Unit = {
     if (!shuffleTrackingEnabled) {
       return
@@ -261,7 +281,7 @@ private[spark] class ExecutorMonitor(
     val executorId = event.taskInfo.executorId
     // Guard against a late arriving task start event (SPARK-26927).
     if (client.isExecutorActive(executorId)) {
-      val exec = ensureExecutorIsTracked(executorId)
+      val exec = ensureExecutorIsTracked(executorId, UNKNOWN_RESOURCE_PROFILE_ID)
       exec.updateRunningTasks(1)
     }
   }
@@ -290,15 +310,21 @@ private[spark] class ExecutorMonitor(
   }
 
   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
-    val exec = ensureExecutorIsTracked(event.executorId)
+    val exec = ensureExecutorIsTracked(event.executorId, event.executorInfo.resourceProfileId)
     exec.updateRunningTasks(0)
     logInfo(s"New executor ${event.executorId} has registered (new total is ${executors.size()})")
   }
 
+  private def decrementExecResourceProfileCount(rpId: Int): Unit = {
+    val count = execResourceProfileCount.getOrDefault(rpId, 0)
+    execResourceProfileCount.replace(rpId, count, count - 1)
+    execResourceProfileCount.remove(rpId, 0)
+  }
+
   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
     val removed = executors.remove(event.executorId)
     if (removed != null) {
-      logInfo(s"Executor ${event.executorId} removed (new total is ${executors.size()})")
+      decrementExecResourceProfileCount(removed.resourceProfileId)
       if (!removed.pendingRemoval) {
         nextTimeout.set(Long.MinValue)
       }
@@ -309,8 +335,8 @@ private[spark] class ExecutorMonitor(
     if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
       return
     }
-
-    val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId)
+    val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
+      UNKNOWN_RESOURCE_PROFILE_ID)
     val storageLevel = event.blockUpdatedInfo.storageLevel
     val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]
 
@@ -392,8 +418,26 @@ private[spark] class ExecutorMonitor(
    * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
    * event, which is possible because these events are posted in different threads. (see SPARK-4951)
    */
-  private def ensureExecutorIsTracked(id: String): Tracker = {
-    executors.computeIfAbsent(id, _ => new Tracker())
+  private def ensureExecutorIsTracked(id: String, resourceProfileId: Int): Tracker = {
+    val numExecsWithRpId = execResourceProfileCount.computeIfAbsent(resourceProfileId, _ => 0)
+    val execTracker = executors.computeIfAbsent(id, _ => {
+        val newcount = numExecsWithRpId + 1
+        execResourceProfileCount.put(resourceProfileId, newcount)
+        logDebug(s"Executor added with ResourceProfile id: $resourceProfileId " +
+          s"count is now $newcount")
+        new Tracker(resourceProfileId)
+      })
+    // if we had added executor before without knowing the resource profile id, fix it up
+    if (execTracker.resourceProfileId == UNKNOWN_RESOURCE_PROFILE_ID &&
+        resourceProfileId != UNKNOWN_RESOURCE_PROFILE_ID) {
+      logDebug(s"Executor: $id, resource profile id was unknown, setting " +
+        s"it to $resourceProfileId")
+      execTracker.resourceProfileId = resourceProfileId
+      // fix up the counts for each resource profile id
+      execResourceProfileCount.put(resourceProfileId, numExecsWithRpId + 1)
+      decrementExecResourceProfileCount(UNKNOWN_RESOURCE_PROFILE_ID)
+    }
+    execTracker
   }
 
   private def updateNextTimeout(newValue: Long): Unit = {
@@ -413,7 +457,7 @@ private[spark] class ExecutorMonitor(
     }
   }
 
-  private class Tracker {
+  private class Tracker(var resourceProfileId: Int) {
     @volatile var timeoutAt: Long = Long.MaxValue
 
     // Tracks whether this executor is thought to be timed out. It's used to detect when the list
diff --git a/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
index 0771207..bb413c0 100644
--- a/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
+++ b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
@@ -35,7 +35,7 @@ public class JavaResourceProfileSuite {
     ExecutorResourceRequests execReqFpga =
       new ExecutorResourceRequests().resource(FPGAResource, 3, "myfpgascript", "nvidia");
 
-    ResourceProfile rprof = new ResourceProfile();
+    ResourceProfileBuilder rprof = new ResourceProfileBuilder();
     rprof.require(execReqGpu);
     rprof.require(execReqFpga);
     TaskResourceRequests taskReq1 = new TaskResourceRequests().resource(GpuResource, 1);
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 6ae1f19..99f3e3b 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.{Clock, ManualClock, SystemClock}
@@ -1018,8 +1019,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     manager
   }
 
+  private val execInfo = new ExecutorInfo("host1", 1, Map.empty,
+    Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
+
   private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
-    post(SparkListenerExecutorAdded(0L, id, null))
+    post(SparkListenerExecutorAdded(0L, id, execInfo))
   }
 
   private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index b468e6f..ff0f2f9 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -177,10 +178,10 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
     fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty,
-        Map.empty))
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty,
-        Map.empty))
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index d050ee2..1fe12e1 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -22,6 +22,8 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.Suite
 
+import org.apache.spark.resource.ResourceProfile
+
 /** Manages a local `sc` `SparkContext` variable, correctly stopping it after each test. */
 trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
 
@@ -42,6 +44,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
 
   def resetSparkContext(): Unit = {
     LocalSparkContext.stop(sc)
+    ResourceProfile.clearDefaultProfile
     sc = null
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index e316da7..f8b9930 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.internal.config
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
@@ -505,7 +506,7 @@ class StandaloneDynamicAllocationSuite
     val mockAddress = mock(classOf[RpcAddress])
     when(endpointRef.address).thenReturn(mockAddress)
     val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty,
-      Map.empty, Map.empty)
+      Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     val taskScheduler = mock(classOf[TaskSchedulerImpl])
     when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
@@ -629,7 +630,7 @@ class StandaloneDynamicAllocationSuite
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
-        Map.empty)
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
       backend.driverEndpoint.askSync[Boolean](message)
       backend.driverEndpoint.send(LaunchedExecutor(id))
     }
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 7e96039..a996fc4 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.executor
 
+import java.io.File
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties
@@ -33,7 +34,7 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.TestUtils._
-import org.apache.spark.resource.{ResourceAllocation, ResourceInformation}
+import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEnv
@@ -49,13 +50,13 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("parsing no resources") {
     val conf = new SparkConf
-    conf.set(TASK_GPU_ID.amountConf, "2")
+    val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None)
+      4, Seq.empty[URL], env, None, resourceProfile)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
@@ -72,12 +73,11 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
   test("parsing one resource") {
     val conf = new SparkConf
     conf.set(EXECUTOR_GPU_ID.amountConf, "2")
-    conf.set(TASK_GPU_ID.amountConf, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None)
+      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val ja = Extraction.decompose(Seq(ra))
@@ -91,18 +91,27 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     }
   }
 
+  test("parsing multiple resources resource profile") {
+    val rpBuilder = new ResourceProfileBuilder
+    val ereqs = new ExecutorResourceRequests().resource(GPU, 2)
+    ereqs.resource(FPGA, 3)
+    val rp = rpBuilder.require(ereqs).build
+    testParsingMultipleResources(new SparkConf, rp)
+  }
+
   test("parsing multiple resources") {
     val conf = new SparkConf
     conf.set(EXECUTOR_GPU_ID.amountConf, "2")
-    conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
-    conf.set(TASK_FPGA_ID.amountConf, "3")
+    testParsingMultipleResources(conf, ResourceProfile.getOrCreateDefaultProfile(conf))
+  }
 
+  def testParsingMultipleResources(conf: SparkConf, resourceProfile: ResourceProfile) {
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None)
+      4, Seq.empty[URL], env, None, resourceProfile)
 
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -125,12 +134,11 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
   test("error checking parsing resources and executor and task configs") {
     val conf = new SparkConf
     conf.set(EXECUTOR_GPU_ID.amountConf, "2")
-    conf.set(TASK_GPU_ID.amountConf, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None)
+      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
@@ -156,20 +164,33 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
-      assert(error.contains("User is expecting to use resource: gpu, but didn't specify a " +
-        "discovery script!"))
+      assert(error.contains("Resource script:  to discover gpu doesn't exist!"))
     }
   }
 
+  test("executor resource found less than required resource profile") {
+    val rpBuilder = new ResourceProfileBuilder
+    val ereqs = new ExecutorResourceRequests().resource(GPU, 4)
+    val treqs = new TaskResourceRequests().resource(GPU, 1)
+    val rp = rpBuilder.require(ereqs).require(treqs).build
+    testExecutorResourceFoundLessThanRequired(new SparkConf, rp)
+  }
+
   test("executor resource found less than required") {
-    val conf = new SparkConf
+    val conf = new SparkConf()
     conf.set(EXECUTOR_GPU_ID.amountConf, "4")
     conf.set(TASK_GPU_ID.amountConf, "1")
+    testExecutorResourceFoundLessThanRequired(conf, ResourceProfile.getOrCreateDefaultProfile(conf))
+  }
+
+  private def testExecutorResourceFoundLessThanRequired(
+      conf: SparkConf,
+      resourceProfile: ResourceProfile) = {
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None)
+      4, Seq.empty[URL], env, None, resourceProfile)
 
     // executor resources < required
     withTempDir { tmpDir =>
@@ -189,7 +210,6 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
   test("use resource discovery") {
     val conf = new SparkConf
     conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
-    conf.set(TASK_FPGA_ID.amountConf, "3")
     assume(!(Utils.isWindows))
     withTempDir { dir =>
       val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
@@ -201,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
       // we don't really use this, just need it to get at the parser function
       val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-        4, Seq.empty[URL], env, None)
+        4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
       val parsedResources = backend.parseOrFindResources(None)
 
@@ -212,37 +232,56 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     }
   }
 
+  test("use resource discovery and allocated file option with resource profile") {
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      val rpBuilder = new ResourceProfileBuilder
+      val ereqs = new ExecutorResourceRequests().resource(FPGA, 3, scriptPath)
+      ereqs.resource(GPU, 2)
+      val rp = rpBuilder.require(ereqs).build
+      allocatedFileAndConfigsResourceDiscoveryTestFpga(dir, new SparkConf, rp)
+    }
+  }
+
   test("use resource discovery and allocated file option") {
-    val conf = new SparkConf
-    conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
-    conf.set(TASK_FPGA_ID.amountConf, "3")
     assume(!(Utils.isWindows))
     withTempDir { dir =>
       val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
         """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      val conf = new SparkConf
+      conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
       conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath)
-
-      val serializer = new JavaSerializer(conf)
-      val env = createMockEnv(conf, serializer)
-
-      // we don't really use this, just need it to get at the parser function
-      val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-        4, Seq.empty[URL], env, None)
-      val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
-      val ja = Extraction.decompose(Seq(gpuArgs))
-      val f1 = createTempJsonFile(dir, "resources", ja)
-      val parsedResources = backend.parseOrFindResources(Some(f1))
-
-      assert(parsedResources.size === 2)
-      assert(parsedResources.get(GPU).nonEmpty)
-      assert(parsedResources.get(GPU).get.name === GPU)
-      assert(parsedResources.get(GPU).get.addresses.sameElements(Array("0", "1")))
-      assert(parsedResources.get(FPGA).nonEmpty)
-      assert(parsedResources.get(FPGA).get.name === FPGA)
-      assert(parsedResources.get(FPGA).get.addresses.sameElements(Array("f1", "f2", "f3")))
+      conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+      val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
+      allocatedFileAndConfigsResourceDiscoveryTestFpga(dir, conf, rp)
     }
   }
 
+  private def allocatedFileAndConfigsResourceDiscoveryTestFpga(
+      dir: File,
+      conf: SparkConf,
+      resourceProfile: ResourceProfile) = {
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+
+    // we don't really use this, just need it to get at the parser function
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
+      4, Seq.empty[URL], env, None, resourceProfile)
+    val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
+    val ja = Extraction.decompose(Seq(gpuArgs))
+    val f1 = createTempJsonFile(dir, "resources", ja)
+    val parsedResources = backend.parseOrFindResources(Some(f1))
+
+    assert(parsedResources.size === 2)
+    assert(parsedResources.get(GPU).nonEmpty)
+    assert(parsedResources.get(GPU).get.name === GPU)
+    assert(parsedResources.get(GPU).get.addresses.sameElements(Array("0", "1")))
+    assert(parsedResources.get(FPGA).nonEmpty)
+    assert(parsedResources.get(FPGA).get.name === FPGA)
+    assert(parsedResources.get(FPGA).get.addresses.sameElements(Array("f1", "f2", "f3")))
+  }
 
   test("track allocated resources by taskId") {
     val conf = new SparkConf
@@ -253,15 +292,16 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     try {
       val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
       val env = createMockEnv(conf, serializer, Some(rpcEnv))
-      backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-        "host1", "host1", 4, Seq.empty[URL], env, None)
+        backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
+        "host1", "host1", 4, Seq.empty[URL], env, None,
+          resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
       assert(backend.taskResources.isEmpty)
 
       val taskId = 1000000
       // We don't really verify the data, just pass it around.
       val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
-      val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19, 1,
-        mutable.Map.empty, mutable.Map.empty, new Properties,
+      val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000",
+        19, 1, mutable.Map.empty, mutable.Map.empty, new Properties,
         Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
       val serializedTaskDescription = TaskDescription.encode(taskDescription)
       backend.executor = mock[Executor]
@@ -271,13 +311,15 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
       eventually(timeout(10.seconds)) {
         assert(backend.taskResources.size == 1)
-        assert(backend.taskResources(taskId)(GPU).addresses sameElements Array("0", "1"))
+        val resources = backend.taskResources(taskId)
+        assert(resources(GPU).addresses sameElements Array("0", "1"))
       }
 
       // Update the status of a running task shall not affect `taskResources` map.
       backend.statusUpdate(taskId, TaskState.RUNNING, data)
       assert(backend.taskResources.size == 1)
-      assert(backend.taskResources(taskId)(GPU).addresses sameElements Array("0", "1"))
+      val resources = backend.taskResources(taskId)
+      assert(resources(GPU).addresses sameElements Array("0", "1"))
 
       // Update the status of a finished task shall remove the entry from `taskResources` map.
       backend.statusUpdate(taskId, TaskState.FINISHED, data)
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index a087f18..c0637ee 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -18,72 +18,97 @@
 package org.apache.spark.resource
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, SPARK_EXECUTOR_PREFIX}
+import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
 
 class ResourceProfileSuite extends SparkFunSuite {
 
   override def afterEach() {
     try {
-      ResourceProfile.resetDefaultProfile(new SparkConf)
+      ResourceProfile.clearDefaultProfile
     } finally {
       super.afterEach()
     }
   }
-
   test("Default ResourceProfile") {
     val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
     assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     assert(rprof.executorResources.size === 2,
       "Executor resources should contain cores and memory by default")
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 1,
-      s"Executor resources should have 1 core")
+      "Executor resources should have 1 core")
+    assert(rprof.getExecutorCores.get === 1,
+      "Executor resources should have 1 core")
     assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 1024,
-      s"Executor resources should have 1024 memory")
+      "Executor resources should have 1024 memory")
+    assert(rprof.executorResources.get(ResourceProfile.PYSPARK_MEM) == None,
+      "pyspark memory empty if not specified")
+    assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None,
+      "overhead memory empty if not specified")
     assert(rprof.taskResources.size === 1,
       "Task resources should just contain cpus by default")
     assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
-      s"Task resources should have 1 cpu")
+      "Task resources should have 1 cpu")
+    assert(rprof.getTaskCpus.get === 1,
+      "Task resources should have 1 cpu")
   }
 
   test("Default ResourceProfile with app level resources specified") {
     val conf = new SparkConf
+    conf.set(PYSPARK_EXECUTOR_MEMORY.key, "2g")
+    conf.set(EXECUTOR_MEMORY_OVERHEAD.key, "1g")
+    conf.set(EXECUTOR_MEMORY.key, "4g")
+    conf.set(EXECUTOR_CORES.key, "4")
     conf.set("spark.task.resource.gpu.amount", "1")
     conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.amount", "1")
     conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.discoveryScript", "nameOfScript")
     val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
     assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val execResources = rprof.executorResources
-    assert(execResources.size === 3,
+    assert(execResources.size === 5,
       "Executor resources should contain cores, memory, and gpu " + execResources)
+    assert(execResources.contains("gpu"), "Executor resources should have gpu")
+    assert(rprof.executorResources(ResourceProfile.CORES).amount === 4,
+      "Executor resources should have 4 core")
+    assert(rprof.getExecutorCores.get === 4, "Executor resources should have 4 core")
+    assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
+      "Executor resources should have 1024 memory")
+    assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount == 2048,
+      "pyspark memory empty if not specified")
+    assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount == 1024,
+      "overhead memory empty if not specified")
     assert(rprof.taskResources.size === 2,
       "Task resources should just contain cpus and gpu")
-    assert(execResources.contains("resource.gpu"), "Executor resources should have gpu")
-    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+    assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
   }
 
-  test("Create ResourceProfile") {
-    val rprof = new ResourceProfile()
-    assert(rprof.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    assert(rprof.executorResources === Map.empty)
-    assert(rprof.taskResources === Map.empty)
+  test("test default profile task gpus fractional") {
+    val sparkConf = new SparkConf()
+      .set("spark.executor.resource.gpu.amount", "2")
+      .set("spark.task.resource.gpu.amount", "0.33")
+    val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(immrprof.taskResources.get("gpu").get.amount == 0.33)
+  }
 
-    val taskReq = new TaskResourceRequests().resource("resource.gpu", 1)
-    val eReq = new ExecutorResourceRequests().resource("resource.gpu", 2, "myscript", "nvidia")
+  test("Create ResourceProfile") {
+    val rprof = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val eReq = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
     rprof.require(taskReq).require(eReq)
 
     assert(rprof.executorResources.size === 1)
-    assert(rprof.executorResources.contains("resource.gpu"),
+    assert(rprof.executorResources.contains("gpu"),
       "Executor resources should have gpu")
-    assert(rprof.executorResources.get("resource.gpu").get.vendor === "nvidia",
+    assert(rprof.executorResources.get("gpu").get.vendor === "nvidia",
       "gpu vendor should be nvidia")
-    assert(rprof.executorResources.get("resource.gpu").get.discoveryScript === "myscript",
+    assert(rprof.executorResources.get("gpu").get.discoveryScript === "myscript",
       "discoveryScript should be myscript")
-    assert(rprof.executorResources.get("resource.gpu").get.amount === 2,
+    assert(rprof.executorResources.get("gpu").get.amount === 2,
     "gpu amount should be 2")
 
     assert(rprof.taskResources.size === 1, "Should have 1 task resource")
-    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
-    assert(rprof.taskResources.get("resource.gpu").get.amount === 1,
+    assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
+    assert(rprof.taskResources.get("gpu").get.amount === 1,
       "Task resources should have 1 gpu")
 
     val ereqs = new ExecutorResourceRequests()
@@ -97,70 +122,59 @@ class ResourceProfileSuite extends SparkFunSuite {
 
     assert(rprof.executorResources.size === 5)
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
-      s"Executor resources should have 2 cores")
+      "Executor resources should have 2 cores")
     assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
-      s"Executor resources should have 4096 memory")
+      "Executor resources should have 4096 memory")
     assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2048,
-      s"Executor resources should have 2048 overhead memory")
+      "Executor resources should have 2048 overhead memory")
     assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
-      s"Executor resources should have 1024 pyspark memory")
+      "Executor resources should have 1024 pyspark memory")
 
     assert(rprof.taskResources.size === 2)
     assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
-
-    val error = intercept[IllegalArgumentException] {
-      rprof.require(new ExecutorResourceRequests().resource("bogusResource", 1))
-    }.getMessage()
-    assert(error.contains("Executor resource not allowed"))
-
-    val taskError = intercept[IllegalArgumentException] {
-      rprof.require(new TaskResourceRequests().resource("bogusTaskResource", 1))
-    }.getMessage()
-    assert(taskError.contains("Task resource not allowed"))
   }
 
   test("Test ExecutorResourceRequests memory helpers") {
-    val rprof = new ResourceProfile()
+    val rprof = new ResourceProfileBuilder()
     val ereqs = new ExecutorResourceRequests()
     ereqs.memory("4g")
     ereqs.memoryOverhead("2000m").pysparkMemory("512000k")
     rprof.require(ereqs)
 
     assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
-      s"Executor resources should have 4096 memory")
+      "Executor resources should have 4096 memory")
     assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2000,
-      s"Executor resources should have 2000 overhead memory")
+      "Executor resources should have 2000 overhead memory")
     assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
-      s"Executor resources should have 512 pyspark memory")
+      "Executor resources should have 512 pyspark memory")
   }
 
   test("Test TaskResourceRequest fractional") {
-    val rprof = new ResourceProfile()
-    val treqs = new TaskResourceRequests().resource("resource.gpu", 0.33)
+    val rprof = new ResourceProfileBuilder()
+    val treqs = new TaskResourceRequests().resource("gpu", 0.33)
     rprof.require(treqs)
 
     assert(rprof.taskResources.size === 1, "Should have 1 task resource")
-    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
-    assert(rprof.taskResources.get("resource.gpu").get.amount === 0.33,
+    assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
+    assert(rprof.taskResources.get("gpu").get.amount === 0.33,
       "Task resources should have 0.33 gpu")
 
-    val fpgaReqs = new TaskResourceRequests().resource("resource.fpga", 4.0)
+    val fpgaReqs = new TaskResourceRequests().resource("fpga", 4.0)
     rprof.require(fpgaReqs)
 
     assert(rprof.taskResources.size === 2, "Should have 2 task resource")
-    assert(rprof.taskResources.contains("resource.fpga"), "Task resources should have gpu")
-    assert(rprof.taskResources.get("resource.fpga").get.amount === 4.0,
+    assert(rprof.taskResources.contains("fpga"), "Task resources should have gpu")
+    assert(rprof.taskResources.get("fpga").get.amount === 4.0,
       "Task resources should have 4.0 gpu")
 
     var taskError = intercept[AssertionError] {
-      rprof.require(new TaskResourceRequests().resource("resource.gpu", 1.5))
+      rprof.require(new TaskResourceRequests().resource("gpu", 1.5))
     }.getMessage()
     assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number."))
 
     taskError = intercept[AssertionError] {
-      rprof.require(new TaskResourceRequests().resource("resource.gpu", 0.7))
+      rprof.require(new TaskResourceRequests().resource("gpu", 0.7))
     }.getMessage()
     assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
   }
 }
-
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
index b6d46d2..b809469 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
@@ -128,7 +128,8 @@ class ResourceUtilsSuite extends SparkFunSuite
       assert(resourcesFromFileOnly(FPGA) === expectedFpgaInfo)
 
       val gpuDiscovery = createTempScriptWithExpectedOutput(
-        dir, "gpuDiscoveryScript", """{"name": "gpu", "addresses": ["0", "1"]}""")
+        dir, "gpuDiscoveryScript",
+        """{"name": "gpu", "addresses": ["0", "1"]}""")
       conf.set(EXECUTOR_GPU_ID.amountConf, "2")
       conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery)
       val resourcesFromBoth = getOrDiscoverAllResources(
@@ -139,6 +140,38 @@ class ResourceUtilsSuite extends SparkFunSuite
     }
   }
 
+  test("get from resources file and discover resource profile remaining") {
+    val conf = new SparkConf
+    val rpId = 1
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      implicit val formats = DefaultFormats
+      val fpgaAddrs = Seq("f1", "f2", "f3")
+      val fpgaAllocation = ResourceAllocation(EXECUTOR_FPGA_ID, fpgaAddrs)
+      val resourcesFile = createTempJsonFile(
+        dir, "resources", Extraction.decompose(Seq(fpgaAllocation)))
+      val resourcesFromFileOnly = getOrDiscoverAllResourcesForResourceProfile(
+        Some(resourcesFile),
+        SPARK_EXECUTOR_PREFIX,
+        ResourceProfile.getOrCreateDefaultProfile(conf))
+      val expectedFpgaInfo = new ResourceInformation(FPGA, fpgaAddrs.toArray)
+      assert(resourcesFromFileOnly(FPGA) === expectedFpgaInfo)
+
+      val gpuDiscovery = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript",
+        """{"name": "gpu", "addresses": ["0", "1"]}""")
+      val rpBuilder = new ResourceProfileBuilder()
+      val ereqs = new ExecutorResourceRequests().resource(GPU, 2, gpuDiscovery)
+      val treqs = new TaskResourceRequests().resource(GPU, 1)
+      val rp = rpBuilder.require(ereqs).require(treqs).build
+      val resourcesFromBoth = getOrDiscoverAllResourcesForResourceProfile(
+        Some(resourcesFile), SPARK_EXECUTOR_PREFIX, rp)
+      val expectedGpuInfo = new ResourceInformation(GPU, Array("0", "1"))
+      assert(resourcesFromBoth(FPGA) === expectedFpgaInfo)
+      assert(resourcesFromBoth(GPU) === expectedGpuInfo)
+    }
+  }
+
   test("list resource ids") {
     val conf = new SparkConf
     conf.set(DRIVER_GPU_ID.amountConf, "2")
@@ -148,7 +181,7 @@ class ResourceUtilsSuite extends SparkFunSuite
 
     conf.set(DRIVER_FPGA_ID.amountConf, "2")
     val resourcesMap = listResourceIds(conf, SPARK_DRIVER_PREFIX)
-      .map{ rId => (rId.resourceName, 1)}.toMap
+      .map { rId => (rId.resourceName, 1) }.toMap
     assert(resourcesMap.size === 2, "should only have GPU for resource")
     assert(resourcesMap.get(GPU).nonEmpty, "should have GPU")
     assert(resourcesMap.get(FPGA).nonEmpty, "should have FPGA")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 29160a3..c063301 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.rdd.RDD
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -173,11 +173,14 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     sc.addSparkListener(listener)
 
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes,
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
+      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes,
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
+      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes,
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
 
     sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
     assert(executorAddedCount === 3)
@@ -214,20 +217,25 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     sc.addSparkListener(listener)
 
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources))
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources))
+      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources))
+      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
+        5))
 
     val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
     val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
     val buffer = new SerializableBuffer(bytebuffer)
 
     var execResources = backend.getExecutorAvailableResources("1")
-
     assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
 
+    var exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
+    assert(exec3ResourceProfileId === 5)
+
     val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
     var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
       "t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 1397cb7..615389a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -27,7 +27,9 @@ import org.mockito.Mockito.{doAnswer, mock, when}
 import org.apache.spark._
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, UNKNOWN_RESOURCE_PROFILE_ID}
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
 import org.apache.spark.util.ManualClock
 
@@ -47,6 +49,9 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   private var client: ExecutorAllocationClient = _
   private var clock: ManualClock = _
 
+  private val execInfo = new ExecutorInfo("host1", 1, Map.empty,
+    Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
+
   // List of known executors. Allows easily mocking which executors are alive without
   // having to use mockito APIs directly in each test.
   private val knownExecs = mutable.HashSet[String]()
@@ -64,10 +69,12 @@ class ExecutorMonitorSuite extends SparkFunSuite {
 
   test("basic executor timeout") {
     knownExecs += "1"
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     assert(monitor.executorCount === 1)
     assert(monitor.isExecutorIdle("1"))
     assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+    assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1)
+    assert(monitor.getResourceProfileId("1") === DEFAULT_RESOURCE_PROFILE_ID)
   }
 
   test("SPARK-4951, SPARK-26927: handle out of order task start events") {
@@ -75,26 +82,38 @@ class ExecutorMonitorSuite extends SparkFunSuite {
 
     monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
     assert(monitor.executorCount === 1)
+    assert(monitor.executorCountWithResourceProfile(UNKNOWN_RESOURCE_PROFILE_ID) === 1)
 
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     assert(monitor.executorCount === 1)
+    assert(monitor.executorCountWithResourceProfile(UNKNOWN_RESOURCE_PROFILE_ID) === 0)
+    assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1)
+    assert(monitor.getResourceProfileId("1") === DEFAULT_RESOURCE_PROFILE_ID)
 
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
     assert(monitor.executorCount === 2)
+    assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 2)
+    assert(monitor.getResourceProfileId("2") === DEFAULT_RESOURCE_PROFILE_ID)
 
     monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "2", null))
     assert(monitor.executorCount === 1)
+    assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1)
 
     knownExecs -= "2"
 
     monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("2", 2)))
     assert(monitor.executorCount === 1)
+    assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1)
+
+    monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "1", null))
+    assert(monitor.executorCount === 0)
+    assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 0)
   }
 
   test("track tasks running on executor") {
     knownExecs += "1"
 
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
     assert(!monitor.isExecutorIdle("1"))
 
@@ -117,7 +136,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
 
   test("use appropriate time out depending on whether blocks are stored") {
     knownExecs += "1"
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     assert(monitor.isExecutorIdle("1"))
     assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
 
@@ -139,7 +158,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   }
 
   test("keeps track of stored blocks for each rdd and split") {
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
 
     monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
     assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
@@ -173,19 +192,19 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     knownExecs ++= Set("1", "2", "3")
 
     // start exec 1 at 0s (should idle time out at 60s)
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     assert(monitor.isExecutorIdle("1"))
 
     // start exec 2 at 30s, store a block (should idle time out at 150s)
     clock.setTime(TimeUnit.SECONDS.toMillis(30))
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
     monitor.onBlockUpdated(rddUpdate(1, 0, "2"))
     assert(monitor.isExecutorIdle("2"))
     assert(!monitor.timedOutExecutors(idleDeadline).contains("2"))
 
     // start exec 3 at 60s (should idle timeout at 120s, exec 1 should time out)
     clock.setTime(TimeUnit.SECONDS.toMillis(60))
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo))
     assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1"))
 
     // store block on exec 3 (should now idle time out at 180s)
@@ -205,7 +224,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
 
   test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
     // First make sure that blocks on disk are counted when no shuffle service is available.
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.DISK_ONLY))
     assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
     assert(monitor.timedOutExecutors(storageDeadline) ===  Seq("1"))
@@ -213,7 +232,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
     monitor = new ExecutorMonitor(conf, client, null, clock)
 
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY))
     monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.MEMORY_ONLY))
     assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
@@ -236,9 +255,9 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   test("track executors pending for removal") {
     knownExecs ++= Set("1", "2", "3")
 
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo))
     clock.setTime(idleDeadline)
     assert(monitor.timedOutExecutors().toSet === Set("1", "2", "3"))
     assert(monitor.pendingRemovalCount === 0)
@@ -286,7 +305,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2)))
     monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4)))
 
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
 
     // First a failed task, to make sure it does not count.
@@ -342,7 +361,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
         throw new IllegalStateException("No event should be sent.")
       }
     }
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     monitor.shuffleCleaned(0)
   }
 
@@ -351,8 +370,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false)
     monitor = new ExecutorMonitor(conf, client, bus, clock)
 
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
 
     // Two separate jobs with separate shuffles. The first job will only run tasks on
     // executor 1, the second on executor 2. Ensures that jobs finishing don't affect
@@ -401,7 +420,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     val stage = stageInfo(1, shuffleId = 0)
     monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage)))
     clock.advance(1000L)
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
     monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1),
       new ExecutorMetrics, null))
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b6ec869..f1bbe0b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,14 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+    // [SPARK-29306] Add support for Stage level scheduling for executors
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productElement"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productArity"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.canEqual"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productIterator"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productPrefix"),
+    ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.toString"),
+
     // [SPARK-29399][core] Remove old ExecutorPlugin interface.
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ExecutorPlugin"),
 
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 8f6ae59..1876861 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon
 import org.apache.spark.deploy.mesos.{config => mesosConfig}
 import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor}
@@ -716,7 +717,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val mockEndpointRef = mock[RpcEndpointRef]
     val mockAddress = mock[RpcAddress]
     val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty,
-      Map.empty, Map.empty)
+      Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     backend.driverEndpoint.askSync[Boolean](message)
   }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2e9576e..1e8f408 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -455,7 +456,8 @@ private[spark] class ApplicationMaster(
       val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
       val executorCores = _sparkConf.get(EXECUTOR_CORES)
       val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
-        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
       dummyRunner.launchContextDebugInfo()
     }
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7046ad7..d9262bb 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -40,7 +40,8 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.util.{Utils, YarnContainerInfoHelper}
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.util.Utils
 
 private[yarn] class ExecutorRunnable(
     container: Option[Container],
@@ -53,7 +54,8 @@ private[yarn] class ExecutorRunnable(
     executorCores: Int,
     appId: String,
     securityMgr: SecurityManager,
-    localResources: Map[String, LocalResource]) extends Logging {
+    localResources: Map[String, LocalResource],
+    resourceProfileId: Int) extends Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
@@ -72,7 +74,7 @@ private[yarn] class ExecutorRunnable(
 
     s"""
     |===============================================================================
-    |YARN executor launch context:
+    |Default YARN executor launch context:
     |  env:
     |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
     |  command:
@@ -207,7 +209,8 @@ private[yarn] class ExecutorRunnable(
         "--executor-id", executorId,
         "--hostname", hostname,
         "--cores", executorCores.toString,
-        "--app-id", appId) ++
+        "--app-id", appId,
+        "--resourceProfileId", resourceProfileId.toString) ++
       userClassPath ++
       Seq(
         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index f68be33..09414cb 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -38,6 +38,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -565,7 +566,8 @@ private[yarn] class YarnAllocator(
                 executorCores,
                 appAttemptId.getApplicationId.toString,
                 securityMgr,
-                localResources
+                localResources,
+                ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
               ).run()
               updateInternalState()
             } catch {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index d46424e..669e39f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -22,6 +22,7 @@ import java.net.URL
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.YarnContainerInfoHelper
 
@@ -39,7 +40,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     cores: Int,
     userClassPath: Seq[URL],
     env: SparkEnv,
-    resourcesFile: Option[String])
+    resourcesFile: Option[String],
+    resourceProfile: ResourceProfile)
   extends CoarseGrainedExecutorBackend(
     rpcEnv,
     driverUrl,
@@ -49,7 +51,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     cores,
     userClassPath,
     env,
-    resourcesFile) with Logging {
+    resourcesFile,
+    resourceProfile) with Logging {
 
   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
 
@@ -67,11 +70,11 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
 private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
 
   def main(args: Array[String]): Unit = {
-    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
-      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
+    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
+      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
         arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
-        arguments.resourcesFileOpt)
+        arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
       this.getClass.getCanonicalName.stripSuffix("$"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org