Posted to commits@spark.apache.org by tg...@apache.org on 2020/02/12 22:46:06 UTC

[spark] branch master updated: [SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 496f6ac  [SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes
496f6ac is described below

commit 496f6ac86001d284cbfb7488a63dd3a168919c0f
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Wed Feb 12 16:45:42 2020 -0600

    [SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes
    
    ### What changes were proposed in this pull request?
    
    This is another PR for stage level scheduling. In particular, it adds changes to the dynamic allocation manager and the scheduler backend so they can track how many executors are needed per ResourceProfile. Note the API is still private to Spark until the entire feature is in, so this functionality will be present but only usable by tests for profiles other than the DefaultProfile.
    
    The main changes here are tracking state on a per-ResourceProfile basis and sending the executor requests for all ResourceProfiles to the scheduler backend.
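
    For reference, the allocation request sent to the scheduler backend is now keyed by ResourceProfile id. A minimal sketch of the new shape only (the profile ids, host names, and counts below are made-up examples, and client stands in for an ExecutorAllocationClient):

        // illustrative only; the map shapes follow the new requestTotalExecutors signature
        val defaultProfileId = 0   // assumed id of the DefaultProfile
        val otherProfileId = 1     // assumed id of some other ResourceProfile
        val numExecutorsPerRpId: Map[Int, Int] = Map(defaultProfileId -> 4, otherProfileId -> 2)
        val localityAwareTasksPerRpId: Map[Int, Int] = Map(defaultProfileId -> 0, otherProfileId -> 3)
        val hostToLocalTaskCountPerRpId: Map[Int, Map[String, Int]] =
          Map(otherProfileId -> Map("host1" -> 2, "host2" -> 1))
        client.requestTotalExecutors(numExecutorsPerRpId, localityAwareTasksPerRpId,
          hostToLocalTaskCountPerRpId)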
    
    This PR introduces a ResourceProfileManager that tracks all of the actual ResourceProfile objects, so they live in a single place and data structures only need to pass around and store the resource profile id. The resource profile id can then be used with the ResourceProfileManager to look up the actual ResourceProfile contents.
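
    A minimal sketch of the id-to-profile lookup pattern, based on the methods this patch uses (conf is an assumed SparkConf in scope):

        // data structures carry only the profile id; the manager resolves it back to a profile
        val rpManager = new ResourceProfileManager(conf)
        val defaultId: Int = rpManager.defaultResourceProfile.id
        val rp: ResourceProfile = rpManager.resourceProfileFromId(defaultId)
        val tasksPerExec: Int = rp.maxTasksPerExecutor(conf)   // feeds the "slots" calculations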
    
    Various places in the code use executor "slots" for things. The ResourceProfile now keeps that calculation in one place. The logic is more complex than it should be because standalone mode and Mesos coarse-grained mode do not set the executor cores config; they default to all cores on the worker, so calculating slots is harder there.
    This PR keeps the behavior of making cores the limiting resource, because the scheduler still uses cores for "slots" in a few places. A rough sketch of the calculation follows.
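
    Roughly, the per-profile executor-need estimate that relies on these "slots" looks like the following sketch (the values are illustrative, and it assumes the executor cores config is set, which standalone and Mesos coarse-grained may not do):

        val executorCores = 4                // spark.executor.cores (illustrative)
        val cpusPerTask = 1                  // spark.task.cpus (illustrative)
        val executorAllocationRatio = 1.0    // spark.dynamicAllocation.executorAllocationRatio
        val pendingTasks = 10                // pending + speculative tasks for this profile
        val runningTasks = 6
        val tasksPerExecutor = executorCores / cpusPerTask   // "slots" per executor
        val maxNeeded = math.ceil(
          (pendingTasks + runningTasks) * executorAllocationRatio / tasksPerExecutor).toInt  // 4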
    
    This PR also adds the resource profile id to the Stage and StageInfo classes to make testing easier. The full set of scheduler changes will come in the scheduler PR that follows this one.
    
    The PR stops at the scheduler backend pieces for the cluster manager; real YARN support has not been added here and will come in a separate PR. This PR therefore carries a few of the API changes up to the cluster manager and then just uses the default profile requests to continue.
    
    The code for the entire feature is here for reference: https://github.com/apache/spark/pull/27053/files, although it needs to be upmerged again as well.
    
    ### Why are the changes needed?
    
    Needed for the stage level scheduling feature.
    
    ### Does this PR introduce any user-facing change?
    
    No user-facing API changes are added here.
    
    ### How was this patch tested?
    
    Lots of unit tests and manual testing. I tested on YARN, k8s, standalone, and local modes, running both failure and success cases.
    
    Closes #27313 from tgravescs/SPARK-29148.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../apache/spark/ExecutorAllocationClient.scala    |   31 +-
 .../apache/spark/ExecutorAllocationManager.scala   |  473 ++++++---
 .../main/scala/org/apache/spark/SparkContext.scala |  150 +--
 .../org/apache/spark/internal/config/Tests.scala   |    9 +
 .../spark/resource/ExecutorResourceRequests.scala  |    2 +-
 .../apache/spark/resource/ResourceProfile.scala    |  150 ++-
 .../spark/resource/ResourceProfileBuilder.scala    |    2 +-
 .../spark/resource/ResourceProfileManager.scala    |   86 ++
 .../org/apache/spark/resource/ResourceUtils.scala  |  109 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |    9 +-
 .../org/apache/spark/scheduler/ResultStage.scala   |    5 +-
 .../apache/spark/scheduler/ShuffleMapStage.scala   |    5 +-
 .../scala/org/apache/spark/scheduler/Stage.scala   |    9 +-
 .../org/apache/spark/scheduler/StageInfo.scala     |    9 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |    4 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  150 ++-
 .../cluster/StandaloneSchedulerBackend.scala       |   11 +-
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala |   11 +-
 .../scala/org/apache/spark/util/JsonProtocol.scala |    8 +-
 .../spark/ExecutorAllocationManagerSuite.scala     | 1049 +++++++++++++-------
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |    9 +-
 .../scala/org/apache/spark/LocalSparkContext.scala |    2 +-
 .../scala/org/apache/spark/SparkContextSuite.scala |   36 +-
 .../history/BasicEventFilterBuilderSuite.scala     |    4 +-
 .../resource/ResourceProfileManagerSuite.scala     |  103 ++
 .../spark/resource/ResourceProfileSuite.scala      |   79 +-
 .../apache/spark/resource/ResourceUtilsSuite.scala |    3 +
 .../CoarseGrainedSchedulerBackendSuite.scala       |   13 +-
 .../scheduler/EventLoggingListenerSuite.scala      |    7 +-
 .../scheduler/dynalloc/ExecutorMonitorSuite.scala  |   19 +-
 .../spark/status/AppStatusListenerSuite.scala      |   76 +-
 .../spark/status/ListenerEventsTestHelper.scala    |   10 +-
 .../scala/org/apache/spark/ui/StagePageSuite.scala |    4 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala  |   15 +-
 python/pyspark/tests/test_context.py               |    5 +
 python/pyspark/tests/test_taskcontext.py           |    6 +
 .../k8s/KubernetesClusterSchedulerBackend.scala    |    8 +-
 .../KubernetesClusterSchedulerBackendSuite.scala   |    4 +
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala |   18 +-
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  |   26 +-
 .../scheduler/cluster/YarnSchedulerBackend.scala   |   25 +-
 .../cluster/YarnSchedulerBackendSuite.scala        |    7 +-
 .../execution/ui/MetricsAggregationBenchmark.scala |    4 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |    4 +-
 .../scheduler/ExecutorAllocationManager.scala      |    7 +-
 .../scheduler/ExecutorAllocationManagerSuite.scala |   19 +-
 46 files changed, 1935 insertions(+), 860 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index cb965cb..00bd006 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -37,24 +37,29 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
    * to help it make decisions.
-   * @param numExecutors The total number of executors we'd like to have. The cluster manager
-   *                     shouldn't kill any running executor to reach this number, but,
-   *                     if all existing executors were to die, this is the number of executors
-   *                     we'd want to be allocated.
-   * @param localityAwareTasks The number of tasks in all active stages that have a locality
-   *                           preferences. This includes running, pending, and completed tasks.
-   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
-   *                             that would like to like to run on that host.
-   *                             This includes running, pending, and completed tasks.
+   *
+   * @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
+   *                                        ResourceProfile id. The cluster manager shouldn't kill
+   *                                        any running executor to reach this number, but, if all
+   *                                        existing executors were to die, this is the number
+   *                                        of executors we'd want to be allocated.
+   * @param numLocalityAwareTasksPerResourceProfileId The number of tasks in all active stages that
+   *                                                  have a locality preference per
+   *                                                  ResourceProfile id. This includes running,
+   *                                                  pending, and completed tasks.
+   * @param hostToLocalTaskCount A map of ResourceProfile id to a map of hosts to the number of
+   *                             tasks from all active stages that would like to run on
+   *                             that host. This includes running, pending, and completed tasks.
    * @return whether the request is acknowledged by the cluster manager.
    */
   private[spark] def requestTotalExecutors(
-      numExecutors: Int,
-      localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int]): Boolean
+      resourceProfileIdToNumExecutors: Map[Int, Int],
+      numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
+      hostToLocalTaskCount: Map[Int, Map[String, Int]]): Boolean
 
   /**
-   * Request an additional number of executors from the cluster manager.
+   * Request an additional number of executors from the cluster manager for the default
+   * ResourceProfile.
    * @return whether the request is acknowledged by the cluster manager.
    */
   def requestExecutors(numAdditionalExecutors: Int): Boolean
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 677386c..5cb3160 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -29,6 +29,8 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
+import org.apache.spark.resource.ResourceProfileManager
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.dynalloc.ExecutorMonitor
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -36,9 +38,9 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
  *
- * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
- * synced to the cluster manager. The target starts at a configured initial value and changes with
- * the number of pending and running tasks.
+ * The ExecutorAllocationManager maintains a moving target number of executors, for each
+ * ResourceProfile, which is periodically synced to the cluster manager. The target starts
+ * at a configured initial value and changes with the number of pending and running tasks.
  *
  * Decreasing the target number of executors happens when the current target is more than needed to
  * handle the current load. The target number of executors is always truncated to the number of
@@ -57,14 +59,18 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  * quickly over time in case the maximum number of executors is very high. Otherwise, it will take
  * a long time to ramp up under heavy workloads.
  *
- * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not
- * been scheduled to run any tasks, then it is removed. Note that an executor caching any data
+ * The remove policy is simpler and is applied on each ResourceProfile separately. If an executor
+ * for that ResourceProfile has been idle for K seconds and the number of executors is more
+ * than what is needed for that ResourceProfile, meaning there are not enough tasks that could use
+ * the executor, then it is removed. Note that an executor caching any data
  * blocks will be removed if it has been idle for more than L seconds.
  *
  * There is no retry logic in either case because we make the assumption that the cluster manager
  * will eventually fulfill all requests it receives asynchronously.
  *
- * The relevant Spark properties include the following:
+ * The relevant Spark properties are below. Each of these properties applies separately to
+ * every ResourceProfile. So if you set a minimum number of executors, that is a minimum
+ * for each ResourceProfile.
  *
  *   spark.dynamicAllocation.enabled - Whether this feature is enabled
  *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
@@ -95,7 +101,8 @@ private[spark] class ExecutorAllocationManager(
     listenerBus: LiveListenerBus,
     conf: SparkConf,
     cleaner: Option[ContextCleaner] = None,
-    clock: Clock = new SystemClock())
+    clock: Clock = new SystemClock(),
+    resourceProfileManager: ResourceProfileManager)
   extends Logging {
 
   allocationManager =>
@@ -117,23 +124,23 @@ private[spark] class ExecutorAllocationManager(
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.get(DYN_ALLOCATION_TESTING)
 
-  // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
-  // allocation is only supported for YARN and the default number of cores per executor in YARN is
-  // 1, but it might need to be attained differently for different cluster managers
-  private val tasksPerExecutorForFullParallelism =
-    conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)
-
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
 
+  private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id
+
   validateSettings()
 
-  // Number of executors to add in the next round
-  private var numExecutorsToAdd = 1
+  // Number of executors to add for each ResourceProfile in the next round
+  private val numExecutorsToAddPerResourceProfileId = new mutable.HashMap[Int, Int]
+  numExecutorsToAddPerResourceProfileId(defaultProfileId) = 1
 
   // The desired number of executors at this moment in time. If all our executors were to die, this
   // is the number of executors we would immediately want from the cluster manager.
-  private var numExecutorsTarget = initialNumExecutors
+  // Note every profile will be allowed to have the initial number;
+  // we may want to make this configurable per profile in the future.
+  private val numExecutorsTargetPerResourceProfileId = new mutable.HashMap[Int, Int]
+  numExecutorsTargetPerResourceProfileId(defaultProfileId) = initialNumExecutors
 
   // A timestamp of when an addition should be triggered, or NOT_SET if it is not set
   // This is set when pending tasks are added but not scheduled yet
@@ -165,11 +172,12 @@ private[spark] class ExecutorAllocationManager(
   //   (2) an executor idle timeout has elapsed.
   @volatile private var initializing: Boolean = true
 
-  // Number of locality aware tasks, used for executor placement.
-  private var localityAwareTasks = 0
+  // Number of locality aware tasks for each ResourceProfile, used for executor placement.
+  private var numLocalityAwareTasksPerResourceProfileId = new mutable.HashMap[Int, Int]
+  numLocalityAwareTasksPerResourceProfileId(defaultProfileId) = 0
 
-  // Host to possible task running on it, used for executor placement.
-  private var hostToLocalTaskCount: Map[String, Int] = Map.empty
+  // ResourceProfile id to Host to possible task running on it, used for executor placement.
+  private var rpIdToHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
 
   /**
    * Verify that the settings specified through the config are valid.
@@ -233,7 +241,14 @@ private[spark] class ExecutorAllocationManager(
     }
     executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
 
-    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
+    // copy the maps inside synchronized to ensure they are not modified
+    val (numExecutorsTarget, numLocalityAware) = synchronized {
+      val numTarget = numExecutorsTargetPerResourceProfileId.toMap
+      val numLocality = numLocalityAwareTasksPerResourceProfileId.toMap
+      (numTarget, numLocality)
+    }
+
+    client.requestTotalExecutors(numExecutorsTarget, numLocalityAware, rpIdToHostToLocalTaskCount)
   }
 
   /**
@@ -253,20 +268,28 @@ private[spark] class ExecutorAllocationManager(
    */
   def reset(): Unit = synchronized {
     addTime = 0L
-    numExecutorsTarget = initialNumExecutors
+    numExecutorsTargetPerResourceProfileId.keys.foreach { rpId =>
+      numExecutorsTargetPerResourceProfileId(rpId) = initialNumExecutors
+    }
     executorMonitor.reset()
   }
 
   /**
-   * The maximum number of executors we would need under the current load to satisfy all running
-   * and pending tasks, rounded up.
+   * The maximum number of executors, for the ResourceProfile id passed in, that we would need
+   * under the current load to satisfy all running and pending tasks, rounded up.
    */
-  private def maxNumExecutorsNeeded(): Int = {
-    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+  private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
+    val pending = listener.totalPendingTasksPerResourceProfile(rpId)
+    val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
+    val running = listener.totalRunningTasksPerResourceProfile(rpId)
+    val numRunningOrPendingTasks = pending + running
+    val rp = resourceProfileManager.resourceProfileFromId(rpId)
+    val tasksPerExecutor = rp.maxTasksPerExecutor(conf)
+    logDebug(s"max needed for rpId: $rpId numpending: $numRunningOrPendingTasks," +
+      s" tasksperexecutor: $tasksPerExecutor")
     val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
-      tasksPerExecutorForFullParallelism).toInt
-    if (tasksPerExecutorForFullParallelism > 1 && maxNeeded == 1 &&
-      listener.pendingSpeculativeTasks > 0) {
+      tasksPerExecutor).toInt
+    if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
       // If we have pending speculative tasks and only need a single executor, allocate one more
       // to satisfy the locality requirements of speculation
       maxNeeded + 1
@@ -275,8 +298,8 @@ private[spark] class ExecutorAllocationManager(
     }
   }
 
-  private def totalRunningTasks(): Int = synchronized {
-    listener.totalRunningTasks
+  private def totalRunningTasksPerResourceProfile(id: Int): Int = synchronized {
+    listener.totalRunningTasksPerResourceProfile(id)
   }
 
   /**
@@ -302,7 +325,8 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Updates our target number of executors and syncs the result with the cluster manager.
+   * Updates our target number of executors for each ResourceProfile and then syncs the result
+   * with the cluster manager.
    *
    * Check to see whether our existing allocation and the requests we've made previously exceed our
    * current needs. If so, truncate our target and let the cluster manager know so that it can
@@ -314,130 +338,205 @@ private[spark] class ExecutorAllocationManager(
    * @return the delta in the target number of executors.
    */
   private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
-    val maxNeeded = maxNumExecutorsNeeded
-
     if (initializing) {
       // Do not change our target while we are still initializing,
       // Otherwise the first job may have to ramp up unnecessarily
       0
-    } else if (maxNeeded < numExecutorsTarget) {
-      // The target number exceeds the number we actually need, so stop adding new
-      // executors and inform the cluster manager to cancel the extra pending requests
-      val oldNumExecutorsTarget = numExecutorsTarget
-      numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
-      numExecutorsToAdd = 1
-
-      // If the new target has not changed, avoid sending a message to the cluster manager
-      if (numExecutorsTarget < oldNumExecutorsTarget) {
-        // We lower the target number of executors but don't actively kill any yet.  Killing is
-        // controlled separately by an idle timeout.  It's still helpful to reduce the target number
-        // in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
-        // preempts it) -- in that case, there is no point in trying to immediately  get a new
-        // executor, since we wouldn't even use it yet.
-        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
-        logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
-          s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
+    } else {
+      val updatesNeeded = new mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]
+
+      // Update targets for all ResourceProfiles then do a single request to the cluster manager
+      numExecutorsTargetPerResourceProfileId.foreach { case (rpId, targetExecs) =>
+        val maxNeeded = maxNumExecutorsNeededPerResourceProfile(rpId)
+        if (maxNeeded < targetExecs) {
+          // The target number exceeds the number we actually need, so stop adding new
+          // executors and inform the cluster manager to cancel the extra pending requests
+
+          // We lower the target number of executors but don't actively kill any yet.  Killing is
+          // controlled separately by an idle timeout.  It's still helpful to reduce
+          // the target number in case an executor just happens to get lost (eg., bad hardware,
+          // or the cluster manager preempts it) -- in that case, there is no point in trying
+          // to immediately  get a new executor, since we wouldn't even use it yet.
+          decrementExecutorsFromTarget(maxNeeded, rpId, updatesNeeded)
+        } else if (addTime != NOT_SET && now >= addTime) {
+          addExecutorsToTarget(maxNeeded, rpId, updatesNeeded)
+        }
+      }
+      doUpdateRequest(updatesNeeded.toMap, now)
+    }
+  }
+
+  private def addExecutorsToTarget(
+      maxNeeded: Int,
+      rpId: Int,
+      updatesNeeded: mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]): Int = {
+    updateTargetExecs(addExecutors, maxNeeded, rpId, updatesNeeded)
+  }
+
+  private def decrementExecutorsFromTarget(
+      maxNeeded: Int,
+      rpId: Int,
+      updatesNeeded: mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]): Int = {
+    updateTargetExecs(decrementExecutors, maxNeeded, rpId, updatesNeeded)
+  }
+
+  private def updateTargetExecs(
+      updateTargetFn: (Int, Int) => Int,
+      maxNeeded: Int,
+      rpId: Int,
+      updatesNeeded: mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]): Int = {
+    val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
+    // update the target number (add or remove)
+    val delta = updateTargetFn(maxNeeded, rpId)
+    if (delta != 0) {
+      updatesNeeded(rpId) = ExecutorAllocationManager.TargetNumUpdates(delta, oldNumExecutorsTarget)
+    }
+    delta
+  }
+
+  private def doUpdateRequest(
+      updates: Map[Int, ExecutorAllocationManager.TargetNumUpdates],
+      now: Long): Int = {
+    // Only call cluster manager if target has changed.
+    if (updates.size > 0) {
+      val requestAcknowledged = try {
+        logDebug("requesting updates: " + updates)
+        testing ||
+          client.requestTotalExecutors(
+            numExecutorsTargetPerResourceProfileId.toMap,
+            numLocalityAwareTasksPerResourceProfileId.toMap,
+            rpIdToHostToLocalTaskCount)
+      } catch {
+        case NonFatal(e) =>
+          // Use INFO level so the error doesn't show up by default in shells.
+          // Errors here are more commonly caused by YARN AM restarts, which is a recoverable
+          // issue, and generate a lot of noisy output.
+          logInfo("Error reaching cluster manager.", e)
+          false
+      }
+      if (requestAcknowledged) {
+        // have to go through all resource profiles that changed
+        var totalDelta = 0
+        updates.foreach { case (rpId, targetNum) =>
+          val delta = targetNum.delta
+          totalDelta += delta
+          if (delta > 0) {
+            val executorsString = "executor" + { if (delta > 1) "s" else "" }
+            logInfo(s"Requesting $delta new $executorsString because tasks are backlogged " +
+              s"(new desired total will be ${numExecutorsTargetPerResourceProfileId(rpId)} " +
+              s"for resource profile id: ${rpId})")
+            numExecutorsToAddPerResourceProfileId(rpId) =
+              if (delta == numExecutorsToAddPerResourceProfileId(rpId)) {
+                numExecutorsToAddPerResourceProfileId(rpId) * 2
+              } else {
+                1
+              }
+            logDebug(s"Starting timer to add more executors (to " +
+              s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
+            addTime = now + TimeUnit.SECONDS.toNanos(sustainedSchedulerBacklogTimeoutS)
+          } else {
+            logDebug(s"Lowering target number of executors to" +
+              s" ${numExecutorsTargetPerResourceProfileId(rpId)} (previously " +
+              s"${targetNum.oldNumExecutorsTarget} for resource profile id: ${rpId}) " +
+              "because not all requested executors " +
+              "are actually needed")
+          }
+        }
+        totalDelta
+      } else {
+        // request was for all profiles so we have to go through all to reset to old num
+        updates.foreach { case (rpId, targetNum) =>
+          logWarning("Unable to reach the cluster manager to request more executors!")
+          numExecutorsTargetPerResourceProfileId(rpId) = targetNum.oldNumExecutorsTarget
+        }
+        0
       }
-      numExecutorsTarget - oldNumExecutorsTarget
-    } else if (addTime != NOT_SET && now >= addTime) {
-      val delta = addExecutors(maxNeeded)
-      logDebug(s"Starting timer to add more executors (to " +
-        s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
-      addTime = now + TimeUnit.SECONDS.toNanos(sustainedSchedulerBacklogTimeoutS)
-      delta
     } else {
+      logDebug("No change in number of executors")
       0
     }
   }
 
+  private def decrementExecutors(maxNeeded: Int, rpId: Int): Int = {
+    val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
+    numExecutorsTargetPerResourceProfileId(rpId) = math.max(maxNeeded, minNumExecutors)
+    numExecutorsToAddPerResourceProfileId(rpId) = 1
+    numExecutorsTargetPerResourceProfileId(rpId) - oldNumExecutorsTarget
+  }
+
   /**
-   * Request a number of executors from the cluster manager.
+   * Update the target number of executors and figure out how many to add.
    * If the cap on the number of executors is reached, give up and reset the
    * number of executors to add next round instead of continuing to double it.
    *
    * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
    *                              tasks could fill
+   * @param rpId                  the ResourceProfile id of the executors
    * @return the number of additional executors actually requested.
    */
-  private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+  private def addExecutors(maxNumExecutorsNeeded: Int, rpId: Int): Int = {
+    val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
     // Do not request more executors if it would put our target over the upper bound
-    if (numExecutorsTarget >= maxNumExecutors) {
-      logDebug(s"Not adding executors because our current target total " +
-        s"is already $numExecutorsTarget (limit $maxNumExecutors)")
-      numExecutorsToAdd = 1
+    // this is doing a max check per ResourceProfile
+    if (oldNumExecutorsTarget >= maxNumExecutors) {
+      logDebug("Not adding executors because our current target total " +
+        s"is already ${oldNumExecutorsTarget} (limit $maxNumExecutors)")
+      numExecutorsToAddPerResourceProfileId(rpId) = 1
       return 0
     }
-
-    val oldNumExecutorsTarget = numExecutorsTarget
     // There's no point in wasting time ramping up to the number of executors we already have, so
     // make sure our target is at least as much as our current allocation:
-    numExecutorsTarget = math.max(numExecutorsTarget, executorMonitor.executorCount)
+    var numExecutorsTarget = math.max(numExecutorsTargetPerResourceProfileId(rpId),
+        executorMonitor.executorCountWithResourceProfile(rpId))
     // Boost our target with the number to add for this round:
-    numExecutorsTarget += numExecutorsToAdd
+    numExecutorsTarget += numExecutorsToAddPerResourceProfileId(rpId)
     // Ensure that our target doesn't exceed what we need at the present moment:
     numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
     // Ensure that our target fits within configured bounds:
     numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
-
     val delta = numExecutorsTarget - oldNumExecutorsTarget
+    numExecutorsTargetPerResourceProfileId(rpId) = numExecutorsTarget
 
     // If our target has not changed, do not send a message
     // to the cluster manager and reset our exponential growth
     if (delta == 0) {
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    val addRequestAcknowledged = try {
-      testing ||
-        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
-    } catch {
-      case NonFatal(e) =>
-        // Use INFO level so the error it doesn't show up by default in shells. Errors here are more
-        // commonly caused by YARN AM restarts, which is a recoverable issue, and generate a lot of
-        // noisy output.
-        logInfo("Error reaching cluster manager.", e)
-        false
-    }
-    if (addRequestAcknowledged) {
-      val executorsString = "executor" + { if (delta > 1) "s" else "" }
-      logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
-        s" (new desired total will be $numExecutorsTarget)")
-      numExecutorsToAdd = if (delta == numExecutorsToAdd) {
-        numExecutorsToAdd * 2
-      } else {
-        1
-      }
-      delta
-    } else {
-      logWarning(
-        s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
-      numExecutorsTarget = oldNumExecutorsTarget
-      0
+      numExecutorsToAddPerResourceProfileId(rpId) = 1
     }
+    delta
   }
 
   /**
    * Request the cluster manager to remove the given executors.
    * Returns the list of executors which are removed.
    */
-  private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
+  private def removeExecutors(executors: Seq[(String, Int)]): Seq[String] = synchronized {
     val executorIdsToBeRemoved = new ArrayBuffer[String]
-
     logDebug(s"Request to remove executorIds: ${executors.mkString(", ")}")
-    val numExistingExecutors = executorMonitor.executorCount - executorMonitor.pendingRemovalCount
-
-    var newExecutorTotal = numExistingExecutors
-    executors.foreach { executorIdToBeRemoved =>
-      if (newExecutorTotal - 1 < minNumExecutors) {
-        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
-          s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
-      } else if (newExecutorTotal - 1 < numExecutorsTarget) {
-        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
-          s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
+    val numExecutorsTotalPerRpId = mutable.Map[Int, Int]()
+    executors.foreach { case (executorIdToBeRemoved, rpId) =>
+      if (rpId == UNKNOWN_RESOURCE_PROFILE_ID) {
+        if (testing) {
+          throw new SparkException("ResourceProfile Id was UNKNOWN, this is not expected")
+        }
+        logWarning(s"Not removing executor $executorIdToBeRemoved because the " +
+          "ResourceProfile was UNKNOWN!")
       } else {
-        executorIdsToBeRemoved += executorIdToBeRemoved
-        newExecutorTotal -= 1
+        // get the running total as we remove or initialize it to the count - pendingRemoval
+        val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
+          (executorMonitor.executorCountWithResourceProfile(rpId) -
+            executorMonitor.pendingRemovalCountPerResourceProfileId(rpId)))
+        if (newExecutorTotal - 1 < minNumExecutors) {
+          logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
+            s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " +
+            s"$minNumExecutors)")
+        } else if (newExecutorTotal - 1 < numExecutorsTargetPerResourceProfileId(rpId)) {
+          logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
+            s"are only $newExecutorTotal executor(s) left (number of executor " +
+            s"target ${numExecutorsTargetPerResourceProfileId(rpId)})")
+        } else {
+          executorIdsToBeRemoved += executorIdToBeRemoved
+          numExecutorsTotalPerRpId(rpId) -= 1
+        }
       }
     }
 
@@ -457,14 +556,15 @@ private[spark] class ExecutorAllocationManager(
 
     // [SPARK-21834] killExecutors api reduces the target number of executors.
     // So we need to update the target with desired value.
-    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
+    client.requestTotalExecutors(
+      numExecutorsTargetPerResourceProfileId.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap,
+      rpIdToHostToLocalTaskCount)
+
     // reset the newExecutorTotal to the existing number of executors
-    newExecutorTotal = numExistingExecutors
     if (testing || executorsRemoved.nonEmpty) {
-      newExecutorTotal -= executorsRemoved.size
       executorMonitor.executorsKilled(executorsRemoved)
-      logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout." +
-        s"(new desired total will be $newExecutorTotal)")
+      logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
       executorsRemoved
     } else {
       logWarning(s"Unable to reach the cluster manager to kill executor/s " +
@@ -493,7 +593,7 @@ private[spark] class ExecutorAllocationManager(
   private def onSchedulerQueueEmpty(): Unit = synchronized {
     logDebug("Clearing timer to add executors because there are no more pending tasks")
     addTime = NOT_SET
-    numExecutorsToAdd = 1
+    numExecutorsToAddPerResourceProfileId.transform { case (_, _) => 1 }
   }
 
   private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
@@ -519,12 +619,16 @@ private[spark] class ExecutorAllocationManager(
     private val stageAttemptToSpeculativeTaskIndices =
       new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
 
+    private val resourceProfileIdToStageAttempt =
+      new mutable.HashMap[Int, mutable.Set[StageAttempt]]
+
     // stageAttempt to tuple (the number of task with locality preferences, a map where each pair
-    // is a node and the number of tasks that would like to be scheduled on that node) map,
+    // is a node and the number of tasks that would like to be scheduled on that node, and
+    // the resource profile id) map,
     // maintain the executor placement hints for each stageAttempt used by resource framework
     // to better place the executors.
     private val stageAttemptToExecutorPlacementHints =
-      new mutable.HashMap[StageAttempt, (Int, Map[String, Int])]
+      new mutable.HashMap[StageAttempt, (Int, Map[String, Int], Int)]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
       initializing = false
@@ -535,6 +639,13 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageAttemptToNumTasks(stageAttempt) = numTasks
         allocationManager.onSchedulerBacklogged()
+        // need to keep stage task requirements to ask for the right containers
+        val profId = stageSubmitted.stageInfo.resourceProfileId
+        logDebug(s"Stage resource profile id is: $profId with numTasks: $numTasks")
+        resourceProfileIdToStageAttempt.getOrElseUpdate(
+          profId, new mutable.HashSet[StageAttempt]) += stageAttempt
+        numExecutorsToAddPerResourceProfileId.getOrElseUpdate(profId, 1)
+        numExecutorsTargetPerResourceProfileId.getOrElseUpdate(profId, initialNumExecutors)
 
         // Compute the number of tasks requested by the stage on each host
         var numTasksPending = 0
@@ -549,7 +660,7 @@ private[spark] class ExecutorAllocationManager(
           }
         }
         stageAttemptToExecutorPlacementHints.put(stageAttempt,
-          (numTasksPending, hostToLocalTaskCountPerStage.toMap))
+          (numTasksPending, hostToLocalTaskCountPerStage.toMap, profId))
 
         // Update the executor placement hints
         updateExecutorPlacementHints()
@@ -561,7 +672,7 @@ private[spark] class ExecutorAllocationManager(
       val stageAttemptId = stageCompleted.stageInfo.attemptNumber()
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
       allocationManager.synchronized {
-        // do NOT remove stageAttempt from stageAttemptToNumRunningTasks,
+        // do NOT remove stageAttempt from stageAttemptToNumRunningTask
         // because the attempt may still have running tasks,
         // even after another attempt for the stage is submitted.
         stageAttemptToNumTasks -= stageAttempt
@@ -597,7 +708,7 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
             new mutable.HashSet[Int]) += taskIndex
         }
-        if (totalPendingTasks() == 0) {
+        if (!hasPendingTasks) {
           allocationManager.onSchedulerQueueEmpty()
         }
       }
@@ -613,9 +724,22 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask(stageAttempt) -= 1
           if (stageAttemptToNumRunningTask(stageAttempt) == 0) {
             stageAttemptToNumRunningTask -= stageAttempt
+            if (!stageAttemptToNumTasks.contains(stageAttempt)) {
+              val rpForStage = resourceProfileIdToStageAttempt.filter { case (k, v) =>
+                v.contains(stageAttempt)
+              }.keys
+              if (rpForStage.size == 1) {
+                // be careful about the removal from here due to late tasks, make sure stage is
+                // really complete and no tasks left
+                resourceProfileIdToStageAttempt(rpForStage.head) -= stageAttempt
+              } else {
+                logWarning(s"Should have exactly one resource profile for stage $stageAttempt," +
+                  s" but have $rpForStage")
+              }
+            }
+
           }
         }
-
         if (taskEnd.taskInfo.speculative) {
           stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
           stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
@@ -624,7 +748,7 @@ private[spark] class ExecutorAllocationManager(
         taskEnd.reason match {
           case Success | _: TaskKilled =>
           case _ =>
-            if (totalPendingTasks() == 0) {
+            if (!hasPendingTasks) {
               // If the task failed (not intentionally killed), we expect it to be resubmitted
               // later. To ensure we have enough resources to run the resubmitted task, we need to
               // mark the scheduler as backlogged again if it's not already marked as such
@@ -661,20 +785,46 @@ private[spark] class ExecutorAllocationManager(
      *
      * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
      */
-    def pendingTasks(): Int = {
-      stageAttemptToNumTasks.map { case (stageAttempt, numTasks) =>
-        numTasks - stageAttemptToTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
-      }.sum
+    def pendingTasksPerResourceProfile(rpId: Int): Int = {
+      val attempts = resourceProfileIdToStageAttempt.getOrElse(rpId, Set.empty).toSeq
+      attempts.map(attempt => getPendingTaskSum(attempt)).sum
     }
 
-    def pendingSpeculativeTasks(): Int = {
-      stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
-        numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
-      }.sum
+    def hasPendingRegularTasks: Boolean = {
+      val attemptSets = resourceProfileIdToStageAttempt.values
+      attemptSets.exists(attempts => attempts.exists(getPendingTaskSum(_) > 0))
+    }
+
+    private def getPendingTaskSum(attempt: StageAttempt): Int = {
+      val numTotalTasks = stageAttemptToNumTasks.getOrElse(attempt, 0)
+      val numRunning = stageAttemptToTaskIndices.get(attempt).map(_.size).getOrElse(0)
+      numTotalTasks - numRunning
     }
 
-    def totalPendingTasks(): Int = {
-      pendingTasks + pendingSpeculativeTasks
+    def pendingSpeculativeTasksPerResourceProfile(rp: Int): Int = {
+      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
+      attempts.map(attempt => getPendingSpeculativeTaskSum(attempt)).sum
+    }
+
+    def hasPendingSpeculativeTasks: Boolean = {
+      val attemptSets = resourceProfileIdToStageAttempt.values
+      attemptSets.exists { attempts =>
+        attempts.exists(getPendingSpeculativeTaskSum(_) > 0)
+      }
+    }
+
+    private def getPendingSpeculativeTaskSum(attempt: StageAttempt): Int = {
+      val numTotalTasks = stageAttemptToNumSpeculativeTasks.getOrElse(attempt, 0)
+      val numRunning = stageAttemptToSpeculativeTaskIndices.get(attempt).map(_.size).getOrElse(0)
+      numTotalTasks - numRunning
+    }
+
+    def hasPendingTasks: Boolean = {
+      hasPendingSpeculativeTasks || hasPendingRegularTasks
+    }
+
+    def totalPendingTasksPerResourceProfile(rp: Int): Int = {
+      pendingTasksPerResourceProfile(rp) + pendingSpeculativeTasksPerResourceProfile(rp)
     }
 
     /**
@@ -685,6 +835,14 @@ private[spark] class ExecutorAllocationManager(
       stageAttemptToNumRunningTask.values.sum
     }
 
+    def totalRunningTasksPerResourceProfile(rp: Int): Int = {
+      val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
+      // attempts is a Set, change to Seq so we keep all values
+      attempts.map { attempt =>
+        stageAttemptToNumRunningTask.getOrElseUpdate(attempt, 0)
+      }.sum
+    }
+
     /**
      * Update the Executor placement hints (the number of tasks with locality preferences,
      * a map where each pair is a node and the number of tasks that would like to be scheduled
@@ -694,18 +852,27 @@ private[spark] class ExecutorAllocationManager(
      * granularity within stages.
      */
     def updateExecutorPlacementHints(): Unit = {
-      var localityAwareTasks = 0
-      val localityToCount = new mutable.HashMap[String, Int]()
-      stageAttemptToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
-        localityAwareTasks += numTasksPending
-        localities.foreach { case (hostname, count) =>
-          val updatedCount = localityToCount.getOrElse(hostname, 0) + count
-          localityToCount(hostname) = updatedCount
-        }
+      val localityAwareTasksPerResourceProfileId = new mutable.HashMap[Int, Int]
+
+      // ResourceProfile id => map[host, count]
+      val rplocalityToCount = new mutable.HashMap[Int, mutable.HashMap[String, Int]]()
+      stageAttemptToExecutorPlacementHints.values.foreach {
+        case (numTasksPending, localities, rpId) =>
+          val rpNumPending =
+            localityAwareTasksPerResourceProfileId.getOrElse(rpId, 0)
+          localityAwareTasksPerResourceProfileId(rpId) = rpNumPending + numTasksPending
+          localities.foreach { case (hostname, count) =>
+            val rpBasedHostToCount =
+              rplocalityToCount.getOrElseUpdate(rpId, new mutable.HashMap[String, Int])
+            val newUpdated = rpBasedHostToCount.getOrElse(hostname, 0) + count
+            rpBasedHostToCount(hostname) = newUpdated
+          }
       }
 
-      allocationManager.localityAwareTasks = localityAwareTasks
-      allocationManager.hostToLocalTaskCount = localityToCount.toMap
+      allocationManager.numLocalityAwareTasksPerResourceProfileId =
+        localityAwareTasksPerResourceProfileId
+      allocationManager.rpIdToHostToLocalTaskCount =
+        rplocalityToCount.map { case (k, v) => (k, v.toMap)}.toMap
     }
   }
 
@@ -726,14 +893,22 @@ private[spark] class ExecutorAllocationManager(
       })
     }
 
-    registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
+    // The metrics are going to return the sum for all the different ResourceProfiles.
+    registerGauge("numberExecutorsToAdd",
+      numExecutorsToAddPerResourceProfileId.values.sum, 0)
     registerGauge("numberExecutorsPendingToRemove", executorMonitor.pendingRemovalCount, 0)
     registerGauge("numberAllExecutors", executorMonitor.executorCount, 0)
-    registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
-    registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
+    registerGauge("numberTargetExecutors",
+      numExecutorsTargetPerResourceProfileId.values.sum, 0)
+    registerGauge("numberMaxNeededExecutors", numExecutorsTargetPerResourceProfileId.keys
+        .map(maxNumExecutorsNeededPerResourceProfile(_)).sum, 0)
   }
 }
 
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
+
+  // helper case class for requesting executors, here to be visible for testing
+  private[spark] case class TargetNumUpdates(delta: Int, oldNumExecutorsTarget: Int)
+
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 91188d5..a47136e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.collection.immutable
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
@@ -53,7 +54,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
-import org.apache.spark.resource.{ResourceID, ResourceInformation}
+import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
@@ -219,9 +220,10 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _shutdownHookRef: AnyRef = _
   private var _statusStore: AppStatusStore = _
   private var _heartbeater: Heartbeater = _
-  private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
+  private var _resources: immutable.Map[String, ResourceInformation] = _
   private var _shuffleDriverComponents: ShuffleDriverComponents = _
   private var _plugins: Option[PluginContainer] = None
+  private var _resourceProfileManager: ResourceProfileManager = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -343,6 +345,8 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
     _executorAllocationManager
 
+  private[spark] def resourceProfileManager: ResourceProfileManager = _resourceProfileManager
+
   private[spark] def cleaner: Option[ContextCleaner] = _cleaner
 
   private[spark] var checkpointDir: Option[String] = None
@@ -451,6 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
 
     _listenerBus = new LiveListenerBus(_conf)
+    _resourceProfileManager = new ResourceProfileManager(_conf)
 
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
@@ -611,7 +616,7 @@ class SparkContext(config: SparkConf) extends Logging {
           case b: ExecutorAllocationClient =>
             Some(new ExecutorAllocationManager(
               schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
-              cleaner = cleaner))
+              cleaner = cleaner, resourceProfileManager = resourceProfileManager))
           case _ =>
             None
         }
@@ -1622,7 +1627,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
-   * to help it make decisions.
+   * to help it make decisions. This applies to the default ResourceProfile.
    * @param numExecutors The total number of executors we'd like to have. The cluster manager
    *                     shouldn't kill any running executor to reach this number, but,
    *                     if all existing executors were to die, this is the number of executors
@@ -1638,11 +1643,16 @@ class SparkContext(config: SparkConf) extends Logging {
   def requestTotalExecutors(
       numExecutors: Int,
       localityAwareTasks: Int,
-      hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
+      hostToLocalTaskCount: immutable.Map[String, Int]
     ): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
+        // this is being applied to the default resource profile, would need to add api to support
+        // others
+        val defaultProfId = resourceProfileManager.defaultResourceProfile.id
+        b.requestTotalExecutors(immutable.Map(defaultProfId -> numExecutors),
+          immutable.Map(defaultProfId -> localityAwareTasks),
+          immutable.Map(defaultProfId -> hostToLocalTaskCount))
       case _ =>
         logWarning("Requesting executors is not supported by current scheduler.")
         false
@@ -2036,6 +2046,7 @@ class SparkContext(config: SparkConf) extends Logging {
     // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
     // `SparkContext` is stopped.
     localProperties.remove()
+    ResourceProfile.clearDefaultProfile()
     // Unset YARN mode system env variable, to allow switching between cluster types.
     SparkContext.clearActiveContext()
     logInfo("Successfully stopped SparkContext")
@@ -2771,109 +2782,34 @@ object SparkContext extends Logging {
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
 
-    // Ensure that executor's resources satisfies one or more tasks requirement.
-    def checkResourcesPerTask(clusterMode: Boolean, executorCores: Option[Int]): Unit = {
+    // Ensure that the default executor's resources satisfy at least one task's requirements.
+    // This function is for cluster managers that don't set the executor cores config; for
+    // others it is checked in ResourceProfile.
+    def checkResourcesPerTask(executorCores: Int): Unit = {
       val taskCores = sc.conf.get(CPUS_PER_TASK)
-      val execCores = if (clusterMode) {
-        executorCores.getOrElse(sc.conf.get(EXECUTOR_CORES))
-      } else {
-        executorCores.get
-      }
-      // some cluster managers don't set the EXECUTOR_CORES config by default (standalone
-      // and mesos coarse grained), so we can't rely on that config for those.
-      val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
-        (master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))
-
-      // Number of cores per executor must meet at least one task requirement.
-      if (shouldCheckExecCores && execCores < taskCores) {
-        throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
-          s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on $master.")
-      }
-
-      // Calculate the max slots each executor can provide based on resources available on each
-      // executor and resources required by each task.
-      val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)
-      val executorResourcesAndAmounts = parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
-          .map(request => (request.id.resourceName, request.amount)).toMap
-
-      var (numSlots, limitingResourceName) = if (shouldCheckExecCores) {
-        (execCores / taskCores, "CPU")
-      } else {
-        (-1, "")
-      }
-
-      taskResourceRequirements.foreach { taskReq =>
-        // Make sure the executor resources were specified through config.
-        val execAmount = executorResourcesAndAmounts.getOrElse(taskReq.resourceName,
-          throw new SparkException("The executor resource config: " +
-            new ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
-            " needs to be specified since a task requirement config: " +
-            new ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
-            " was specified")
-        )
-        // Make sure the executor resources are large enough to launch at least one task.
-        if (execAmount < taskReq.amount) {
-          throw new SparkException("The executor resource config: " +
-            new ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
-            s" = $execAmount has to be >= the requested amount in task resource config: " +
-            new ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
-            s" = ${taskReq.amount}")
-        }
-        // Compare and update the max slots each executor can provide.
-        // If the configured amount per task was < 1.0, a task is subdividing
-        // executor resources. If the amount per task was > 1.0, the task wants
-        // multiple executor resources.
-        val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
-        if (resourceNumSlots < numSlots) {
-          if (shouldCheckExecCores) {
-            throw new IllegalArgumentException("The number of slots on an executor has to be " +
-              "limited by the number of cores, otherwise you waste resources and " +
-              "dynamic allocation doesn't work properly. Your configuration has " +
-              s"core/task cpu slots = ${numSlots} and " +
-              s"${taskReq.resourceName} = ${resourceNumSlots}. " +
-              "Please adjust your configuration so that all resources require same number " +
-              "of executor slots.")
-          }
-          numSlots = resourceNumSlots
-          limitingResourceName = taskReq.resourceName
-        }
-      }
-      if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
-        // if we can't rely on the executor cores config throw a warning for user
-        logWarning("Please ensure that the number of slots available on your " +
-          "executors is limited by the number of cores to task cpus and not another " +
-          "custom resource. If cores is not the limiting resource then dynamic " +
-          "allocation will not work properly!")
-      }
-      // warn if we would waste any resources due to another resource limiting the number of
-      // slots on an executor
-      taskResourceRequirements.foreach { taskReq =>
-        val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
-        if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) {
-          val taskReqStr = if (taskReq.numParts > 1) {
-            s"${taskReq.amount}/${taskReq.numParts}"
-          } else {
-            s"${taskReq.amount}"
-          }
-          val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
-          val message = s"The configuration of resource: ${taskReq.resourceName} " +
-            s"(exec = ${execAmount}, task = ${taskReqStr}, " +
-            s"runnable tasks = ${resourceNumSlots}) will " +
-            s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
-            s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
-            s"your configuration."
-          if (Utils.isTesting) {
-            throw new SparkException(message)
-          } else {
-            logWarning(message)
-          }
-        }
+      validateTaskCpusLargeEnough(executorCores, taskCores)
+      val defaultProf = sc.resourceProfileManager.defaultResourceProfile
+      // TODO - this is temporary until the full stage level scheduling feature is integrated;
+      // fail if any other resource is the limiting one, since dynamic allocation and the
+      // scheduler still compute slots based on cores
+      val cpuSlots = executorCores / taskCores
+      val limitingResource = defaultProf.limitingResource(sc.conf)
+      if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS) &&
+        defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) {
+        throw new IllegalArgumentException("The number of slots on an executor has to be " +
+          "limited by the number of cores, otherwise you waste resources and " +
+          "dynamic allocation doesn't work properly. Your configuration has " +
+          s"core/task cpu slots = ${cpuSlots} and " +
+          s"${limitingResource} = " +
+          s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your configuration " +
+          "so that all resources require same number of executor slots.")
       }
+      ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
     }
 
     master match {
       case "local" =>
-        checkResourcesPerTask(clusterMode = false, Some(1))
+        checkResourcesPerTask(1)
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
@@ -2886,7 +2822,7 @@ object SparkContext extends Logging {
         if (threadCount <= 0) {
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
-        checkResourcesPerTask(clusterMode = false, Some(threadCount))
+        checkResourcesPerTask(threadCount)
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
@@ -2897,14 +2833,13 @@ object SparkContext extends Logging {
         // local[*, M] means the number of cores on the computer with M failures
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
-        checkResourcesPerTask(clusterMode = false, Some(threadCount))
+        checkResourcesPerTask(threadCount)
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
 
       case SPARK_REGEX(sparkUrl) =>
-        checkResourcesPerTask(clusterMode = true, None)
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
@@ -2912,7 +2847,7 @@ object SparkContext extends Logging {
         (backend, scheduler)
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
+        checkResourcesPerTask(coresPerSlave.toInt)
         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
         if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2941,7 +2876,6 @@ object SparkContext extends Logging {
         (backend, scheduler)
 
       case masterUrl =>
-        checkResourcesPerTask(clusterMode = true, None)
         val cm = getClusterManager(masterUrl) match {
           case Some(clusterMgr) => clusterMgr
           case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
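
A quick worked example of the slot check added above in SparkContext. The numbers are
invented for illustration and use plain arithmetic rather than the still-private
ResourceProfile API; it only shows why a configuration whose custom resource is more
restrictive than cores is rejected while the scheduler and dynamic allocation still count
slots by cores:

    // Hypothetical configuration: 4 executor cores, 1 cpu per task, 2 executor GPUs, 1 GPU per task.
    val executorCores = 4
    val taskCores = 1
    val executorGpus = 2
    val gpusPerTask = 1

    val cpuSlots = executorCores / taskCores    // 4 tasks per executor judging by cores
    val gpuSlots = executorGpus / gpusPerTask   // only 2 tasks per executor judging by GPUs
    // gpuSlots < cpuSlots, so GPUs would be the limiting resource and the check above throws
    // an IllegalArgumentException until the rest of the stage level scheduling feature lands.
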
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
index 21660ab..51df73e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -53,4 +53,13 @@ private[spark] object Tests {
   val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
     .intConf
     .createWithDefault(2)
+
+  val RESOURCES_WARNING_TESTING =
+    ConfigBuilder("spark.resources.warnings.testing").booleanConf.createWithDefault(false)
+
+  val RESOURCE_PROFILE_MANAGER_TESTING =
+    ConfigBuilder("spark.testing.resourceProfileManager")
+      .booleanConf
+      .createWithDefault(false)
+
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
index d345674..d4c29f9 100644
--- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -109,7 +109,7 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
       discoveryScript: String = "",
       vendor: String = ""): this.type = {
     // a bit weird but for Java api use empty string as meaning None because empty
-    // string is otherwise invalid for those paramters anyway
+    // string is otherwise invalid for those parameters anyway
     val req = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
     _executorResources.put(resourceName, req)
     this
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 14019d2..03dcf5e 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -22,12 +22,14 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.util.Utils
 
 /**
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
@@ -42,6 +44,13 @@ class ResourceProfile(
 
   // _id is only a var for testing purposes
   private var _id = ResourceProfile.getNextProfileId
+  // This is used for any resources that use fractional amounts; the key is the resource name
+  // and the value is the number of tasks that can share a resource address. For example,
+  // if the user says task gpu amount is 0.5, that results in 2 tasks per resource address.
+  private var _executorResourceSlotsPerAddr: Option[Map[String, Int]] = None
+  private var _limitingResource: Option[String] = None
+  private var _maxTasksPerExecutor: Option[Int] = None
+  private var _coresLimitKnown: Boolean = false
 
   def id: Int = _id
 
@@ -67,6 +76,138 @@ class ResourceProfile(
     taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
   }
 
+  private[spark] def getNumSlotsPerAddress(resource: String, sparkConf: SparkConf): Int = {
+    _executorResourceSlotsPerAddr.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+    }
+    _executorResourceSlotsPerAddr.get.getOrElse(resource,
+      throw new SparkException(s"Resource $resource doesn't exist in profile id: $id"))
+  }
+
+  // Maximum number of tasks you could put on an executor with this profile based on the
+  // limiting resource. If the executor cores config is not present this value is based on
+  // the other resources available, or 1 if there are no other resources. Check
+  // isCoresLimitKnown to know whether this value can be relied on.
+  private[spark] def maxTasksPerExecutor(sparkConf: SparkConf): Int = {
+    _maxTasksPerExecutor.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+      _maxTasksPerExecutor.get
+    }
+  }
+
+  // Returns whether the executor cores config was available to use to calculate the max tasks
+  // per executor and limiting resource. Some cluster managers (like standalone and coarse
+  // grained mesos) don't use the cores config by default so we can't use it to calculate slots.
+  private[spark] def isCoresLimitKnown: Boolean = _coresLimitKnown
+
+  // The resource that has the fewest slots per executor. It's possible that multiple or all
+  // resources result in the same number of slots, in which case this could be any of those.
+  // If the executor cores config is not present this value is based on the other resources
+  // available, or the empty string if there are no other resources. Check isCoresLimitKnown
+  // to know whether this value can be relied on.
+  private[spark] def limitingResource(sparkConf: SparkConf): String = {
+    _limitingResource.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+      _limitingResource.get
+    }
+  }
+
+  // executor cores config is not set for some masters by default and the default value
+  // only applies to yarn/k8s
+  private def shouldCheckExecutorCores(sparkConf: SparkConf): Boolean = {
+    val master = sparkConf.getOption("spark.master")
+    sparkConf.contains(EXECUTOR_CORES) ||
+      (master.isDefined && (master.get.equalsIgnoreCase("yarn") || master.get.startsWith("k8s")))
+  }
+
+  /**
+   * Utility function to calculate the number of tasks you can run on a single Executor based
+   * on the task and executor resource requests in the ResourceProfile. This will be based
+   * off the resource that is most restrictive. For instance, if the executor
+   * request is for 4 cpus and 2 gpus and your task request is for 1 cpu and 1 gpu each, the
+   * limiting resource is gpu and the number of tasks you can run on a single executor is 2.
+   * This function also sets the limiting resource, isCoresLimitKnown and number of slots per
+   * resource address.
+   */
+  private def calculateTasksAndLimitingResource(sparkConf: SparkConf): Unit = synchronized {
+    val shouldCheckExecCores = shouldCheckExecutorCores(sparkConf)
+    var (taskLimit, limitingResource) = if (shouldCheckExecCores) {
+      val cpusPerTask = taskResources.get(ResourceProfile.CPUS)
+        .map(_.amount).getOrElse(sparkConf.get(CPUS_PER_TASK).toDouble).toInt
+      assert(cpusPerTask > 0, "CPUs per task configuration has to be > 0")
+      val coresPerExecutor = getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES))
+      _coresLimitKnown = true
+      ResourceUtils.validateTaskCpusLargeEnough(coresPerExecutor, cpusPerTask)
+      val tasksBasedOnCores = coresPerExecutor / cpusPerTask
+      // Note that if the cores per executor aren't set properly this calculation could be off,
+      // we default it to just be 1 in order to allow checking of the rest of the custom
+      // resources. We set the limit based on the other resources available.
+      (tasksBasedOnCores, ResourceProfile.CPUS)
+    } else {
+      (-1, "")
+    }
+    val numPartsPerResourceMap = new mutable.HashMap[String, Int]
+    numPartsPerResourceMap(ResourceProfile.CORES) = 1
+    val taskResourcesToCheck = new mutable.HashMap[String, TaskResourceRequest]
+    taskResourcesToCheck ++= ResourceProfile.getCustomTaskResources(this)
+    val execResourceToCheck = ResourceProfile.getCustomExecutorResources(this)
+    execResourceToCheck.foreach { case (rName, execReq) =>
+      val taskReq = taskResources.get(rName).map(_.amount).getOrElse(0.0)
+      numPartsPerResourceMap(rName) = 1
+      if (taskReq > 0.0) {
+        if (taskReq > execReq.amount) {
+          throw new SparkException(s"The executor resource: $rName, amount: ${execReq.amount} " +
+            s"needs to be >= the task resource request amount of $taskReq")
+        }
+        val (numPerTask, parts) = ResourceUtils.calculateAmountAndPartsForFraction(taskReq)
+        numPartsPerResourceMap(rName) = parts
+        val numTasks = ((execReq.amount * parts) / numPerTask).toInt
+        if (taskLimit == -1 || numTasks < taskLimit) {
+          if (shouldCheckExecCores) {
+            // TODO - until resource profiles are fully implemented we need to error if cores is
+            // not the limiting resource because the scheduler code uses cores for slots
+            throw new IllegalArgumentException("The number of slots on an executor has to be " +
+              "limited by the number of cores, otherwise you waste resources and " +
+              "dynamic allocation doesn't work properly. Your configuration has " +
+              s"core/task cpu slots = ${taskLimit} and " +
+              s"${execReq.resourceName} = ${numTasks}. " +
+              "Please adjust your configuration so that all resources require same number " +
+              "of executor slots.")
+          }
+          limitingResource = rName
+          taskLimit = numTasks
+        }
+        taskResourcesToCheck -= rName
+      } else {
+        logWarning(s"The executor resource config for resource: $rName was specified but " +
+          "no corresponding task resource request was specified.")
+      }
+    }
+    if (!shouldCheckExecCores) {
+      // if we can't rely on the executor cores config throw a warning for user
+      logWarning("Please ensure that the number of slots available on your " +
+        "executors is limited by the number of cores to task cpus and not another " +
+        "custom resource. If cores is not the limiting resource then dynamic " +
+        "allocation will not work properly!")
+    }
+    if (taskResourcesToCheck.nonEmpty) {
+      throw new SparkException("No executor resource configs were not specified for the " +
+        s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}")
+    }
+    logInfo(s"Limiting resource is $limitingResource at $taskLimit tasks per executor")
+    _executorResourceSlotsPerAddr = Some(numPartsPerResourceMap.toMap)
+    _maxTasksPerExecutor = if (taskLimit == -1) Some(1) else Some(taskLimit)
+    _limitingResource = Some(limitingResource)
+    if (shouldCheckExecCores) {
+      ResourceUtils.warnOnWastedResources(this, sparkConf)
+    }
+  }
+
+  // to be used only by history server for reconstruction from events
+  private[spark] def setResourceProfileId(id: Int): Unit = {
+    _id = id
+  }
+
   // testing only
   private[spark] def setToDefaultProfile(): Unit = {
     _id = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -123,7 +264,7 @@ object ResourceProfile extends Logging {
           val taskResources = getDefaultTaskResources(conf)
           val executorResources = getDefaultExecutorResources(conf)
           val defProf = new ResourceProfile(executorResources, taskResources)
-          defProf.setToDefaultProfile
+          defProf.setToDefaultProfile()
           defaultProfile = Some(defProf)
           logInfo("Default ResourceProfile created, executor resources: " +
             s"${defProf.executorResources}, task resources: " +
@@ -157,13 +298,12 @@ object ResourceProfile extends Logging {
 
   // for testing only
   private[spark] def reInitDefaultProfile(conf: SparkConf): Unit = {
-    clearDefaultProfile
+    clearDefaultProfile()
     // force recreate it after clearing
     getOrCreateDefaultProfile(conf)
   }
 
-  // for testing only
-  private[spark] def clearDefaultProfile: Unit = {
+  private[spark] def clearDefaultProfile(): Unit = {
     DEFAULT_PROFILE_LOCK.synchronized {
       defaultProfile = None
     }
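
To make the new per-profile slot calculation concrete, here is a small sketch of a profile
built with the request classes from this module. The amounts are invented, and the
calculation helpers are private[spark], so their expected results are shown only as
comments; the sketch assumes it runs from Spark-internal code, as the tests in this PR do:

    import org.apache.spark.SparkConf
    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}

    // Hypothetical profile: 4 cores and 2 gpus per executor, 1 cpu and 0.5 gpu per task.
    val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 2)
    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5)
    val rp = new ResourceProfile(execReqs.requests, taskReqs.requests)

    val conf = new SparkConf().set("spark.executor.cores", "4")
    // Expected results from the private[spark] helpers added above:
    //   rp.limitingResource(conf)              -> "cpus" (4 cpu slots == 2 gpus * 2 tasks per gpu)
    //   rp.maxTasksPerExecutor(conf)           -> 4
    //   rp.getNumSlotsPerAddress("gpu", conf)  -> 2 (two tasks can share each gpu address)
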
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
index 0d55c17..26f23f4 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
@@ -31,7 +31,7 @@ import org.apache.spark.annotation.Evolving
  * requirements between stages.
  */
 @Evolving
-class ResourceProfileBuilder() {
+private[spark] class ResourceProfileBuilder() {
 
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
new file mode 100644
index 0000000..06db946
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Tests._
+import org.apache.spark.util.Utils
+import org.apache.spark.util.Utils.isTesting
+
+/**
+ * Manager of resource profiles. The manager allows one place to keep the actual ResourceProfiles
+ * and everywhere else we can use the ResourceProfile Id to save on space.
+ * Note we never remove a resource profile at this point. It's expected this number is small
+ * so this shouldn't be much overhead.
+ */
+@Evolving
+private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
+  private val resourceProfileIdToResourceProfile = new ConcurrentHashMap[Int, ResourceProfile]()
+
+  private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+  addResourceProfile(defaultProfile)
+
+  def defaultResourceProfile: ResourceProfile = defaultProfile
+
+  private val taskCpusDefaultProfile = defaultProfile.getTaskCpus.get
+  private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
+  private val master = sparkConf.getOption("spark.master")
+  private val isNotYarn = master.isDefined && !master.get.equals("yarn")
+  private val errorForTesting = !isTesting || sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
+
+  // If we use anything except the default profile, it's only supported on YARN right now.
+  // Throw an exception if not supported.
+  private[spark] def isSupported(rp: ResourceProfile): Boolean = {
+    val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+    val notYarnAndNotDefaultProfile = isNotDefaultProfile && isNotYarn
+    val yarnNotDynAllocAndNotDefaultProfile = isNotDefaultProfile && !isNotYarn && !dynamicEnabled
+    if (errorForTesting && (notYarnAndNotDefaultProfile || yarnNotDynAllocAndNotDefaultProfile)) {
+      throw new SparkException("ResourceProfiles are only supported on YARN with dynamic " +
+        "allocation enabled.")
+    }
+    true
+  }
+
+  def addResourceProfile(rp: ResourceProfile): Unit = {
+    isSupported(rp)
+    // force the computation of maxTasks and limitingResource now so we don't have cost later
+    rp.limitingResource(sparkConf)
+    logInfo(s"Adding ResourceProfile id: ${rp.id}")
+    resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+  }
+
+  /*
+   * Gets the ResourceProfile associated with the id; throws a SparkException if no
+   * profile has been registered for that id.
+   */
+  def resourceProfileFromId(rpId: Int): ResourceProfile = {
+    val rp = resourceProfileIdToResourceProfile.get(rpId)
+    if (rp == null) {
+      throw new SparkException(s"ResourceProfileId $rpId not found!")
+    }
+    rp
+  }
+
+  def taskCpusForProfileId(rpId: Int): Int = {
+    resourceProfileFromId(rpId).getTaskCpus.getOrElse(taskCpusDefaultProfile)
+  }
+}
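
A short sketch of the intended flow through the new ResourceProfileManager. The class is
private[spark], so this is only callable from Spark-internal code, and the conf values are
illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}

    val conf = new SparkConf().set("spark.executor.cores", "4").set("spark.task.cpus", "1")
    val rpManager = new ResourceProfileManager(conf)

    // The default profile (id 0) is built from the application configs and registered up front.
    val defaultRp = rpManager.defaultResourceProfile
    assert(defaultRp.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)

    // Everywhere else only the id is passed around; the manager resolves it back when needed.
    val resolved = rpManager.resourceProfileFromId(defaultRp.id)
    val taskCpus = rpManager.taskCpusForProfileId(defaultRp.id)   // 1
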
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 7dd7fc1..cdb761c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -29,7 +29,8 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.resource.ResourceDiscoveryPlugin
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{RESOURCES_DISCOVERY_PLUGIN, SPARK_TASK_PREFIX}
+import org.apache.spark.internal.config.{CPUS_PER_TASK, EXECUTOR_CORES, RESOURCES_DISCOVERY_PLUGIN, SPARK_TASK_PREFIX}
+import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING}
 import org.apache.spark.util.Utils
 
 /**
@@ -161,19 +162,23 @@ private[spark] object ResourceUtils extends Logging {
   }
 
   // Used to take a fraction amount from a task resource requirement and split into a real
-  // integer amount and the number of parts expected. For instance, if the amount is 0.5,
-  // the we get (1, 2) back out.
-  // Returns tuple of (amount, numParts)
-  def calculateAmountAndPartsForFraction(amount: Double): (Int, Int) = {
-    val parts = if (amount <= 0.5) {
-      Math.floor(1.0 / amount).toInt
-    } else if (amount % 1 != 0) {
+  // integer amount and the number of slots per address. For instance, if the amount is 0.5,
+  // then we get (1, 2) back out. This indicates that each address has 2 slots, which allows
+  // you to put 2 tasks on that address. Note if the amount is greater than 1, then the
+  // number of slots per address has to be 1, which indicates that a task would have
+  // multiple addresses assigned to it. This can be used for calculating
+  // the number of tasks per executor -> (executorAmount * numParts) / (integer amount).
+  // Returns tuple of (integer amount, numParts)
+  def calculateAmountAndPartsForFraction(doubleAmount: Double): (Int, Int) = {
+    val parts = if (doubleAmount <= 0.5) {
+      Math.floor(1.0 / doubleAmount).toInt
+    } else if (doubleAmount % 1 != 0) {
       throw new SparkException(
-        s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
+        s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole number.")
     } else {
       1
     }
-    (Math.ceil(amount).toInt, parts)
+    (Math.ceil(doubleAmount).toInt, parts)
   }
 
   // Add any task resource requests from the spark conf to the TaskResourceRequests passed in
@@ -382,6 +387,90 @@ private[spark] object ResourceUtils extends Logging {
       s"${resourceRequest.id.resourceName}")
   }
 
+  def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = {
+    // Number of cores per executor must meet at least one task requirement.
+    if (execCores < taskCpus) {
+      throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
+        s"the number of cpus per task = $taskCpus.")
+    }
+    true
+  }
+
+  // the optional executor cores parameter is passed in by the different local modes since it is
+  // not configured via the config
+  def warnOnWastedResources(
+      rp: ResourceProfile,
+      sparkConf: SparkConf,
+      execCores: Option[Int] = None): Unit = {
+    // There have been checks on the ResourceProfile to make sure the executor resources were
+    // specified and are large enough if any task resources were specified.
+    // Now just do some sanity checks and log warnings when it looks like the user will
+    // waste some resources.
+    val coresKnown = rp.isCoresLimitKnown
+    var limitingResource = rp.limitingResource(sparkConf)
+    var maxTaskPerExec = rp.maxTasksPerExecutor(sparkConf)
+    val taskCpus = rp.getTaskCpus.getOrElse(sparkConf.get(CPUS_PER_TASK))
+    val cores = if (execCores.isDefined) {
+      execCores.get
+    } else if (coresKnown) {
+      rp.getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES))
+    } else {
+      // can't calculate cores limit
+      return
+    }
+    // when executor cores config isn't set, we can't calculate the real limiting resource
+    // and number of tasks per executor ahead of time, so calculate it now.
+    if (!coresKnown) {
+      val numTasksPerExecCores = cores / taskCpus
+      val numTasksPerExecCustomResource = rp.maxTasksPerExecutor(sparkConf)
+      if (limitingResource.isEmpty ||
+        (limitingResource.nonEmpty && numTasksPerExecCores < numTasksPerExecCustomResource)) {
+        limitingResource = ResourceProfile.CPUS
+        maxTaskPerExec = numTasksPerExecCores
+      }
+    }
+    val taskReq = ResourceProfile.getCustomTaskResources(rp)
+    val execReq = ResourceProfile.getCustomExecutorResources(rp)
+
+    if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS)) {
+      if ((taskCpus * maxTaskPerExec) < cores) {
+        val resourceNumSlots = Math.floor(cores / taskCpus).toInt
+        val message = s"The configuration of cores (exec = ${cores}, " +
+          s"task = ${taskCpus}, runnable tasks = ${resourceNumSlots}) will " +
+          s"result in wasted resources due to resource ${limitingResource} limiting the " +
+          s"number of runnable tasks per executor to: ${maxTaskPerExec}. Please adjust " +
+          "your configuration."
+        if (sparkConf.get(RESOURCES_WARNING_TESTING)) {
+          throw new SparkException(message)
+        } else {
+          logWarning(message)
+        }
+      }
+    }
+
+    taskReq.foreach { case (rName, treq) =>
+      val execAmount = execReq(rName).amount
+      val numParts = rp.getNumSlotsPerAddress(rName, sparkConf)
+      // handle fractional
+      val taskAmount = if (numParts > 1) 1 else treq.amount
+      if (maxTaskPerExec < (execAmount * numParts / taskAmount)) {
+        val taskReqStr = s"${taskAmount}/${numParts}"
+        val resourceNumSlots = Math.floor(execAmount * numParts / taskAmount).toInt
+        val message = s"The configuration of resource: ${treq.resourceName} " +
+          s"(exec = ${execAmount}, task = ${taskReqStr}, " +
+          s"runnable tasks = ${resourceNumSlots}) will " +
+          s"result in wasted resources due to resource ${limitingResource} limiting the " +
+          s"number of runnable tasks per executor to: ${maxTaskPerExec}. Please adjust " +
+          "your configuration."
+        if (sparkConf.get(RESOURCES_WARNING_TESTING)) {
+          throw new SparkException(message)
+        } else {
+          logWarning(message)
+        }
+      }
+    }
+  }
+
   // known types of resources
   final val GPU: String = "gpu"
   final val FPGA: String = "fpga"
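
The amount/parts split above is easiest to see with a few concrete values (ResourceUtils is
private[spark], so these calls only compile from Spark-internal code):

    import org.apache.spark.resource.ResourceUtils

    // Fractional task amounts subdivide each address into several slots:
    ResourceUtils.calculateAmountAndPartsForFraction(0.5)    // (1, 2): 2 tasks share each address
    ResourceUtils.calculateAmountAndPartsForFraction(0.25)   // (1, 4): 4 tasks share each address
    // Whole amounts keep 1 slot per address; a task simply gets multiple addresses:
    ResourceUtils.calculateAmountAndPartsForFraction(2.0)    // (2, 1)
    // Amounts that are neither <= 0.5 nor whole numbers (e.g. 0.7 or 1.5) throw a SparkException.
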
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7bf363d..fd5c3e0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -37,7 +37,8 @@ import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -391,7 +392,8 @@ private[spark] class DAGScheduler(
     val parents = getOrCreateParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
     val stage = new ShuffleMapStage(
-      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
+      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     stageIdToStage(id) = stage
     shuffleIdToMapStage(shuffleDep.shuffleId) = stage
@@ -453,7 +455,8 @@ private[spark] class DAGScheduler(
     checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
     val parents = getOrCreateParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
-    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
+    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     stageIdToStage(id) = stage
     updateJobIdStageIdMaps(jobId, stage)
     stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
index d168783..7fdc318 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -34,8 +34,9 @@ private[spark] class ResultStage(
     val partitions: Array[Int],
     parents: List[Stage],
     firstJobId: Int,
-    callSite: CallSite)
-  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {
+    callSite: CallSite,
+    resourceProfileId: Int)
+  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite, resourceProfileId) {
 
   /**
    * The active job for this result stage. Will be empty if the job has already finished
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index 1b44d0a..be1984d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -42,8 +42,9 @@ private[spark] class ShuffleMapStage(
     firstJobId: Int,
     callSite: CallSite,
     val shuffleDep: ShuffleDependency[_, _, _],
-    mapOutputTrackerMaster: MapOutputTrackerMaster)
-  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
+    mapOutputTrackerMaster: MapOutputTrackerMaster,
+    resourceProfileId: Int)
+  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite, resourceProfileId) {
 
   private[this] var _mapStageJobs: List[ActiveJob] = Nil
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a9f72ea..ae7924d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -59,7 +59,8 @@ private[scheduler] abstract class Stage(
     val numTasks: Int,
     val parents: List[Stage],
     val firstJobId: Int,
-    val callSite: CallSite)
+    val callSite: CallSite,
+    val resourceProfileId: Int)
   extends Logging {
 
   val numPartitions = rdd.partitions.length
@@ -79,7 +80,8 @@ private[scheduler] abstract class Stage(
    * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
    * have been created).
    */
-  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
+  private var _latestInfo: StageInfo =
+    StageInfo.fromStage(this, nextAttemptId, resourceProfileId = resourceProfileId)
 
   /**
    * Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
@@ -100,7 +102,8 @@ private[scheduler] abstract class Stage(
     val metrics = new TaskMetrics
     metrics.register(rdd.sparkContext)
     _latestInfo = StageInfo.fromStage(
-      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
+      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences,
+      resourceProfileId = resourceProfileId)
     nextAttemptId += 1
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index fdc5032..556478d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -38,7 +38,8 @@ class StageInfo(
     val details: String,
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
-    private[spark] val shuffleDepId: Option[Int] = None) {
+    private[spark] val shuffleDepId: Option[Int] = None,
+    val resourceProfileId: Int) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -87,7 +88,8 @@ private[spark] object StageInfo {
       attemptId: Int,
       numTasks: Option[Int] = None,
       taskMetrics: TaskMetrics = null,
-      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
+      resourceProfileId: Int
     ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
@@ -105,6 +107,7 @@ private[spark] object StageInfo {
       stage.details,
       taskMetrics,
       taskLocalityPreferences,
-      shuffleDepId)
+      shuffleDepId,
+      resourceProfileId)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6a1d460..bf92081 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -384,7 +384,9 @@ private[spark] class TaskSchedulerImpl(
    */
   private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
     val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
+    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
+    logDebug(s"Resources meet task requirements is: $meetsReqs")
+    meetsReqs
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 55f4005..63aa049 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -69,13 +69,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
   private val createTimeNs = System.nanoTime()
 
-  private val taskResourceNumParts: Map[String, Int] =
-    if (scheduler.resourcesReqsPerTask != null) {
-      scheduler.resourcesReqsPerTask.map(req => req.resourceName -> req.numParts).toMap
-    } else {
-      Map.empty
-    }
-
   // Accessing `executorDataMap` in the inherited methods from ThreadSafeRpcEndpoint doesn't need
   // any protection. But accessing `executorDataMap` out of the inherited methods must be
   // protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
@@ -83,13 +76,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // `CoarseGrainedSchedulerBackend.this`.
   private val executorDataMap = new HashMap[String, ExecutorData]
 
-  // Number of executors requested by the cluster manager, [[ExecutorAllocationManager]]
-  @GuardedBy("CoarseGrainedSchedulerBackend.this")
-  private var requestedTotalExecutors = 0
-
-  // Number of executors requested from the cluster manager that have not registered yet
+  // Number of executors for each ResourceProfile requested by the cluster
+  // manager, [[ExecutorAllocationManager]]
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
-  private var numPendingExecutors = 0
+  private val requestedTotalExecutorsPerResourceProfile = new HashMap[ResourceProfile, Int]
 
   private val listenerBus = scheduler.sc.listenerBus
 
@@ -102,13 +92,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   private val executorsPendingLossReason = new HashSet[String]
 
-  // A map to store hostname with its possible task number running on it
+  // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
-  protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
+  protected var rpHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
 
-  // The number of pending tasks which is locality required
+  // The number of pending tasks per ResourceProfile id which is locality required
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
-  protected var localityAwareTasks = 0
+  protected var numLocalityAwareTasksPerResourceProfileId = Map.empty[Int, Int]
 
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
@@ -223,16 +213,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             } else {
               context.senderAddress
             }
-          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
+          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId, " +
+            s" ResourceProfileId $resourceProfileId")
           addressToExecutorId(executorAddress) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
-          val resourcesInfo = resources.map{ case (k, v) =>
-            (v.name,
-             new ExecutorResourceInfo(v.name, v.addresses,
-               // tell the executor it can schedule resources up to numParts times,
-               // as configured by the user, or set to 1 as that is the default (1 task/resource)
-               taskResourceNumParts.getOrElse(v.name, 1)))
+          val resourcesInfo = resources.map { case (rName, info) =>
+            // tell the executor it can schedule resources up to numParts times,
+            // as configured by the user, or set to 1 as that is the default (1 task/resource)
+            val numParts = scheduler.sc.resourceProfileManager
+              .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
+            (info.name, new ExecutorResourceInfo(info.name, info.addresses, numParts))
           }
           val data = new ExecutorData(executorRef, executorAddress, hostname,
             0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
@@ -244,10 +235,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             if (currentExecutorIdCounter < executorId.toInt) {
               currentExecutorIdCounter = executorId.toInt
             }
-            if (numPendingExecutors > 0) {
-              numPendingExecutors -= 1
-              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
-            }
           }
           // Note: some tests expect the reply to come after we put the executor in the map
           context.reply(true)
@@ -271,10 +258,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)
 
       case RetrieveSparkAppConfig(resourceProfileId) =>
-        // note this will be updated in later prs to get the ResourceProfile from a
-        // ResourceProfileManager that matches the resource profile id
-        // for now just use default profile
-        val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
+        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
         val reply = SparkAppConfig(
           sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey(),
@@ -494,8 +478,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * */
   protected[scheduler] def reset(): Unit = {
     val executors: Set[String] = synchronized {
-      requestedTotalExecutors = 0
-      numPendingExecutors = 0
+      requestedTotalExecutorsPerResourceProfile.clear()
       executorDataMap.keys.toSet
     }
 
@@ -577,12 +560,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   // this function is for testing only
   def getExecutorResourceProfileId(executorId: String): Int = synchronized {
-    val res = executorDataMap.get(executorId)
-    res.map(_.resourceProfileId).getOrElse(ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
+    val execDataOption = executorDataMap.get(executorId)
+    execDataOption.map(_.resourceProfileId).getOrElse(ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
   }
 
   /**
-   * Request an additional number of executors from the cluster manager.
+   * Request an additional number of executors from the cluster manager. This requests
+   * against the default ResourceProfile; an API change will be needed to allow requesting
+   * against other profiles.
    * @return whether the request is acknowledged.
    */
   final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
@@ -594,21 +579,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
 
     val response = synchronized {
-      requestedTotalExecutors += numAdditionalExecutors
-      numPendingExecutors += numAdditionalExecutors
-      logDebug(s"Number of pending executors is now $numPendingExecutors")
-      if (requestedTotalExecutors !=
-          (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
-        logDebug(
-          s"""requestExecutors($numAdditionalExecutors): Executor request doesn't match:
-             |requestedTotalExecutors  = $requestedTotalExecutors
-             |numExistingExecutors     = $numExistingExecutors
-             |numPendingExecutors      = $numPendingExecutors
-             |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
-      }
-
+      val defaultProf = scheduler.sc.resourceProfileManager.defaultResourceProfile
+      val numExisting = requestedTotalExecutorsPerResourceProfile.getOrElse(defaultProf, 0)
+      requestedTotalExecutorsPerResourceProfile(defaultProf) = numExisting + numAdditionalExecutors
       // Account for executors pending to be added or removed
-      doRequestTotalExecutors(requestedTotalExecutors)
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
     }
 
     defaultAskTimeout.awaitResult(response)
@@ -617,39 +592,41 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
    * to help it make decisions.
-   * @param numExecutors The total number of executors we'd like to have. The cluster manager
-   *                     shouldn't kill any running executor to reach this number, but,
-   *                     if all existing executors were to die, this is the number of executors
-   *                     we'd want to be allocated.
-   * @param localityAwareTasks The number of tasks in all active stages that have a locality
-   *                           preferences. This includes running, pending, and completed tasks.
+   * @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
+   *                                      ResourceProfile. The cluster manager shouldn't kill any
+   *                                      running executor to reach this number, but, if all
+   *                                      existing executors were to die, this is the number
+   *                                      of executors we'd want to be allocated.
+   * @param numLocalityAwareTasksPerResourceProfileId The number of tasks in all active stages that
+   *                                                  have a locality preferences per
+   *                                                  ResourceProfile. This includes running,
+   *                                                  pending, and completed tasks.
    * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
    *                             that would like to like to run on that host.
    *                             This includes running, pending, and completed tasks.
    * @return whether the request is acknowledged by the cluster manager.
    */
   final override def requestTotalExecutors(
-      numExecutors: Int,
-      localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int]
-    ): Boolean = {
-    if (numExecutors < 0) {
+      resourceProfileIdToNumExecutors: Map[Int, Int],
+      numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
+      hostToLocalTaskCount: Map[Int, Map[String, Int]]
+  ): Boolean = {
+    val totalExecs = resourceProfileIdToNumExecutors.values.sum
+    if (totalExecs < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of executor(s) " +
-          s"$numExecutors from the cluster manager. Please specify a positive number!")
+          s"$totalExecs from the cluster manager. Please specify a positive number!")
+    }
+    val resourceProfileToNumExecutors = resourceProfileIdToNumExecutors.map { case (rpid, num) =>
+      (scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
     }
-
     val response = synchronized {
-      this.requestedTotalExecutors = numExecutors
-      this.localityAwareTasks = localityAwareTasks
-      this.hostToLocalTaskCount = hostToLocalTaskCount
-
-      numPendingExecutors =
-        math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
-
-      doRequestTotalExecutors(numExecutors)
+      this.requestedTotalExecutorsPerResourceProfile.clear()
+      this.requestedTotalExecutorsPerResourceProfile ++= resourceProfileToNumExecutors
+      this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
+      this.rpHostToLocalTaskCount = hostToLocalTaskCount
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
     }
-
     defaultAskTimeout.awaitResult(response)
   }
 
@@ -665,7 +642,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    *
    * @return a future whose evaluation indicates whether the request is acknowledged.
    */
-  protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
+  protected def doRequestTotalExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] =
     Future.successful(false)
 
   /**
@@ -706,20 +684,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
         if (adjustTargetNumExecutors) {
-          requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
-          if (requestedTotalExecutors !=
-              (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
-            logDebug(
-              s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
-                 |Executor counts do not match:
-                 |requestedTotalExecutors  = $requestedTotalExecutors
-                 |numExistingExecutors     = $numExistingExecutors
-                 |numPendingExecutors      = $numPendingExecutors
-                 |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+          executorsToKill.foreach { exec =>
+            val rpId = executorDataMap(exec).resourceProfileId
+            val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+            if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+              // Assume that we are killing an executor that was started by default and
+              // not through the request api
+              requestedTotalExecutorsPerResourceProfile(rp) = 0
+            } else {
+              val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+              requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+            }
           }
-          doRequestTotalExecutors(requestedTotalExecutors)
+          doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
         } else {
-          numPendingExecutors += executorsToKill.size
           Future.successful(true)
         }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index a9b607d..d91d78b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientL
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
-import org.apache.spark.resource.ResourceUtils
+import org.apache.spark.resource.{ResourceProfile, ResourceUtils}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
@@ -58,6 +58,7 @@ private[spark] class StandaloneSchedulerBackend(
 
   private val maxCores = conf.get(config.CORES_MAX)
   private val totalExpectedCores = maxCores.getOrElse(0)
+  private val defaultProf = sc.resourceProfileManager.defaultResourceProfile
 
   override def start(): Unit = {
     super.start()
@@ -194,9 +195,13 @@ private[spark] class StandaloneSchedulerBackend(
    *
    * @return whether the request is acknowledged.
    */
-  protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+  protected override def doRequestTotalExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
+    // resource profiles not supported
     Option(client) match {
-      case Some(c) => c.requestTotalExecutors(requestedTotal)
+      case Some(c) =>
+        val numExecs = resourceProfileToTotalExecs.getOrElse(defaultProf, 0)
+        c.requestTotalExecutors(numExecs)
       case None =>
         logWarning("Attempted to request executors before driver fully initialized.")
         Future.successful(false)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index a24f190..c29546b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -70,7 +70,7 @@ private[spark] class ExecutorMonitor(
   // this listener. There are safeguards in other parts of the code that would prevent that executor
   // from being removed.
   private val nextTimeout = new AtomicLong(Long.MaxValue)
-  private var timedOutExecs = Seq.empty[String]
+  private var timedOutExecs = Seq.empty[(String, Int)]
 
   // Active job tracking.
   //
@@ -100,10 +100,10 @@ private[spark] class ExecutorMonitor(
   }
 
   /**
-   * Returns the list of executors that are currently considered to be timed out.
-   * Should only be called from the EAM thread.
+   * Returns the list of executors and their ResourceProfile id that are currently considered to
+   * be timed out. Should only be called from the EAM thread.
    */
-  def timedOutExecutors(): Seq[String] = {
+  def timedOutExecutors(): Seq[(String, Int)] = {
     val now = clock.nanoTime()
     if (now >= nextTimeout.get()) {
       // Temporarily set the next timeout at Long.MaxValue. This ensures that after
@@ -126,7 +126,7 @@ private[spark] class ExecutorMonitor(
             true
           }
         }
-        .keys
+        .map { case (name, exec) => (name, exec.resourceProfileId)}
         .toSeq
       updateNextTimeout(newNextTimeout)
     }
@@ -155,6 +155,7 @@ private[spark] class ExecutorMonitor(
     execResourceProfileCount.getOrDefault(id, 0)
   }
 
+  // for testing
   def getResourceProfileId(executorId: String): Int = {
     val execTrackingInfo = executors.get(executorId)
     if (execTrackingInfo != null) {
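
With the change above, timedOutExecutors() now returns (executorId, resourceProfileId) pairs
instead of bare ids. A small sketch, using made-up data, of how a caller could group them per
profile:

    // e.g. what monitor.timedOutExecutors() might return after this change
    val timedOut: Seq[(String, Int)] = Seq(("1", 0), ("2", 0), ("7", 1))
    val timedOutByRpId: Map[Int, Seq[String]] =
      timedOut.groupBy(_._2).map { case (rpId, execs) => rpId -> execs.map(_._1) }
    // Map(0 -> Seq("1", "2"), 1 -> Seq("7"))
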
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 4d89c4f..5382473 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
@@ -662,7 +662,8 @@ private[spark] object JsonProtocol {
     val stageInfos = jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
         stageIds.map { id =>
-          new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")
+          new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown",
+            resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
         }
       }
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
@@ -803,7 +804,8 @@ private[spark] object JsonProtocol {
     }
 
     val stageInfo = new StageInfo(
-      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
+      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details,
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 8d95849..8fa33f4 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, ResourceProfileManager, TaskResourceRequests}
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -45,6 +46,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
   private val managers = new mutable.ListBuffer[ExecutorAllocationManager]()
   private var listenerBus: LiveListenerBus = _
   private var client: ExecutorAllocationClient = _
+  private val clock = new SystemClock()
+  private var rpManager: ResourceProfileManager = _
+
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -108,65 +112,257 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
 
   test("starting state") {
     val manager = createManager(createConf())
-    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
   }
 
-  test("add executors") {
+  test("add executors default profile") {
     val manager = createManager(createConf(1, 10, 1))
     post(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    // Keep adding until the limit is reached
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 4)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 4)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 8)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 8)
+    // reached the limit of 10
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+
+    // Register previously requested executors
+    onExecutorAddedDefaultProfile(manager, "first")
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    onExecutorAddedDefaultProfile(manager, "second")
+    onExecutorAddedDefaultProfile(manager, "third")
+    onExecutorAddedDefaultProfile(manager, "fourth")
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    onExecutorAddedDefaultProfile(manager, "first") // duplicates should not count
+    onExecutorAddedDefaultProfile(manager, "second")
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+
+    // Try adding again
+    // This should still fail because the number pending + running is still at the limit
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+  }
+
+  test("add executors multiple profiles") {
+    val manager = createManager(createConf(1, 10, 1))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 1000, rp = defaultProfile)))
+    val rp1 = new ResourceProfileBuilder()
+    val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 4)
+    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+    rp1.require(execReqs).require(taskReqs)
+    val rprof1 = rp1.build
+    rpManager.addResourceProfile(rprof1)
+    post(SparkListenerStageSubmitted(createStageInfo(1, 1000, rp = rprof1)))
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
     // Keep adding until the limit is reached
-    assert(numExecutorsTarget(manager) === 1)
-    assert(numExecutorsToAdd(manager) === 1)
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 2)
-    assert(numExecutorsToAdd(manager) === 2)
-    assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 4)
-    assert(numExecutorsToAdd(manager) === 4)
-    assert(addExecutors(manager) === 4)
-    assert(numExecutorsTarget(manager) === 8)
-    assert(numExecutorsToAdd(manager) === 8)
-    assert(addExecutors(manager) === 2) // reached the limit of 10
-    assert(numExecutorsTarget(manager) === 10)
-    assert(numExecutorsToAdd(manager) === 1)
-    assert(addExecutors(manager) === 0)
-    assert(numExecutorsTarget(manager) === 10)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    assert(numExecutorsToAdd(manager, rprof1) === 1)
+    assert(numExecutorsTarget(manager, rprof1.id) === 1)
+    assert(addExecutorsToTarget(manager, updatesNeeded, rprof1) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    assert(numExecutorsToAdd(manager, rprof1) === 2)
+    assert(numExecutorsTarget(manager, rprof1.id) === 2)
+    assert(addExecutorsToTarget(manager, updatesNeeded, rprof1) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 4)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 4)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+    assert(numExecutorsToAdd(manager, rprof1) === 4)
+    assert(numExecutorsTarget(manager, rprof1.id) === 4)
+    assert(addExecutorsToTarget(manager, updatesNeeded, rprof1) === 4)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 8)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 8)
+    // reached the limit of 10
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    assert(numExecutorsToAdd(manager, rprof1) === 8)
+    assert(numExecutorsTarget(manager, rprof1.id) === 8)
+    assert(addExecutorsToTarget(manager, updatesNeeded, rprof1) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    assert(numExecutorsToAdd(manager, rprof1) === 1)
+    assert(numExecutorsTarget(manager, rprof1.id) === 10)
+    assert(addExecutorsToTarget(manager, updatesNeeded, rprof1) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(numExecutorsToAdd(manager, rprof1) === 1)
+    assert(numExecutorsTarget(manager, rprof1.id) === 10)
 
     // Register previously requested executors
-    onExecutorAdded(manager, "first")
-    assert(numExecutorsTarget(manager) === 10)
-    onExecutorAdded(manager, "second")
-    onExecutorAdded(manager, "third")
-    onExecutorAdded(manager, "fourth")
-    assert(numExecutorsTarget(manager) === 10)
-    onExecutorAdded(manager, "first") // duplicates should not count
-    onExecutorAdded(manager, "second")
-    assert(numExecutorsTarget(manager) === 10)
+    onExecutorAddedDefaultProfile(manager, "first")
+    onExecutorAdded(manager, "firstrp1", rprof1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsTarget(manager, rprof1.id) === 10)
+    onExecutorAddedDefaultProfile(manager, "second")
+    onExecutorAddedDefaultProfile(manager, "third")
+    onExecutorAddedDefaultProfile(manager, "fourth")
+    onExecutorAdded(manager, "secondrp1", rprof1)
+    onExecutorAdded(manager, "thirdrp1", rprof1)
+    onExecutorAdded(manager, "fourthrp1", rprof1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsTarget(manager, rprof1.id) === 10)
+    onExecutorAddedDefaultProfile(manager, "first") // duplicates should not count
+    onExecutorAddedDefaultProfile(manager, "second")
+    onExecutorAdded(manager, "firstrp1", rprof1)
+    onExecutorAdded(manager, "secondrp1", rprof1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsTarget(manager, rprof1.id) === 10)
 
     // Try adding again
     // This should still fail because the number pending + running is still at the limit
-    assert(addExecutors(manager) === 0)
-    assert(numExecutorsTarget(manager) === 10)
-    assert(numExecutorsToAdd(manager) === 1)
-    assert(addExecutors(manager) === 0)
-    assert(numExecutorsTarget(manager) === 10)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    assert(addExecutorsToTarget(manager, updatesNeeded, rprof1) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(numExecutorsToAdd(manager, rprof1) === 1)
+    assert(numExecutorsTarget(manager, rprof1.id) === 10)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    assert(addExecutorsToTarget(manager, updatesNeeded, rprof1) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(numExecutorsToAdd(manager, rprof1) === 1)
+    assert(numExecutorsTarget(manager, rprof1.id) === 10)
+  }
+
+  test("remove executors multiple profiles") {
+    val manager = createManager(createConf(5, 10, 5))
+    val rp1 = new ResourceProfileBuilder()
+    val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 4)
+    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+    rp1.require(execReqs).require(taskReqs)
+    val rprof1 = rp1.build
+    val rp2 = new ResourceProfileBuilder()
+    val execReqs2 = new ExecutorResourceRequests().cores(1)
+    val taskReqs2 = new TaskResourceRequests().cpus(1)
+    rp2.require(execReqs2).require(taskReqs2)
+    val rprof2 = rp2.build
+    rpManager.addResourceProfile(rprof1)
+    rpManager.addResourceProfile(rprof2)
+    post(SparkListenerStageSubmitted(createStageInfo(1, 10, rp = rprof1)))
+    post(SparkListenerStageSubmitted(createStageInfo(2, 10, rp = rprof2)))
+
+    (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id, rprof1) }
+    (11 to 20).map(_.toString).foreach { id => onExecutorAdded(manager, id, rprof2) }
+    (21 to 30).map(_.toString).foreach { id => onExecutorAdded(manager, id, defaultProfile) }
+
+    // Keep removing until the limit is reached
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(removeExecutor(manager, "1", rprof1.id))
+    assert(executorsPendingToRemove(manager).size === 1)
+    assert(executorsPendingToRemove(manager).contains("1"))
+    assert(removeExecutor(manager, "11", rprof2.id))
+    assert(removeExecutor(manager, "2", rprof1.id))
+    assert(executorsPendingToRemove(manager).size === 3)
+    assert(executorsPendingToRemove(manager).contains("2"))
+    assert(executorsPendingToRemove(manager).contains("11"))
+    assert(removeExecutor(manager, "21", defaultProfile.id))
+    assert(removeExecutor(manager, "3", rprof1.id))
+    assert(removeExecutor(manager, "4", rprof1.id))
+    assert(executorsPendingToRemove(manager).size === 6)
+    assert(executorsPendingToRemove(manager).contains("21"))
+    assert(executorsPendingToRemove(manager).contains("3"))
+    assert(executorsPendingToRemove(manager).contains("4"))
+    assert(removeExecutor(manager, "5", rprof1.id))
+    assert(!removeExecutor(manager, "6", rprof1.id)) // reached the limit of 5
+    assert(executorsPendingToRemove(manager).size === 7)
+    assert(executorsPendingToRemove(manager).contains("5"))
+    assert(!executorsPendingToRemove(manager).contains("6"))
+
+    // Kill executors previously requested to remove
+    onExecutorRemoved(manager, "1")
+    assert(executorsPendingToRemove(manager).size === 6)
+    assert(!executorsPendingToRemove(manager).contains("1"))
+    onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
+    assert(executorsPendingToRemove(manager).size === 4)
+    assert(!executorsPendingToRemove(manager).contains("2"))
+    assert(!executorsPendingToRemove(manager).contains("3"))
+    onExecutorRemoved(manager, "2") // duplicates should not count
+    onExecutorRemoved(manager, "3")
+    assert(executorsPendingToRemove(manager).size === 4)
+    onExecutorRemoved(manager, "4")
+    onExecutorRemoved(manager, "5")
+    assert(executorsPendingToRemove(manager).size === 2)
+    assert(executorsPendingToRemove(manager).contains("11"))
+    assert(executorsPendingToRemove(manager).contains("21"))
+
+    // Try removing again
+    // This should still fail because the number pending + running is still at the limit
+    assert(!removeExecutor(manager, "7", rprof1.id))
+    assert(executorsPendingToRemove(manager).size === 2)
+    assert(!removeExecutor(manager, "8", rprof1.id))
+    assert(executorsPendingToRemove(manager).size === 2)
+
+    // make sure rprof2 has the same min limit of 5
+    assert(removeExecutor(manager, "12", rprof2.id))
+    assert(removeExecutor(manager, "13", rprof2.id))
+    assert(removeExecutor(manager, "14", rprof2.id))
+    assert(removeExecutor(manager, "15", rprof2.id))
+    assert(!removeExecutor(manager, "16", rprof2.id)) // reached the limit of 5
+    assert(executorsPendingToRemove(manager).size === 6)
+    assert(!executorsPendingToRemove(manager).contains("16"))
+    onExecutorRemoved(manager, "11")
+    onExecutorRemoved(manager, "12")
+    onExecutorRemoved(manager, "13")
+    onExecutorRemoved(manager, "14")
+    onExecutorRemoved(manager, "15")
+    assert(executorsPendingToRemove(manager).size === 1)
   }
 
   def testAllocationRatio(cores: Int, divisor: Double, expected: Int): Unit = {
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
     val conf = createConf(3, 15)
       .set(config.DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO, divisor)
       .set(config.EXECUTOR_CORES, cores)
     val manager = createManager(conf)
     post(SparkListenerStageSubmitted(createStageInfo(0, 20)))
     for (i <- 0 to 5) {
-      addExecutors(manager)
+      addExecutorsToTargetForDefaultProfile(manager, updatesNeeded)
+      doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
     }
-    assert(numExecutorsTarget(manager) === expected)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === expected)
   }
 
   test("executionAllocationRatio is correctly handled") {
@@ -185,127 +381,158 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     val manager = createManager(createConf(0, 10, 0))
     post(SparkListenerStageSubmitted(createStageInfo(0, 5)))
 
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
     // Verify that we're capped at number of tasks in the stage
-    assert(numExecutorsTarget(manager) === 0)
-    assert(numExecutorsToAdd(manager) === 1)
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 1)
-    assert(numExecutorsToAdd(manager) === 2)
-    assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 3)
-    assert(numExecutorsToAdd(manager) === 4)
-    assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 5)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 3)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 4)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
 
     // Verify that running a task doesn't affect the target
     post(SparkListenerStageSubmitted(createStageInfo(1, 3)))
     post(SparkListenerExecutorAdded(
       0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
-    assert(numExecutorsTarget(manager) === 5)
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 6)
-    assert(numExecutorsToAdd(manager) === 2)
-    assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 8)
-    assert(numExecutorsToAdd(manager) === 4)
-    assert(addExecutors(manager) === 0)
-    assert(numExecutorsTarget(manager) === 8)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 6)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 8)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 4)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 8)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
 
     // Verify that re-running a task doesn't blow things up
     post(SparkListenerStageSubmitted(createStageInfo(2, 3)))
     post(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
     post(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 9)
-    assert(numExecutorsToAdd(manager) === 2)
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 10)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 9)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
 
     // Verify that running a task once we're at our limit doesn't blow things up
     post(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
-    assert(addExecutors(manager) === 0)
-    assert(numExecutorsTarget(manager) === 10)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
   }
 
   test("add executors when speculative tasks added") {
     val manager = createManager(createConf(0, 10, 0))
 
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
     // Verify that we're capped at number of tasks including the speculative ones in the stage
     post(SparkListenerSpeculativeTaskSubmitted(1))
-    assert(numExecutorsTarget(manager) === 0)
-    assert(numExecutorsToAdd(manager) === 1)
-    assert(addExecutors(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
     post(SparkListenerSpeculativeTaskSubmitted(1))
     post(SparkListenerSpeculativeTaskSubmitted(1))
-    post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
-    assert(numExecutorsTarget(manager) === 1)
-    assert(numExecutorsToAdd(manager) === 2)
-    assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 3)
-    assert(numExecutorsToAdd(manager) === 4)
-    assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 5)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 3)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 4)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
 
     // Verify that running a task doesn't affect the target
     post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
-    assert(numExecutorsTarget(manager) === 5)
-    assert(addExecutors(manager) === 0)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
 
     // Verify that running a speculative task doesn't affect the target
     post(SparkListenerTaskStart(1, 0, createTaskInfo(1, 0, "executor-2", true)))
-    assert(numExecutorsTarget(manager) === 5)
-    assert(addExecutors(manager) === 0)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
   }
 
   test("SPARK-30511 remove executors when speculative tasks end") {
     val clock = new ManualClock()
     val stage = createStageInfo(0, 40)
-    val manager = createManager(createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4), clock = clock)
+    val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4)
+    val manager = createManager(conf, clock = clock)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
 
     post(SparkListenerStageSubmitted(stage))
-    assert(addExecutors(manager) === 1)
-    assert(addExecutors(manager) === 2)
-    assert(addExecutors(manager) === 4)
-    assert(addExecutors(manager) === 3)
-
-    (0 to 9).foreach(execId => onExecutorAdded(manager, execId.toString))
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 3)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+
+    (0 to 9).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
     (0 to 39).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
       info => post(SparkListenerTaskStart(0, 0, info))
     }
-    assert(numExecutorsTarget(manager) === 10)
-    assert(maxNumExecutorsNeeded(manager) == 10)
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 10)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 10)
 
     // 30 tasks (0 - 29) finished
     (0 to 29).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
       info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) }
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 3)
-    assert(maxNumExecutorsNeeded(manager) == 3)
-    (0 to 6).foreach { i => assert(removeExecutor(manager, i.toString))}
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)
+    (0 to 6).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString))}
     (0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)}
 
     // 10 speculative tasks (30 - 39) launch for the remaining tasks
     (30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))}
-    assert(addExecutors(manager) === 1)
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) == 5)
-    assert(maxNumExecutorsNeeded(manager) == 5)
-    (10 to 12).foreach(execId => onExecutorAdded(manager, execId.toString))
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTarget(manager, defaultProfile.id) == 5)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
+    (10 to 12).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
     (40 to 49).map { i =>
       createTaskInfo(taskId = i, taskIndex = i - 10, executorId = s"${i / 4}", speculative = true)}
       .foreach { info => post(SparkListenerTaskStart(0, 0, info))}
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) == 5) // At this point, we still have 6 executors running
-    assert(maxNumExecutorsNeeded(manager) == 5)
+    // At this point, we still have 6 executors running
+    assert(numExecutorsTarget(manager, defaultProfile.id) == 5)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
 
     // 6 speculative tasks (40 - 45) finish before the original tasks, with 4 speculative remaining
     (40 to 45).map { i =>
@@ -314,9 +541,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
         info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))}
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 4)
-    assert(maxNumExecutorsNeeded(manager) == 4)
-    assert(removeExecutor(manager, "10"))
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 4)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 4)
+    assert(removeExecutorDefaultProfile(manager, "10"))
     onExecutorRemoved(manager, "10")
     // At this point, we still have 5 executors running: ["7", "8", "9", "11", "12"]
 
@@ -327,9 +554,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
         SparkListenerTaskEnd(0, 0, null, TaskKilled("test"), info, new ExecutorMetrics, null))}
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 2)
-    assert(maxNumExecutorsNeeded(manager) == 2)
-    (7 to 8).foreach { i => assert(removeExecutor(manager, i.toString))}
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
+    (7 to 8).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString))}
     (7 to 8).foreach { i => onExecutorRemoved(manager, i.toString)}
     // At this point, we still have 3 executors running: ["9", "11", "12"]
 
@@ -343,8 +570,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     // tasks running. Target lowers to 2, but still hold 3 executors ["9", "11", "12"]
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 2)
-    assert(maxNumExecutorsNeeded(manager) == 2)
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
     // At this point, we still have 3 executors running: ["9", "11", "12"]
 
     // Task 37 and 47 succeed at the same time
@@ -357,9 +584,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     // tasks running
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 1)
-    assert(maxNumExecutorsNeeded(manager) == 1)
-    assert(removeExecutor(manager, "11"))
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
+    assert(removeExecutorDefaultProfile(manager, "11"))
     onExecutorRemoved(manager, "11")
     // At this point, we still have 2 executors running: ["9", "12"]
 
@@ -372,14 +599,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
     // maxNeeded = 1, allocate one more to satisfy speculation locality requirement
-    assert(numExecutorsTarget(manager) === 2)
-    assert(maxNumExecutorsNeeded(manager) == 2)
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
     post(SparkListenerTaskStart(0, 0,
       createTaskInfo(50, 39, executorId = "12", speculative = true)))
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 1)
-    assert(maxNumExecutorsNeeded(manager) == 1)
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
 
     // Task 39 and 48 succeed, task 50 killed
     post(SparkListenerTaskEnd(0, 0, null, Success,
@@ -391,11 +618,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     post(SparkListenerStageCompleted(stage))
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 0)
-    assert(maxNumExecutorsNeeded(manager) == 0)
-    assert(removeExecutor(manager, "9"))
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 0)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 0)
+    assert(removeExecutorDefaultProfile(manager, "9"))
     onExecutorRemoved(manager, "9")
-    assert(removeExecutor(manager, "12"))
+    assert(removeExecutorDefaultProfile(manager, "12"))
     onExecutorRemoved(manager, "12")
   }
 
@@ -417,43 +644,49 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     post(SparkListenerStageCompleted(stage))
 
     // There are still two tasks that belong to the zombie stage running.
-    assert(totalRunningTasks(manager) === 2)
+    assert(totalRunningTasksPerResourceProfile(manager) === 2)
 
     // submit another attempt for the stage.  We count completions from the first zombie attempt
     val stageAttempt1 = createStageInfo(stage.stageId, 5, attemptId = 1)
     post(SparkListenerStageSubmitted(stageAttempt1))
     post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, new ExecutorMetrics, null))
-    assert(totalRunningTasks(manager) === 1)
+    assert(totalRunningTasksPerResourceProfile(manager) === 1)
     val attemptTaskInfo1 = createTaskInfo(3, 0, "executor-1")
     val attemptTaskInfo2 = createTaskInfo(4, 1, "executor-1")
     post(SparkListenerTaskStart(0, 1, attemptTaskInfo1))
     post(SparkListenerTaskStart(0, 1, attemptTaskInfo2))
-    assert(totalRunningTasks(manager) === 3)
+    assert(totalRunningTasksPerResourceProfile(manager) === 3)
     post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo1, new ExecutorMetrics, null))
-    assert(totalRunningTasks(manager) === 2)
+    assert(totalRunningTasksPerResourceProfile(manager) === 2)
     post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo2, new ExecutorMetrics, null))
-    assert(totalRunningTasks(manager) === 1)
+    assert(totalRunningTasksPerResourceProfile(manager) === 1)
     post(SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo2, new ExecutorMetrics, null))
-    assert(totalRunningTasks(manager) === 0)
+    assert(totalRunningTasksPerResourceProfile(manager) === 0)
   }
 
   testRetry("cancel pending executors when no longer needed") {
     val manager = createManager(createConf(0, 10, 0))
     post(SparkListenerStageSubmitted(createStageInfo(2, 5)))
 
-    assert(numExecutorsTarget(manager) === 0)
-    assert(numExecutorsToAdd(manager) === 1)
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 1)
-    assert(numExecutorsToAdd(manager) === 2)
-    assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 3)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 3)
 
     val task1Info = createTaskInfo(0, 0, "executor-1")
     post(SparkListenerTaskStart(2, 0, task1Info))
 
-    assert(numExecutorsToAdd(manager) === 4)
-    assert(addExecutors(manager) === 2)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 4)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
 
     val task2Info = createTaskInfo(1, 0, "executor-1")
     post(SparkListenerTaskStart(2, 0, task2Info))
@@ -469,22 +702,21 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
 
   test("remove executors") {
     val manager = createManager(createConf(5, 10, 5))
-    (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
+    (1 to 10).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) }
 
     // Keep removing until the limit is reached
     assert(executorsPendingToRemove(manager).isEmpty)
-    assert(removeExecutor(manager, "1"))
+    assert(removeExecutorDefaultProfile(manager, "1"))
     assert(executorsPendingToRemove(manager).size === 1)
     assert(executorsPendingToRemove(manager).contains("1"))
-    assert(removeExecutor(manager, "2"))
-    assert(removeExecutor(manager, "3"))
+    assert(removeExecutorDefaultProfile(manager, "2"))
+    assert(removeExecutorDefaultProfile(manager, "3"))
     assert(executorsPendingToRemove(manager).size === 3)
     assert(executorsPendingToRemove(manager).contains("2"))
     assert(executorsPendingToRemove(manager).contains("3"))
-    assert(executorsPendingToRemove(manager).size === 3)
-    assert(removeExecutor(manager, "4"))
-    assert(removeExecutor(manager, "5"))
-    assert(!removeExecutor(manager, "6")) // reached the limit of 5
+    assert(removeExecutorDefaultProfile(manager, "4"))
+    assert(removeExecutorDefaultProfile(manager, "5"))
+    assert(!removeExecutorDefaultProfile(manager, "6")) // reached the limit of 5
     assert(executorsPendingToRemove(manager).size === 5)
     assert(executorsPendingToRemove(manager).contains("4"))
     assert(executorsPendingToRemove(manager).contains("5"))
@@ -508,29 +740,29 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
 
     // Try removing again
     // This should still fail because the number pending + running is still at the limit
-    assert(!removeExecutor(manager, "7"))
+    assert(!removeExecutorDefaultProfile(manager, "7"))
     assert(executorsPendingToRemove(manager).isEmpty)
-    assert(!removeExecutor(manager, "8"))
+    assert(!removeExecutorDefaultProfile(manager, "8"))
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
   test("remove multiple executors") {
     val manager = createManager(createConf(5, 10, 5))
-    (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
+    (1 to 10).map(_.toString).foreach { id => onExecutorAddedDefaultProfile(manager, id) }
 
     // Keep removing until the limit is reached
     assert(executorsPendingToRemove(manager).isEmpty)
-    assert(removeExecutors(manager, Seq("1")) === Seq("1"))
+    assert(removeExecutorsDefaultProfile(manager, Seq("1")) === Seq("1"))
     assert(executorsPendingToRemove(manager).size === 1)
     assert(executorsPendingToRemove(manager).contains("1"))
-    assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
+    assert(removeExecutorsDefaultProfile(manager, Seq("2", "3")) === Seq("2", "3"))
     assert(executorsPendingToRemove(manager).size === 3)
     assert(executorsPendingToRemove(manager).contains("2"))
     assert(executorsPendingToRemove(manager).contains("3"))
     assert(executorsPendingToRemove(manager).size === 3)
-    assert(removeExecutor(manager, "4"))
-    assert(removeExecutors(manager, Seq("5")) === Seq("5"))
-    assert(!removeExecutor(manager, "6")) // reached the limit of 5
+    assert(removeExecutorDefaultProfile(manager, "4"))
+    assert(removeExecutorsDefaultProfile(manager, Seq("5")) === Seq("5"))
+    assert(!removeExecutorDefaultProfile(manager, "6")) // reached the limit of 5
     assert(executorsPendingToRemove(manager).size === 5)
     assert(executorsPendingToRemove(manager).contains("4"))
     assert(executorsPendingToRemove(manager).contains("5"))
@@ -554,87 +786,100 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
 
     // Try removing again
     // This should still fail because the number pending + running is still at the limit
-    assert(!removeExecutor(manager, "7"))
+    assert(!removeExecutorDefaultProfile(manager, "7"))
     assert(executorsPendingToRemove(manager).isEmpty)
-    assert(removeExecutors(manager, Seq("8")) !== Seq("8"))
+    assert(removeExecutorsDefaultProfile(manager, Seq("8")) !== Seq("8"))
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
-  test ("Removing with various numExecutorsTarget condition") {
+  test ("Removing with various numExecutorsTargetForDefaultProfileId condition") {
     val manager = createManager(createConf(5, 12, 5))
 
     post(SparkListenerStageSubmitted(createStageInfo(0, 8)))
 
-    // Remove when numExecutorsTarget is the same as the current number of executors
-    assert(addExecutors(manager) === 1)
-    assert(addExecutors(manager) === 2)
-    (1 to 8).foreach(execId => onExecutorAdded(manager, execId.toString))
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    // Remove when numExecutorsTargetForDefaultProfileId is the same as the current
+    // number of executors
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    (1 to 8).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
     (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
       info => post(SparkListenerTaskStart(0, 0, info)) }
     assert(manager.executorMonitor.executorCount === 8)
-    assert(numExecutorsTarget(manager) === 8)
-    assert(maxNumExecutorsNeeded(manager) == 8)
-    assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 8)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 8)
+    // won't work since numExecutorsTargetForDefaultProfileId == numExecutors
+    assert(!removeExecutorDefaultProfile(manager, "1"))
 
-    // Remove executors when numExecutorsTarget is lower than current number of executors
+    // Remove executors when numExecutorsTargetForDefaultProfileId is lower than
+    // current number of executors
     (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { info =>
       post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))
     }
     adjustRequestedExecutors(manager)
     assert(manager.executorMonitor.executorCount === 8)
-    assert(numExecutorsTarget(manager) === 5)
-    assert(maxNumExecutorsNeeded(manager) == 5)
-    assert(removeExecutor(manager, "1"))
-    assert(removeExecutors(manager, Seq("2", "3"))=== Seq("2", "3"))
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
+    assert(removeExecutorDefaultProfile(manager, "1"))
+    assert(removeExecutorsDefaultProfile(manager, Seq("2", "3")) === Seq("2", "3"))
     onExecutorRemoved(manager, "1")
     onExecutorRemoved(manager, "2")
     onExecutorRemoved(manager, "3")
 
-    // numExecutorsTarget is lower than minNumExecutors
+    // numExecutorsTargetForDefaultProfileId is lower than minNumExecutors
     post(SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"),
       new ExecutorMetrics, null))
     assert(manager.executorMonitor.executorCount === 5)
-    assert(numExecutorsTarget(manager) === 5)
-    assert(maxNumExecutorsNeeded(manager) == 4)
-    assert(!removeExecutor(manager, "4")) // lower limit
-    assert(addExecutors(manager) === 0) // upper limit
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 4)
+    assert(!removeExecutorDefaultProfile(manager, "4")) // lower limit
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) // upper limit
   }
 
   test ("interleaving add and remove") {
     val manager = createManager(createConf(5, 12, 5))
     post(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
     // Add a few executors
-    assert(addExecutors(manager) === 1)
-    assert(addExecutors(manager) === 2)
-    onExecutorAdded(manager, "1")
-    onExecutorAdded(manager, "2")
-    onExecutorAdded(manager, "3")
-    onExecutorAdded(manager, "4")
-    onExecutorAdded(manager, "5")
-    onExecutorAdded(manager, "6")
-    onExecutorAdded(manager, "7")
-    onExecutorAdded(manager, "8")
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    onExecutorAddedDefaultProfile(manager, "1")
+    onExecutorAddedDefaultProfile(manager, "2")
+    onExecutorAddedDefaultProfile(manager, "3")
+    onExecutorAddedDefaultProfile(manager, "4")
+    onExecutorAddedDefaultProfile(manager, "5")
+    onExecutorAddedDefaultProfile(manager, "6")
+    onExecutorAddedDefaultProfile(manager, "7")
+    onExecutorAddedDefaultProfile(manager, "8")
     assert(manager.executorMonitor.executorCount === 8)
-    assert(numExecutorsTarget(manager) === 8)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 8)
 
 
     // Remove when numTargetExecutors is equal to the current number of executors
-    assert(!removeExecutor(manager, "1"))
-    assert(removeExecutors(manager, Seq("2", "3")) !== Seq("2", "3"))
+    assert(!removeExecutorDefaultProfile(manager, "1"))
+    assert(removeExecutorsDefaultProfile(manager, Seq("2", "3")) !== Seq("2", "3"))
 
     // Remove until limit
-    onExecutorAdded(manager, "9")
-    onExecutorAdded(manager, "10")
-    onExecutorAdded(manager, "11")
-    onExecutorAdded(manager, "12")
+    onExecutorAddedDefaultProfile(manager, "9")
+    onExecutorAddedDefaultProfile(manager, "10")
+    onExecutorAddedDefaultProfile(manager, "11")
+    onExecutorAddedDefaultProfile(manager, "12")
     assert(manager.executorMonitor.executorCount === 12)
-    assert(numExecutorsTarget(manager) === 8)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 8)
 
-    assert(removeExecutor(manager, "1"))
-    assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", "4"))
-    assert(!removeExecutor(manager, "5")) // lower limit reached
-    assert(!removeExecutor(manager, "6"))
+    assert(removeExecutorDefaultProfile(manager, "1"))
+    assert(removeExecutorsDefaultProfile(manager, Seq("2", "3", "4")) === Seq("2", "3", "4"))
+    assert(!removeExecutorDefaultProfile(manager, "5")) // lower limit reached
+    assert(!removeExecutorDefaultProfile(manager, "6"))
     onExecutorRemoved(manager, "1")
     onExecutorRemoved(manager, "2")
     onExecutorRemoved(manager, "3")
@@ -642,33 +887,36 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(manager.executorMonitor.executorCount === 8)
 
     // Add until limit
-    assert(!removeExecutor(manager, "7")) // still at lower limit
+    assert(!removeExecutorDefaultProfile(manager, "7")) // still at lower limit
     assert(removeExecutorsDefaultProfile(manager, Seq("8")) !== Seq("8"))
-    onExecutorAdded(manager, "13")
-    onExecutorAdded(manager, "14")
-    onExecutorAdded(manager, "15")
-    onExecutorAdded(manager, "16")
+    onExecutorAddedDefaultProfile(manager, "13")
+    onExecutorAddedDefaultProfile(manager, "14")
+    onExecutorAddedDefaultProfile(manager, "15")
+    onExecutorAddedDefaultProfile(manager, "16")
     assert(manager.executorMonitor.executorCount === 12)
 
     // Remove succeeds again, now that we are no longer at the lower limit
-    assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7"))
-    assert(removeExecutor(manager, "8"))
+    assert(removeExecutorsDefaultProfile(manager, Seq("5", "6", "7")) === Seq("5", "6", "7"))
+    assert(removeExecutorDefaultProfile(manager, "8"))
     assert(manager.executorMonitor.executorCount === 12)
     onExecutorRemoved(manager, "5")
     onExecutorRemoved(manager, "6")
     assert(manager.executorMonitor.executorCount === 10)
-    assert(numExecutorsToAdd(manager) === 4)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 4)
     onExecutorRemoved(manager, "9")
     onExecutorRemoved(manager, "10")
-    assert(addExecutors(manager) === 4) // at upper limit
-    onExecutorAdded(manager, "17")
-    onExecutorAdded(manager, "18")
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4) // at upper limit
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    onExecutorAddedDefaultProfile(manager, "17")
+    onExecutorAddedDefaultProfile(manager, "18")
     assert(manager.executorMonitor.executorCount === 10)
-    assert(addExecutors(manager) === 0) // still at upper limit
-    onExecutorAdded(manager, "19")
-    onExecutorAdded(manager, "20")
+    // still at upper limit
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    onExecutorAddedDefaultProfile(manager, "19")
+    onExecutorAddedDefaultProfile(manager, "20")
     assert(manager.executorMonitor.executorCount === 12)
-    assert(numExecutorsTarget(manager) === 12)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 12)
   }
 
   test("starting/canceling add timer") {
@@ -706,22 +954,22 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     val manager = createManager(createConf(0, 20, 0), clock = clock)
 
     // No events - we should not be adding or removing
-    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(100L)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(1000L)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(10000L)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
@@ -734,43 +982,43 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     onSchedulerBacklogged(manager)
     clock.advance(schedulerBacklogTimeout * 1000 / 2)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 0) // timer not exceeded yet
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0) // timer not exceeded yet
     clock.advance(schedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 1) // first timer exceeded
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1) // first timer exceeded
     clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 1) // second timer not exceeded yet
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1) // second timer not exceeded yet
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 1 + 2) // second timer exceeded
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1 + 2) // second timer exceeded
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 1 + 2 + 4) // third timer exceeded
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1 + 2 + 4) // third timer exceeded
 
     // Scheduler queue drained
     onSchedulerQueueEmpty(manager)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 7) // timer is canceled
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 7) // timer is canceled
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 7)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 7)
 
     // Scheduler queue backlogged again
     onSchedulerBacklogged(manager)
     clock.advance(schedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 7 + 1) // timer restarted
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 7 + 1) // timer restarted
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 7 + 1 + 2)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 7 + 1 + 2)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 7 + 1 + 2 + 4)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 7 + 1 + 2 + 4)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsTarget(manager) === 20) // limit reached
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 20) // limit reached
   }
 
   test("mock polling loop remove behavior") {
@@ -778,9 +1026,9 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     val manager = createManager(createConf(1, 20, 1), clock = clock)
 
     // Remove idle executors on timeout
-    onExecutorAdded(manager, "executor-1")
-    onExecutorAdded(manager, "executor-2")
-    onExecutorAdded(manager, "executor-3")
+    onExecutorAddedDefaultProfile(manager, "executor-1")
+    onExecutorAddedDefaultProfile(manager, "executor-2")
+    onExecutorAddedDefaultProfile(manager, "executor-3")
     assert(executorsPendingToRemove(manager).isEmpty)
 
     // idle threshold not reached yet
@@ -796,10 +1044,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
 
     // Mark a subset as busy - only idle executors should be removed
-    onExecutorAdded(manager, "executor-4")
-    onExecutorAdded(manager, "executor-5")
-    onExecutorAdded(manager, "executor-6")
-    onExecutorAdded(manager, "executor-7")
+    onExecutorAddedDefaultProfile(manager, "executor-4")
+    onExecutorAddedDefaultProfile(manager, "executor-5")
+    onExecutorAddedDefaultProfile(manager, "executor-6")
+    onExecutorAddedDefaultProfile(manager, "executor-7")
     assert(manager.executorMonitor.executorCount === 7)
     assert(executorsPendingToRemove(manager).size === 2) // 2 pending to be removed
     onExecutorBusy(manager, "executor-4")
@@ -864,23 +1112,31 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     val stage1 = createStageInfo(0, 1000)
     post(SparkListenerStageSubmitted(stage1))
 
-    assert(addExecutors(manager) === 1)
-    assert(addExecutors(manager) === 2)
-    assert(addExecutors(manager) === 4)
-    assert(addExecutors(manager) === 8)
-    assert(numExecutorsTarget(manager) === 15)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 8)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 15)
     (0 until 15).foreach { i =>
-      onExecutorAdded(manager, s"executor-$i")
+      onExecutorAddedDefaultProfile(manager, s"executor-$i")
     }
     assert(manager.executorMonitor.executorCount === 15)
     post(SparkListenerStageCompleted(stage1))
 
     adjustRequestedExecutors(manager)
-    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
 
     post(SparkListenerStageSubmitted(createStageInfo(1, 1000)))
-    addExecutors(manager)
-    assert(numExecutorsTarget(manager) === 16)
+    addExecutorsToTargetForDefaultProfile(manager, updatesNeeded)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 16)
   }
 
   test("avoid ramp down initial executors until first job is submitted") {
@@ -888,19 +1144,19 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     val manager = createManager(createConf(2, 5, 3), clock = clock)
 
     // Verify the initial number of executors
-    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 3)
     schedule(manager)
     // Verify whether the initial number of executors is kept with no pending tasks
-    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 3)
 
     post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
     clock.advance(100L)
 
-    assert(maxNumExecutorsNeeded(manager) === 2)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 2)
     schedule(manager)
 
     // Verify that current number of executors should be ramp down when first job is submitted
-    assert(numExecutorsTarget(manager) === 2)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
   }
 
   test("avoid ramp down initial executors until idle executor is timeout") {
@@ -908,20 +1164,20 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     val manager = createManager(createConf(2, 5, 3), clock = clock)
 
     // Verify the initial number of executors
-    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 3)
     schedule(manager)
     // Verify the initial number of executors is kept when no pending tasks
-    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 3)
     (0 until 3).foreach { i =>
-      onExecutorAdded(manager, s"executor-$i")
+      onExecutorAddedDefaultProfile(manager, s"executor-$i")
     }
 
     clock.advance(executorIdleTimeout * 1000)
 
-    assert(maxNumExecutorsNeeded(manager) === 0)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 0)
     schedule(manager)
-    // Verify executor is timeout,numExecutorsTarget is recalculated
-    assert(numExecutorsTarget(manager) === 2)
+    // Verify executor is timed out, numExecutorsTargetForDefaultProfileId is recalculated
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
   }
 
   test("get pending task number and related locality preference") {
@@ -937,7 +1193,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     val stageInfo1 = createStageInfo(1, 5, localityPreferences1)
     post(SparkListenerStageSubmitted(stageInfo1))
 
-    assert(localityAwareTasks(manager) === 3)
+    assert(localityAwareTasksForDefaultProfile(manager) === 3)
+    val hostToLocal = hostToLocalTaskCount(manager)
     assert(hostToLocalTaskCount(manager) ===
       Map("host1" -> 2, "host2" -> 3, "host3" -> 2, "host4" -> 2))
 
@@ -949,67 +1206,76 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     val stageInfo2 = createStageInfo(2, 3, localityPreferences2)
     post(SparkListenerStageSubmitted(stageInfo2))
 
-    assert(localityAwareTasks(manager) === 5)
+    assert(localityAwareTasksForDefaultProfile(manager) === 5)
     assert(hostToLocalTaskCount(manager) ===
       Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2))
 
     post(SparkListenerStageCompleted(stageInfo1))
-    assert(localityAwareTasks(manager) === 2)
+    assert(localityAwareTasksForDefaultProfile(manager) === 2)
     assert(hostToLocalTaskCount(manager) ===
       Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
   }
 
-  test("SPARK-8366: maxNumExecutorsNeeded should properly handle failed tasks") {
+  test("SPARK-8366: maxNumExecutorsNeededPerResourceProfile should properly handle failed tasks") {
     val manager = createManager(createConf())
-    assert(maxNumExecutorsNeeded(manager) === 0)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 0)
 
     post(SparkListenerStageSubmitted(createStageInfo(0, 1)))
-    assert(maxNumExecutorsNeeded(manager) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 1)
 
     val taskInfo = createTaskInfo(1, 1, "executor-1")
     post(SparkListenerTaskStart(0, 0, taskInfo))
-    assert(maxNumExecutorsNeeded(manager) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 1)
 
     // If the task is failed, we expect it to be resubmitted later.
     val taskEndReason = ExceptionFailure(null, null, null, null, None)
     post(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, new ExecutorMetrics, null))
-    assert(maxNumExecutorsNeeded(manager) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 1)
   }
 
   test("reset the state of allocation manager") {
     val manager = createManager(createConf())
-    assert(numExecutorsTarget(manager) === 1)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
 
     // Allocation manager is reset when adding executor requests are sent without reporting back
     // executor added.
     post(SparkListenerStageSubmitted(createStageInfo(0, 10)))
 
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 2)
-    assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 4)
-    assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 5)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 4)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
 
     manager.reset()
-    assert(numExecutorsTarget(manager) === 1)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
     assert(manager.executorMonitor.executorCount === 0)
 
     // Allocation manager is reset when executors are added.
     post(SparkListenerStageSubmitted(createStageInfo(0, 10)))
 
-    addExecutors(manager)
-    addExecutors(manager)
-    addExecutors(manager)
-    assert(numExecutorsTarget(manager) === 5)
-
-    onExecutorAdded(manager, "first")
-    onExecutorAdded(manager, "second")
-    onExecutorAdded(manager, "third")
-    onExecutorAdded(manager, "fourth")
-    onExecutorAdded(manager, "fifth")
+    addExecutorsToTargetForDefaultProfile(manager, updatesNeeded)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    addExecutorsToTargetForDefaultProfile(manager, updatesNeeded)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    addExecutorsToTargetForDefaultProfile(manager, updatesNeeded)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+
+    onExecutorAddedDefaultProfile(manager, "first")
+    onExecutorAddedDefaultProfile(manager, "second")
+    onExecutorAddedDefaultProfile(manager, "third")
+    onExecutorAddedDefaultProfile(manager, "fourth")
+    onExecutorAddedDefaultProfile(manager, "fifth")
     assert(manager.executorMonitor.executorCount === 5)
 
     // Cluster manager lost will make all the live executors lost, so here simulate this behavior
@@ -1020,28 +1286,31 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     onExecutorRemoved(manager, "fifth")
 
     manager.reset()
-    assert(numExecutorsTarget(manager) === 1)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
     assert(manager.executorMonitor.executorCount === 0)
 
     // Allocation manager is reset when executors are pending to remove
-    addExecutors(manager)
-    addExecutors(manager)
-    addExecutors(manager)
-    assert(numExecutorsTarget(manager) === 5)
-
-    onExecutorAdded(manager, "first")
-    onExecutorAdded(manager, "second")
-    onExecutorAdded(manager, "third")
-    onExecutorAdded(manager, "fourth")
-    onExecutorAdded(manager, "fifth")
-    onExecutorAdded(manager, "sixth")
-    onExecutorAdded(manager, "seventh")
-    onExecutorAdded(manager, "eighth")
+    addExecutorsToTargetForDefaultProfile(manager, updatesNeeded)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    addExecutorsToTargetForDefaultProfile(manager, updatesNeeded)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    addExecutorsToTargetForDefaultProfile(manager, updatesNeeded)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 5)
+
+    onExecutorAddedDefaultProfile(manager, "first")
+    onExecutorAddedDefaultProfile(manager, "second")
+    onExecutorAddedDefaultProfile(manager, "third")
+    onExecutorAddedDefaultProfile(manager, "fourth")
+    onExecutorAddedDefaultProfile(manager, "fifth")
+    onExecutorAddedDefaultProfile(manager, "sixth")
+    onExecutorAddedDefaultProfile(manager, "seventh")
+    onExecutorAddedDefaultProfile(manager, "eighth")
     assert(manager.executorMonitor.executorCount === 8)
 
-    removeExecutor(manager, "first")
-    removeExecutors(manager, Seq("second", "third"))
+    removeExecutorDefaultProfile(manager, "first")
+    removeExecutorsDefaultProfile(manager, Seq("second", "third"))
     assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
     assert(manager.executorMonitor.executorCount === 8)
 
@@ -1055,8 +1324,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
 
     manager.reset()
 
-    assert(numExecutorsTarget(manager) === 1)
-    assert(numExecutorsToAdd(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
     assert(executorsPendingToRemove(manager) === Set.empty)
     assert(manager.executorMonitor.executorCount === 0)
   }
@@ -1067,31 +1336,31 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       createConf(1, 2, 1).set(config.DYN_ALLOCATION_TESTING, false),
       clock = clock)
 
-    when(client.requestTotalExecutors(meq(2), any(), any())).thenReturn(true)
+    when(client.requestTotalExecutors(any(), any(), any())).thenReturn(true)
     // test setup -- job with 2 tasks, scale up to two executors
-    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
     post(SparkListenerExecutorAdded(
       clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 2)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
     val taskInfo0 = createTaskInfo(0, 0, "executor-1")
     post(SparkListenerTaskStart(0, 0, taskInfo0))
     post(SparkListenerExecutorAdded(
       clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     val taskInfo1 = createTaskInfo(1, 1, "executor-2")
     post(SparkListenerTaskStart(0, 0, taskInfo1))
-    assert(numExecutorsTarget(manager) === 2)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
 
     // have one task finish -- we should adjust the target number of executors down
     // but we should *not* kill any executors yet
     post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo0, new ExecutorMetrics, null))
-    assert(maxNumExecutorsNeeded(manager) === 1)
-    assert(numExecutorsTarget(manager) === 2)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
     verify(client, never).killExecutors(any(), any(), any(), any())
 
     // now we cross the idle timeout for executor-1, so we kill it.  the really important
@@ -1101,8 +1370,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       .thenReturn(Seq("executor-1"))
     clock.advance(3000)
     schedule(manager)
-    assert(maxNumExecutorsNeeded(manager) === 1)
-    assert(numExecutorsTarget(manager) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
     // here's the important verify -- we did kill the executors, but did not adjust the target count
     verify(client).killExecutors(Seq("executor-1"), false, false, false)
   }
@@ -1110,7 +1379,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
   test("SPARK-26758 check executor target number after idle time out ") {
     val clock = new ManualClock(10000L)
     val manager = createManager(createConf(1, 5, 3), clock = clock)
-    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 3)
     post(SparkListenerExecutorAdded(
       clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
     post(SparkListenerExecutorAdded(
@@ -1121,14 +1390,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     clock.advance(executorIdleTimeout * 1000)
     schedule(manager)
     // once the schedule is run target executor number should be 1
-    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
   }
 
   private def createConf(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
       initialExecutors: Int = 1): SparkConf = {
-    new SparkConf()
+    val sparkConf = new SparkConf()
       .set(config.DYN_ALLOCATION_ENABLED, true)
       .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
       .set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors)
@@ -1143,12 +1412,16 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       // SPARK-22864: effectively disable the allocation schedule by setting the period to a
       // really long value.
       .set(TEST_SCHEDULE_INTERVAL, 10000L)
+    sparkConf
   }
 
   private def createManager(
       conf: SparkConf,
       clock: Clock = new SystemClock()): ExecutorAllocationManager = {
-    val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock = clock)
+    ResourceProfile.reInitDefaultProfile(conf)
+    rpManager = new ResourceProfileManager(conf)
+    val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock = clock,
+      resourceProfileManager = rpManager)
     managers += manager
     manager.start()
     manager
@@ -1157,7 +1430,18 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
   private val execInfo = new ExecutorInfo("host1", 1, Map.empty,
     Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
 
-  private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
+  private def onExecutorAddedDefaultProfile(
+      manager: ExecutorAllocationManager,
+      id: String): Unit = {
+    post(SparkListenerExecutorAdded(0L, id, execInfo))
+  }
+
+  private def onExecutorAdded(
+      manager: ExecutorAllocationManager,
+      id: String,
+      rp: ResourceProfile): Unit = {
+    val cores = rp.getExecutorCores.getOrElse(1)
+    val execInfo = new ExecutorInfo("host1", cores, Map.empty, Map.empty, Map.empty, rp.id)
     post(SparkListenerExecutorAdded(0L, id, execInfo))
   }
 
@@ -1176,8 +1460,18 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     post(SparkListenerTaskEnd(1, 1, "foo", Success, info, new ExecutorMetrics, null))
   }
 
-  private def removeExecutor(manager: ExecutorAllocationManager, executorId: String): Boolean = {
-    val executorsRemoved = removeExecutors(manager, Seq(executorId))
+  private def removeExecutorDefaultProfile(
+      manager: ExecutorAllocationManager,
+      executorId: String): Boolean = {
+    val executorsRemoved = removeExecutorsDefaultProfile(manager, Seq(executorId))
+    executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
+  }
+
+  private def removeExecutor(
+      manager: ExecutorAllocationManager,
+      executorId: String,
+      rpId: Int): Boolean = {
+    val executorsRemoved = removeExecutors(manager, Seq((executorId, rpId)))
     executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
   }
 
@@ -1199,10 +1493,11 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
       stageId: Int,
       numTasks: Int,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
-      attemptId: Int = 0
+      attemptId: Int = 0,
+      rp: ResourceProfile = defaultProfile
     ): StageInfo = {
     new StageInfo(stageId, attemptId, "name", numTasks, Seq.empty, Seq.empty, "no details",
-      taskLocalityPreferences = taskLocalityPreferences)
+      taskLocalityPreferences = taskLocalityPreferences, resourceProfileId = rp.id)
   }
 
   private def createTaskInfo(
@@ -1217,54 +1512,117 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
    | Helper methods for accessing private methods and fields |
    * ------------------------------------------------------- */
 
-  private val _numExecutorsToAdd = PrivateMethod[Int](Symbol("numExecutorsToAdd"))
-  private val _numExecutorsTarget = PrivateMethod[Int](Symbol("numExecutorsTarget"))
-  private val _maxNumExecutorsNeeded = PrivateMethod[Int](Symbol("maxNumExecutorsNeeded"))
+  private val _numExecutorsToAddPerResourceProfileId =
+    PrivateMethod[mutable.HashMap[Int, Int]](
+      Symbol("numExecutorsToAddPerResourceProfileId"))
+  private val _numExecutorsTargetPerResourceProfileId =
+    PrivateMethod[mutable.HashMap[Int, Int]](
+      Symbol("numExecutorsTargetPerResourceProfileId"))
+  private val _maxNumExecutorsNeededPerResourceProfile =
+    PrivateMethod[Int](Symbol("maxNumExecutorsNeededPerResourceProfile"))
   private val _addTime = PrivateMethod[Long](Symbol("addTime"))
   private val _schedule = PrivateMethod[Unit](Symbol("schedule"))
-  private val _addExecutors = PrivateMethod[Int](Symbol("addExecutors"))
+  private val _doUpdateRequest = PrivateMethod[Unit](Symbol("doUpdateRequest"))
   private val _updateAndSyncNumExecutorsTarget =
     PrivateMethod[Int](Symbol("updateAndSyncNumExecutorsTarget"))
+  private val _addExecutorsToTarget = PrivateMethod[Int](Symbol("addExecutorsToTarget"))
   private val _removeExecutors = PrivateMethod[Seq[String]](Symbol("removeExecutors"))
   private val _onSchedulerBacklogged = PrivateMethod[Unit](Symbol("onSchedulerBacklogged"))
   private val _onSchedulerQueueEmpty = PrivateMethod[Unit](Symbol("onSchedulerQueueEmpty"))
-  private val _localityAwareTasks = PrivateMethod[Int](Symbol("localityAwareTasks"))
-  private val _hostToLocalTaskCount =
-    PrivateMethod[Map[String, Int]](Symbol("hostToLocalTaskCount"))
+  private val _localityAwareTasksPerResourceProfileId =
+    PrivateMethod[mutable.HashMap[Int, Int]](Symbol("numLocalityAwareTasksPerResourceProfileId"))
+  private val _rpIdToHostToLocalTaskCount =
+    PrivateMethod[Map[Int, Map[String, Int]]](Symbol("rpIdToHostToLocalTaskCount"))
   private val _onSpeculativeTaskSubmitted =
     PrivateMethod[Unit](Symbol("onSpeculativeTaskSubmitted"))
-  private val _totalRunningTasks = PrivateMethod[Int](Symbol("totalRunningTasks"))
+  private val _totalRunningTasksPerResourceProfile =
+    PrivateMethod[Int](Symbol("totalRunningTasksPerResourceProfile"))
+
+  private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
+
+  private def numExecutorsToAddForDefaultProfile(manager: ExecutorAllocationManager): Int = {
+    numExecutorsToAdd(manager, defaultProfile)
+  }
+
+  private def numExecutorsToAdd(
+      manager: ExecutorAllocationManager,
+      rp: ResourceProfile): Int = {
+    val nmap = manager invokePrivate _numExecutorsToAddPerResourceProfileId()
+    nmap(rp.id)
+  }
+
+  private def updateAndSyncNumExecutorsTarget(
+      manager: ExecutorAllocationManager,
+      now: Long): Unit = {
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(now)
+  }
+
+  private def numExecutorsTargetForDefaultProfileId(manager: ExecutorAllocationManager): Int = {
+    numExecutorsTarget(manager, defaultProfile.id)
+  }
 
-  private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _numExecutorsToAdd()
+  private def numExecutorsTarget(
+      manager: ExecutorAllocationManager,
+      rpId: Int): Int = {
+    val numMap = manager invokePrivate _numExecutorsTargetPerResourceProfileId()
+    numMap(rpId)
   }
 
-  private def numExecutorsTarget(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _numExecutorsTarget()
+  private def addExecutorsToTargetForDefaultProfile(
+      manager: ExecutorAllocationManager,
+      updatesNeeded: mutable.HashMap[ResourceProfile,
+        ExecutorAllocationManager.TargetNumUpdates]
+  ): Int = {
+    addExecutorsToTarget(manager, updatesNeeded, defaultProfile)
+  }
+
+  private def addExecutorsToTarget(
+      manager: ExecutorAllocationManager,
+      updatesNeeded: mutable.HashMap[ResourceProfile,
+        ExecutorAllocationManager.TargetNumUpdates],
+      rp: ResourceProfile
+  ): Int = {
+    val maxNumExecutorsNeeded =
+      manager invokePrivate _maxNumExecutorsNeededPerResourceProfile(rp.id)
+    manager invokePrivate
+      _addExecutorsToTarget(maxNumExecutorsNeeded, rp.id, updatesNeeded)
   }
 
   private def addTime(manager: ExecutorAllocationManager): Long = {
     manager invokePrivate _addTime()
   }
 
-  private def schedule(manager: ExecutorAllocationManager): Unit = {
-    manager invokePrivate _schedule()
+  private def doUpdateRequest(
+      manager: ExecutorAllocationManager,
+      updates: Map[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates],
+      now: Long): Unit = {
+    manager invokePrivate _doUpdateRequest(updates, now)
   }
 
-  private def maxNumExecutorsNeeded(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _maxNumExecutorsNeeded()
+  private def schedule(manager: ExecutorAllocationManager): Unit = {
+    manager invokePrivate _schedule()
   }
 
-  private def addExecutors(manager: ExecutorAllocationManager): Int = {
-    val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
-    manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
+  private def maxNumExecutorsNeededPerResourceProfile(
+      manager: ExecutorAllocationManager,
+      rp: ResourceProfile): Int = {
+    manager invokePrivate _maxNumExecutorsNeededPerResourceProfile(rp.id)
   }
 
   private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _updateAndSyncNumExecutorsTarget(0L)
   }
 
-  private def removeExecutors(manager: ExecutorAllocationManager, ids: Seq[String]): Seq[String] = {
+  private def removeExecutorsDefaultProfile(
+      manager: ExecutorAllocationManager,
+      ids: Seq[String]): Seq[String] = {
+    val idsAndProfileIds = ids.map((_, defaultProfile.id))
+    manager invokePrivate _removeExecutors(idsAndProfileIds)
+  }
+
+  private def removeExecutors(
+      manager: ExecutorAllocationManager,
+      ids: Seq[(String, Int)]): Seq[String] = {
     manager invokePrivate _removeExecutors(ids)
   }
 
@@ -1280,15 +1638,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _onSpeculativeTaskSubmitted(id)
   }
 
-  private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _localityAwareTasks()
+  private def localityAwareTasksForDefaultProfile(manager: ExecutorAllocationManager): Int = {
+    val localMap = manager invokePrivate _localityAwareTasksPerResourceProfileId()
+    localMap(defaultProfile.id)
+  }
+
+  private def totalRunningTasksPerResourceProfile(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _totalRunningTasksPerResourceProfile(defaultProfile.id)
   }
 
-  private def totalRunningTasks(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _totalRunningTasks()
+  private def hostToLocalTaskCount(
+      manager: ExecutorAllocationManager): Map[String, Int] = {
+    val rpIdToHostLocal = manager invokePrivate _rpIdToHostToLocalTaskCount()
+    rpIdToHostLocal(defaultProfile.id)
   }
 
-  private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = {
-    manager invokePrivate _hostToLocalTaskCount()
+  private def getResourceProfileIdOfExecutor(manager: ExecutorAllocationManager): Int = {
+    defaultProfile.id
   }
 }
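
A note for readers following the assertions above: the backlog ramp-up (add 1, then 2, then 4, capped at the configured maximum) is now tracked per ResourceProfile id rather than in a single global counter. The following is a toy, self-contained sketch of that bookkeeping written for this review, not the actual ExecutorAllocationManager code:

    import scala.collection.mutable

    object PerProfileTargetSketch {
      // Per-profile executor target and per-profile "number to add next" increment.
      private val targetPerProfileId = mutable.HashMap[Int, Int]().withDefaultValue(0)
      private val toAddPerProfileId = mutable.HashMap[Int, Int]().withDefaultValue(1)

      // One sustained-backlog tick for a given profile id: add the current increment,
      // capped by maxExecutors, then double the increment for the next tick.
      def onBacklogTick(rpId: Int, maxExecutors: Int): Int = {
        val delta = math.min(toAddPerProfileId(rpId), maxExecutors - targetPerProfileId(rpId))
        targetPerProfileId(rpId) += delta
        toAddPerProfileId(rpId) = if (delta > 0) toAddPerProfileId(rpId) * 2 else 1
        targetPerProfileId(rpId)
      }

      def main(args: Array[String]): Unit = {
        // For the default profile with a max of 20 this prints 1, 3, 7, 15, 20,
        // mirroring the 1 + 2 + 4 ... and "limit reached" assertions in the suite.
        (1 to 5).foreach(_ => println(onBacklogTick(rpId = 0, maxExecutors = 20)))
      }
    }
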
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index ff0f2f9..a929695 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -285,9 +285,14 @@ private class FakeSchedulerBackend(
     clusterManagerEndpoint: RpcEndpointRef)
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
-  protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+  protected override def doRequestTotalExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
     clusterManagerEndpoint.ask[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty))
+      RequestExecutors(
+        resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf)),
+        numLocalityAwareTasksPerResourceProfileId(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
+        rpHostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
+        Set.empty))
   }
 
   protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 1fe12e1..599ea89 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -44,7 +44,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
 
   def resetSparkContext(): Unit = {
     LocalSparkContext.stop(sc)
-    ResourceProfile.clearDefaultProfile
+    ResourceProfile.clearDefaultProfile()
     sc = null
   }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index df9c7c5..b6dfa69 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -36,6 +36,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.TestUtils._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.resource.ResourceAllocation
 import org.apache.spark.resource.ResourceUtils._
@@ -784,7 +785,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   }
 
   test(s"Avoid setting ${CPUS_PER_TASK.key} unreasonably (SPARK-27192)") {
-    val FAIL_REASON = s"has to be >= the task config: ${CPUS_PER_TASK.key}"
+    val FAIL_REASON = " has to be >= the number of cpus per task"
     Seq(
       ("local", 2, None),
       ("local[2]", 3, None),
@@ -864,9 +865,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc = new SparkContext(conf)
     }.getMessage()
 
-    assert(error.contains("The executor resource config: spark.executor.resource.gpu.amount " +
-      "needs to be specified since a task requirement config: spark.task.resource.gpu.amount " +
-      "was specified"))
+    assert(error.contains("No executor resource configs were not specified for the following " +
+      "task configs: gpu"))
   }
 
   test("Test parsing resources executor config < task requirements") {
@@ -880,15 +880,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc = new SparkContext(conf)
     }.getMessage()
 
-    assert(error.contains("The executor resource config: spark.executor.resource.gpu.amount = 1 " +
-      "has to be >= the requested amount in task resource config: " +
-      "spark.task.resource.gpu.amount = 2"))
+    assert(error.contains("The executor resource: gpu, amount: 1 needs to be >= the task " +
+      "resource request amount of 2.0"))
   }
 
   test("Parse resources executor config not the same multiple numbers of the task requirements") {
     val conf = new SparkConf()
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
+    conf.set(RESOURCES_WARNING_TESTING, true)
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_GPU_ID.amountConf, "4")
 
@@ -897,25 +897,9 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }.getMessage()
 
     assert(error.contains(
-      "The configuration of resource: gpu (exec = 4, task = 2, runnable tasks = 2) will result " +
-        "in wasted resources due to resource CPU limiting the number of runnable tasks per " +
-        "executor to: 1. Please adjust your configuration."))
-  }
-
-  test("Parse resources executor config cpus not limiting resource") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[1, 8, 1024]")
-      .setAppName("test-cluster")
-    conf.set(TASK_GPU_ID.amountConf, "2")
-    conf.set(EXECUTOR_GPU_ID.amountConf, "4")
-
-    var error = intercept[IllegalArgumentException] {
-      sc = new SparkContext(conf)
-    }.getMessage()
-
-    assert(error.contains("The number of slots on an executor has to be " +
-      "limited by the number of cores, otherwise you waste resources and " +
-      "dynamic allocation doesn't work properly"))
+      "The configuration of resource: gpu (exec = 4, task = 2.0/1, runnable tasks = 2) will " +
+        "result in wasted resources due to resource cpus limiting the number of runnable " +
+        "tasks per executor to: 1. Please adjust your configuration."))
   }
 
   test("test resource scheduling under local-cluster mode") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala
index 86511ae..c905797 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
 
 import org.apache.spark.{SparkFunSuite, Success, TaskResultLost, TaskState}
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.status.ListenerEventsTestHelper
 
@@ -141,7 +142,8 @@ class BasicEventFilterBuilderSuite extends SparkFunSuite {
     // - Re-submit stage 1, all tasks, and succeed them and the stage.
     val oldS1 = stages.last
     val newS1 = new StageInfo(oldS1.stageId, oldS1.attemptNumber + 1, oldS1.name, oldS1.numTasks,
-      oldS1.rddInfos, oldS1.parentIds, oldS1.details, oldS1.taskMetrics)
+      oldS1.rddInfos, oldS1.parentIds, oldS1.details, oldS1.taskMetrics,
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     time += 1
     newS1.submissionTime = Some(time)
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
new file mode 100644
index 0000000..0752603
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
+
+class ResourceProfileManagerSuite extends SparkFunSuite {
+
+  override def beforeAll() {
+    try {
+      ResourceProfile.clearDefaultProfile()
+    } finally {
+      super.beforeAll()
+    }
+  }
+
+  override def afterEach() {
+    try {
+      ResourceProfile.clearDefaultProfile()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("ResourceProfileManager") {
+    val conf = new SparkConf().set(EXECUTOR_CORES, 4)
+    val rpmanager = new ResourceProfileManager(conf)
+    val defaultProf = rpmanager.defaultResourceProfile
+    assert(defaultProf.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    assert(defaultProf.executorResources.size === 2,
+      "Executor resources should contain cores and memory by default")
+    assert(defaultProf.executorResources(ResourceProfile.CORES).amount === 4,
+      s"Executor resources should have 4 cores")
+  }
+
+  test("isSupported yarn no dynamic allocation") {
+    val conf = new SparkConf().setMaster("yarn").set(EXECUTOR_CORES, 4)
+    conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
+    val rpmanager = new ResourceProfileManager(conf)
+    // default profile should always work
+    val defaultProf = rpmanager.defaultResourceProfile
+    val rprof = new ResourceProfileBuilder()
+    val gpuExecReq =
+      new ExecutorResourceRequests().resource("gpu", 2, "someScript")
+    val immrprof = rprof.require(gpuExecReq).build
+    val error = intercept[SparkException] {
+      rpmanager.isSupported(immrprof)
+    }.getMessage()
+
+    assert(error.contains("ResourceProfiles are only supported on YARN with dynamic allocation"))
+  }
+
+  test("isSupported yarn with dynamic allocation") {
+    val conf = new SparkConf().setMaster("yarn").set(EXECUTOR_CORES, 4)
+    conf.set(DYN_ALLOCATION_ENABLED, true)
+    conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
+    val rpmanager = new ResourceProfileManager(conf)
+    // default profile should always work
+    val defaultProf = rpmanager.defaultResourceProfile
+    val rprof = new ResourceProfileBuilder()
+    val gpuExecReq =
+      new ExecutorResourceRequests().resource("gpu", 2, "someScript")
+    val immrprof = rprof.require(gpuExecReq).build
+    assert(rpmanager.isSupported(immrprof) == true)
+  }
+
+  test("isSupported yarn with local mode") {
+    val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
+    conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
+    val rpmanager = new ResourceProfileManager(conf)
+    // default profile should always work
+    val defaultProf = rpmanager.defaultResourceProfile
+    val rprof = new ResourceProfileBuilder()
+    val gpuExecReq =
+      new ExecutorResourceRequests().resource("gpu", 2, "someScript")
+    val immrprof = rprof.require(gpuExecReq).build
+    var error = intercept[SparkException] {
+      rpmanager.isSupported(immrprof)
+    }.getMessage()
+
+    assert(error.contains("ResourceProfiles are only supported on YARN with dynamic allocation"))
+  }
+
+
+
+}
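
For context on the new suite: a non-default profile is built with the same public builder API these tests use. Per the commit message the feature is still private to Spark, so this is only a sketch of the intended usage, assuming a Spark-internal caller with a SparkContext named sc:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // Ask for 2 cores and 2 GPUs per executor (GPU addresses found by a user script,
    // the path below is a placeholder) and 1 CPU plus 1 GPU per task.
    val execReqs = new ExecutorResourceRequests()
      .cores(2)
      .resource("gpu", 2, "/opt/spark/getGpus.sh")
    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
    val profile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build

    // Registering the profile, as CoarseGrainedSchedulerBackendSuite does further below:
    // sc.resourceProfileManager.addResourceProfile(profile)
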
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index c0637ee..b2f2c36 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -18,18 +18,28 @@
 package org.apache.spark.resource
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, SPARK_EXECUTOR_PREFIX}
+import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.resource.TestResourceIDs._
 
 class ResourceProfileSuite extends SparkFunSuite {
 
+  override def beforeAll() {
+    try {
+      ResourceProfile.clearDefaultProfile()
+    } finally {
+      super.beforeAll()
+    }
+  }
+
   override def afterEach() {
     try {
-      ResourceProfile.clearDefaultProfile
+      ResourceProfile.clearDefaultProfile()
     } finally {
       super.afterEach()
     }
   }
+
   test("Default ResourceProfile") {
     val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
     assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
@@ -59,18 +69,19 @@ class ResourceProfileSuite extends SparkFunSuite {
     conf.set(EXECUTOR_MEMORY_OVERHEAD.key, "1g")
     conf.set(EXECUTOR_MEMORY.key, "4g")
     conf.set(EXECUTOR_CORES.key, "4")
-    conf.set("spark.task.resource.gpu.amount", "1")
-    conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.amount", "1")
-    conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.discoveryScript", "nameOfScript")
+    conf.set(TASK_GPU_ID.amountConf, "1")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "1")
+    conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, "nameOfScript")
     val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
     assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val execResources = rprof.executorResources
-    assert(execResources.size === 5,
-      "Executor resources should contain cores, memory, and gpu " + execResources)
+    assert(execResources.size === 5, s"Executor resources should contain cores, pyspark " +
+      s"memory, memory overhead, memory, and gpu $execResources")
     assert(execResources.contains("gpu"), "Executor resources should have gpu")
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 4,
       "Executor resources should have 4 core")
-    assert(rprof.getExecutorCores.get === 4, "Executor resources should have 4 core")
+    assert(rprof.getExecutorCores.get === 4,
+      "Executor resources should have 4 core")
     assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
       "Executor resources should have 1024 memory")
     assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount == 2048,
@@ -84,12 +95,60 @@ class ResourceProfileSuite extends SparkFunSuite {
 
   test("test default profile task gpus fractional") {
     val sparkConf = new SparkConf()
-      .set("spark.executor.resource.gpu.amount", "2")
-      .set("spark.task.resource.gpu.amount", "0.33")
+      .set(EXECUTOR_GPU_ID.amountConf, "2")
+      .set(TASK_GPU_ID.amountConf, "0.33")
     val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
     assert(immrprof.taskResources.get("gpu").get.amount == 0.33)
   }
 
+  test("maxTasksPerExecutor cpus") {
+    val sparkConf = new SparkConf()
+      .set(EXECUTOR_CORES, 1)
+    val rprof = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val execReq =
+      new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
+    rprof.require(taskReq).require(execReq)
+    val immrprof = new ResourceProfile(rprof.executorResources, rprof.taskResources)
+    assert(immrprof.limitingResource(sparkConf) == "cpus")
+    assert(immrprof.maxTasksPerExecutor(sparkConf) == 1)
+  }
+
+  test("maxTasksPerExecutor/limiting no executor cores") {
+    val sparkConf = new SparkConf().setMaster("spark://testing")
+    val rprof = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val execReq =
+      new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
+    rprof.require(taskReq).require(execReq)
+    val immrprof = new ResourceProfile(rprof.executorResources, rprof.taskResources)
+    assert(immrprof.limitingResource(sparkConf) == "gpu")
+    assert(immrprof.maxTasksPerExecutor(sparkConf) == 2)
+    assert(immrprof.isCoresLimitKnown == false)
+  }
+
+  test("maxTasksPerExecutor/limiting no other resource no executor cores") {
+    val sparkConf = new SparkConf().setMaster("spark://testing")
+    val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(immrprof.limitingResource(sparkConf) == "")
+    assert(immrprof.maxTasksPerExecutor(sparkConf) == 1)
+    assert(immrprof.isCoresLimitKnown == false)
+  }
+
+  test("maxTasksPerExecutor/limiting executor cores") {
+    val sparkConf = new SparkConf().setMaster("spark://testing").set(EXECUTOR_CORES, 2)
+    val rprof = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val execReq =
+      new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
+    rprof.require(taskReq).require(execReq)
+    val immrprof = new ResourceProfile(rprof.executorResources, rprof.taskResources)
+    assert(immrprof.limitingResource(sparkConf) == ResourceProfile.CPUS)
+    assert(immrprof.maxTasksPerExecutor(sparkConf) == 2)
+    assert(immrprof.isCoresLimitKnown == true)
+  }
+
+
   test("Create ResourceProfile") {
     val rprof = new ResourceProfileBuilder()
     val taskReq = new TaskResourceRequests().resource("gpu", 1)
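
The new maxTasksPerExecutor/limiting tests above encode the slot calculation from the commit message: when executor cores are known they usually bound the tasks per executor, otherwise the scarcest custom resource does. A rough, hypothetical helper (not the ResourceProfile implementation) showing the arithmetic those tests assert:

    // Hypothetical: pick the resource that bounds concurrent tasks per executor.
    def slotsPerExecutor(
        executorAmounts: Map[String, Double],
        taskAmounts: Map[String, Double]): (String, Int) = {
      taskAmounts.map { case (name, perTask) =>
        name -> (executorAmounts(name) / perTask).toInt
      }.minBy(_._2)
    }

    // 2 executor cores / 1 cpu per task = 2 slots and 2 GPUs / 1 GPU per task = 2 slots,
    // so cpus limit at 2 tasks, matching the "limiting executor cores" test above.
    slotsPerExecutor(Map("cpus" -> 2.0, "gpu" -> 2.0), Map("cpus" -> 1.0, "gpu" -> 1.0))
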
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
index dffe9a0..278a72a 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
@@ -26,8 +26,10 @@ import org.json4s.{DefaultFormats, Extraction}
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.TestUtils._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
+import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.util.Utils
 
 class ResourceUtilsSuite extends SparkFunSuite
@@ -165,6 +167,7 @@ class ResourceUtilsSuite extends SparkFunSuite
       val rpBuilder = new ResourceProfileBuilder()
       val ereqs = new ExecutorResourceRequests().resource(GPU, 2, gpuDiscovery)
       val treqs = new TaskResourceRequests().resource(GPU, 1)
+
       val rp = rpBuilder.require(ereqs).require(treqs).build
       val resourcesFromBoth = getOrDiscoverAllResourcesForResourceProfile(
         Some(resourcesFile), SPARK_EXECUTOR_PREFIX, rp, conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index c063301..7666c6c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.rdd.RDD
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -187,8 +187,6 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
   }
 
   test("extra resources from executor") {
-    import TestUtils._
-
     val conf = new SparkConf()
       .set(EXECUTOR_CORES, 1)
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
@@ -200,6 +198,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     conf.set(EXECUTOR_GPU_ID.amountConf, "1")
 
     sc = new SparkContext(conf)
+    val execGpu = new ExecutorResourceRequests().cores(1).resource(GPU, 3)
+    val taskGpu = new TaskResourceRequests().cpus(1).resource(GPU, 1)
+    val rp = new ResourceProfile(execGpu.requests, taskGpu.requests)
+    sc.resourceProfileManager.addResourceProfile(rp)
+    assert(rp.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val backend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
     val mockEndpointRef = mock[RpcEndpointRef]
     val mockAddress = mock[RpcAddress]
@@ -224,7 +227,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
         ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     backend.driverEndpoint.askSync[Boolean](
       RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
-        5))
+        rp.id))
 
     val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
     val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
@@ -234,7 +237,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
 
     var exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
-    assert(exec3ResourceProfileId === 5)
+    assert(exec3ResourceProfileId === rp.id)
 
     val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
     var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 2869240..61ea21f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
 import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.{JsonProtocol, Utils}
 
@@ -438,12 +439,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
 
   private def createStageSubmittedEvent(stageId: Int) = {
     SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0,
-      Seq.empty, Seq.empty, "details"))
+      Seq.empty, Seq.empty, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
   }
 
   private def createStageCompletedEvent(stageId: Int) = {
     SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0,
-      Seq.empty, Seq.empty, "details"))
+      Seq.empty, Seq.empty, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
   }
 
   private def createExecutorAddedEvent(executorId: Int) = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 615389a..3596a9e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark._
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, UNKNOWN_RESOURCE_PROFILE_ID}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
@@ -255,25 +256,28 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   test("track executors pending for removal") {
     knownExecs ++= Set("1", "2", "3")
 
+    val execInfoRp1 = new ExecutorInfo("host1", 1, Map.empty,
+      Map.empty, Map.empty, 1)
+
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
-    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfoRp1))
     clock.setTime(idleDeadline)
-    assert(monitor.timedOutExecutors().toSet === Set("1", "2", "3"))
+    assert(monitor.timedOutExecutors().toSet === Set(("1", 0), ("2", 0), ("3", 1)))
     assert(monitor.pendingRemovalCount === 0)
 
     // Notify that only a subset of executors was killed, to mimic the case where the scheduler
     // refuses to kill an executor that is busy for whatever reason the monitor hasn't detected yet.
     monitor.executorsKilled(Seq("1"))
-    assert(monitor.timedOutExecutors().toSet === Set("2", "3"))
+    assert(monitor.timedOutExecutors().toSet === Set(("2", 0), ("3", 1)))
     assert(monitor.pendingRemovalCount === 1)
 
     // Check the timed out executors again so that we're sure they're still timed out when no
     // events happen. This ensures that the monitor doesn't lose track of them.
-    assert(monitor.timedOutExecutors().toSet === Set("2", "3"))
+    assert(monitor.timedOutExecutors().toSet === Set(("2", 0), ("3", 1)))
 
     monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("2", 1)))
-    assert(monitor.timedOutExecutors().toSet === Set("3"))
+    assert(monitor.timedOutExecutors().toSet === Set(("3", 1)))
 
     monitor.executorsKilled(Seq("3"))
     assert(monitor.pendingRemovalCount === 2)
@@ -282,7 +286,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
       new ExecutorMetrics, null))
     assert(monitor.timedOutExecutors().isEmpty)
     clock.advance(idleDeadline)
-    assert(monitor.timedOutExecutors().toSet === Set("2"))
+    assert(monitor.timedOutExecutors().toSet === Set(("2", 0)))
   }
 
   test("shuffle block tracking") {
@@ -435,7 +439,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
 
   private def stageInfo(id: Int, shuffleId: Int = -1): StageInfo = {
     new StageInfo(id, 0, s"stage$id", 1, Nil, Nil, "",
-      shuffleDepId = if (shuffleId >= 0) Some(shuffleId) else None)
+      shuffleDepId = if (shuffleId >= 0) Some(shuffleId) else None,
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
   }
 
   private def taskInfo(
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 255f918..24eb168 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark._
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.status.ListenerEventsTestHelper._
@@ -151,8 +152,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     // Start a job with 2 stages / 4 tasks each
     time += 1
     val stages = Seq(
-      new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"),
-      new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2"))
+      new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
+      new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
 
     val jobProps = new Properties()
     jobProps.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, "jobDescription")
@@ -524,7 +527,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     // - Re-submit stage 2, all tasks, and succeed them and the stage.
     val oldS2 = stages.last
     val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks,
-      oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics)
+      oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics,
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     time += 1
     newS2.submissionTime = Some(time)
@@ -575,8 +579,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     // change the stats of the already finished job.
     time += 1
     val j2Stages = Seq(
-      new StageInfo(3, 0, "stage1", 4, Nil, Nil, "details1"),
-      new StageInfo(4, 0, "stage2", 4, Nil, Seq(3), "details2"))
+      new StageInfo(3, 0, "stage1", 4, Nil, Nil, "details1",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
+      new StageInfo(4, 0, "stage2", 4, Nil, Seq(3), "details2",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     j2Stages.last.submissionTime = Some(time)
     listener.onJobStart(SparkListenerJobStart(2, time, j2Stages, null))
     assert(store.count(classOf[JobDataWrapper]) === 2)
@@ -703,7 +709,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     // Submit a stage for the first RDD before it's marked for caching, to make sure later
     // the listener picks up the correct storage level.
     val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, StorageLevel.NONE, false, Nil)
-    val stage0 = new StageInfo(0, 0, "stage0", 4, Seq(rdd1Info), Nil, "details0")
+    val stage0 = new StageInfo(0, 0, "stage0", 4, Seq(rdd1Info), Nil, "details0",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage0, new Properties()))
     listener.onStageCompleted(SparkListenerStageCompleted(stage0))
     assert(store.count(classOf[RDDStorageInfoWrapper]) === 0)
@@ -711,7 +718,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     // Submit a stage and make sure the RDDs are recorded.
     rdd1Info.storageLevel = level
     val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, false, Nil)
-    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info, rdd2Info), Nil, "details1")
+    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info, rdd2Info), Nil, "details1",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
     check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
@@ -1018,9 +1026,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     // data is not deleted.
     time += 1
     val stages = Seq(
-      new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"),
-      new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2"),
-      new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3"))
+      new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
+      new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
+      new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
 
     // Graph data is generated by the job start event, so fire it.
     listener.onJobStart(SparkListenerJobStart(4, time, stages, null))
@@ -1068,7 +1079,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
     assert(store.count(classOf[CachedQuantile], "stage", key(dropped)) === 0)
 
-    val attempt2 = new StageInfo(3, 1, "stage3", 4, Nil, Nil, "details3")
+    val attempt2 = new StageInfo(3, 1, "stage3", 4, Nil, Nil, "details3",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     time += 1
     attempt2.submissionTime = Some(time)
     listener.onStageSubmitted(SparkListenerStageSubmitted(attempt2, new Properties()))
@@ -1139,9 +1151,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
     val listener = new AppStatusListener(store, testConf, true)
 
-    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
-    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
-    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     // Start stage 1 and stage 2
     time += 1
@@ -1172,8 +1187,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
     val listener = new AppStatusListener(store, testConf, true)
 
-    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
-    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     // Start job 1
     time += 1
@@ -1193,7 +1210,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
 
     // Submit stage 3 and verify stage 2 is evicted
-    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     time += 1
     stage3.submissionTime = Some(time)
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
@@ -1208,7 +1226,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
     val listener = new AppStatusListener(store, testConf, true)
 
-    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     stage1.submissionTime = Some(time)
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
 
@@ -1243,9 +1262,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val listener = new AppStatusListener(store, testConf, true)
     val appStore = new AppStatusStore(store)
 
-    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
-    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
-    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     time += 1
     stage1.submissionTime = Some(time)
@@ -1274,8 +1296,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
   test("SPARK-24415: update metrics for tasks that finish late") {
     val listener = new AppStatusListener(store, conf, true)
 
-    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
-    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     // Start job
     listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
@@ -1340,7 +1364,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
       listener.onExecutorAdded(createExecutorAddedEvent(1))
       listener.onExecutorAdded(createExecutorAddedEvent(2))
-      val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+      val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
       listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
       listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
@@ -1577,7 +1602,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
     // Submit a stage and make sure the RDDs are recorded.
     val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil)
-    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
+    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
     // Add partition 1 replicated on two block managers.
diff --git a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
index 4b3fbac..99c0d95 100644
--- a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
+++ b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
@@ -23,6 +23,7 @@ import scala.collection.immutable.Map
 
 import org.apache.spark.{AccumulatorSuite, SparkContext, Success, TaskState}
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
@@ -61,7 +62,8 @@ object ListenerEventsTestHelper {
   }
 
   def createStage(id: Int, rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = {
-    new StageInfo(id, 0, s"stage${id}", 4, rdds, parentIds, s"details${id}")
+    new StageInfo(id, 0, s"stage${id}", 4, rdds, parentIds, s"details${id}",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
   }
 
   def createStage(rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = {
@@ -96,13 +98,15 @@ object ListenerEventsTestHelper {
   /** Create a stage submitted event for the specified stage Id. */
   def createStageSubmittedEvent(stageId: Int): SparkListenerStageSubmitted = {
     SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0,
-      Seq.empty, Seq.empty, "details"))
+      Seq.empty, Seq.empty, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
   }
 
   /** Create a stage completed event for the specified stage Id. */
   def createStageCompletedEvent(stageId: Int): SparkListenerStageCompleted = {
     SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0,
-      Seq.empty, Seq.empty, "details"))
+      Seq.empty, Seq.empty, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
   }
 
   def createExecutorAddedEvent(executorId: Int): SparkListenerExecutorAdded = {
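
As the helper changes above show, every StageInfo built in these suites now carries a resourceProfileId, always the default profile id at this point in the feature. For illustration only, a minimal test-side construction assuming just the constructor shape visible in this diff (the dummyStage helper name is invented, not part of the patch):

    import org.apache.spark.resource.ResourceProfile
    import org.apache.spark.scheduler.StageInfo

    // Bare-bones stage description for listener tests: no RDDs, no parent
    // stages, and the default resource profile id passed explicitly.
    def dummyStage(id: Int): StageInfo = {
      new StageInfo(id, 0, s"stage$id", 4, Nil, Nil, s"details$id",
        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
    }
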
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index bd18e9e..7711934 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -27,6 +27,7 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.apache.spark._
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, StageData, StageStatus}
@@ -131,7 +132,8 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
       val page = new StagePage(tab, statusStore)
 
       // Simulate a stage in job progress listener
-      val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
+      val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
       // Simulate two tasks to test PEAK_EXECUTION_MEMORY correctness
       (1 to 2).foreach {
         taskId =>
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a2a4b3a..edc0662 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -32,8 +32,7 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.resource.ResourceInformation
-import org.apache.spark.resource.ResourceUtils
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceUtils}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.shuffle.MetadataFetchFailedException
@@ -341,7 +340,8 @@ class JsonProtocolSuite extends SparkFunSuite {
     val stageIds = Seq[Int](1, 2, 3, 4)
     val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
     val dummyStageInfos =
-      stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
+      stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown",
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
     val expectedJobStart =
@@ -383,9 +383,11 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo backward compatibility (parent IDs)") {
     // Prior to Spark 1.4.0, StageInfo did not have the "Parent IDs" property
-    val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details")
+    val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val oldStageInfo = JsonProtocol.stageInfoToJson(stageInfo).removeField({ _._1 == "Parent IDs"})
-    val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details")
+    val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
   }
 
@@ -873,7 +875,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
-    val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details")
+    val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
     stageInfo.accumulables(acc1.id) = acc1
     stageInfo.accumulables(acc2.id) = acc2
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index c7f435a..edfea42 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -275,9 +275,13 @@ class ContextTestsWithResources(unittest.TestCase):
         self.tempFile = tempfile.NamedTemporaryFile(delete=False)
         self.tempFile.write(b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\"]}')
         self.tempFile.close()
+        # create temporary directory for Worker resources coordination
+        self.tempdir = tempfile.NamedTemporaryFile(delete=False)
+        os.unlink(self.tempdir.name)
         os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
                  stat.S_IROTH | stat.S_IXOTH)
         conf = SparkConf().set("spark.test.home", SPARK_HOME)
+        conf = conf.set("spark.resources.dir", self.tempdir.name)
         conf = conf.set("spark.driver.resource.gpu.amount", "1")
         conf = conf.set("spark.driver.resource.gpu.discoveryScript", self.tempFile.name)
         self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf)
@@ -292,6 +296,7 @@ class ContextTestsWithResources(unittest.TestCase):
 
     def tearDown(self):
         os.unlink(self.tempFile.name)
+        shutil.rmtree(self.tempdir.name)
         self.sc.stop()
 
 
diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py
index 6095a38..68cfe81 100644
--- a/python/pyspark/tests/test_taskcontext.py
+++ b/python/pyspark/tests/test_taskcontext.py
@@ -16,6 +16,7 @@
 #
 import os
 import random
+import shutil
 import stat
 import sys
 import tempfile
@@ -277,9 +278,13 @@ class TaskContextTestsWithResources(unittest.TestCase):
         self.tempFile = tempfile.NamedTemporaryFile(delete=False)
         self.tempFile.write(b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\"]}')
         self.tempFile.close()
+        # create temporary directory for Worker resources coordination
+        self.tempdir = tempfile.NamedTemporaryFile(delete=False)
+        os.unlink(self.tempdir.name)
         os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
                  stat.S_IROTH | stat.S_IXOTH)
         conf = SparkConf().set("spark.test.home", SPARK_HOME)
+        conf = conf.set("spark.resources.dir", self.tempdir.name)
         conf = conf.set("spark.worker.resource.gpu.discoveryScript", self.tempFile.name)
         conf = conf.set("spark.worker.resource.gpu.amount", 1)
         conf = conf.set("spark.task.resource.gpu.amount", "1")
@@ -297,6 +302,7 @@ class TaskContextTestsWithResources(unittest.TestCase):
 
     def tearDown(self):
         os.unlink(self.tempFile.name)
+        shutil.rmtree(self.tempdir.name)
         self.sc.stop()
 
 if __name__ == "__main__":
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 105841a..5655ef5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -55,6 +56,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
 
+  private val defaultProfile = scheduler.sc.resourceProfileManager.defaultResourceProfile
+
   // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     if (isExecutorActive(executorId)) {
@@ -116,8 +119,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
-    podAllocator.setTotalExpectedExecutors(requestedTotal)
+  override def doRequestTotalExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
+    podAllocator.setTotalExpectedExecutors(resourceProfileToTotalExecs(defaultProfile))
     Future.successful(true)
   }
 
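With this hunk the Kubernetes backend's doRequestTotalExecutors takes a target count per ResourceProfile instead of a single Int, and for now only the default profile's entry is forwarded to the pod allocator. For illustration only, assuming just the API shown above (the defaultProfileRequest helper name is invented), a caller builds the argument like:

    import org.apache.spark.SparkConf
    import org.apache.spark.resource.ResourceProfile

    // Fold the requested total onto the default profile, which is all the
    // backend consumes until the remaining stage-level scheduling pieces land.
    def defaultProfileRequest(conf: SparkConf, total: Int): Map[ResourceProfile, Int] =
      Map(ResourceProfile.getOrCreateDefaultProfile(conf) -> total)

    // e.g. from the backend itself or a test:
    // backend.doRequestTotalExecutors(defaultProfileRequest(conf, 10))
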
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 7e1e39c..8c683e8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.resource.ResourceProfileManager
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorKilled, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -86,10 +87,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
   private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
 
+  private val resourceProfileManager = new ResourceProfileManager(sparkConf)
+
   before {
     MockitoAnnotations.initMocks(this)
     when(taskScheduler.sc).thenReturn(sc)
     when(sc.conf).thenReturn(sparkConf)
+    when(sc.resourceProfileManager).thenReturn(resourceProfileManager)
     when(sc.env).thenReturn(env)
     when(env.rpcEnv).thenReturn(rpcEnv)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e916125..0b44702 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -38,6 +38,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef}
 import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -181,6 +182,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   private var schedulerDriver: SchedulerDriver = _
 
+  private val defaultProfile = sc.resourceProfileManager.defaultResourceProfile
+
+
   def newMesosTaskId(): String = {
     val id = nextMesosTaskId
     nextMesosTaskId += 1
@@ -595,13 +599,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   private def satisfiesLocality(offerHostname: String): Boolean = {
+    val hostToLocalTaskCount =
+      rpHostToLocalTaskCount.getOrElse(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, Map.empty)
     if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
       return true
     }
 
     // Check the locality information
     val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
-    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val allDesiredHosts = hostToLocalTaskCount.map { case (k, v) => k }.toSet
+
     // Try to match locality for hosts which do not have executors yet, to potentially
     // increase coverage.
     val remainingHosts = allDesiredHosts -- currentHosts
@@ -759,11 +766,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       super.applicationId
     }
 
-  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
+  override def doRequestTotalExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]
+  ): Future[Boolean] = Future.successful {
     // We don't truly know if we can fulfill the full amount of executors
     // since at coarse grain it depends on the amount of slaves available.
-    logInfo("Capping the total amount of executors to " + requestedTotal)
-    executorLimitOption = Some(requestedTotal)
+    val numExecs = resourceProfileToTotalExecs.getOrElse(defaultProfile, 0)
+    logInfo("Capping the total amount of executors to " + numExecs)
+    executorLimitOption = Some(numExecs)
     // Update the locality wait start time to continue trying for locality.
     localityWaitStartTimeNs = System.nanoTime()
     true
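
In the Mesos hunks above, satisfiesLocality now reads its host counts out of rpHostToLocalTaskCount keyed by resource profile id (falling back to an empty map), and doRequestTotalExecutors caps executors using the default profile's entry, with a missing entry read as zero. A small sketch of those lookups, with invented host names and counts:

    import org.apache.spark.resource.ResourceProfile

    val rpHostToLocalTaskCount: Map[Int, Map[String, Int]] =
      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> Map("host1" -> 2, "host2" -> 1))

    // Per-profile lookup with an empty-map fallback, as in satisfiesLocality above.
    val hostToLocalTaskCount: Map[String, Int] =
      rpHostToLocalTaskCount.getOrElse(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, Map.empty)
    // hostToLocalTaskCount == Map("host1" -> 2, "host2" -> 1)
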
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 1876861..5ab277ed 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -71,8 +71,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     offerResources(offers)
     verifyTaskLaunched(driver, "o1")
 
+    val totalExecs = Map(ResourceProfile.getOrCreateDefaultProfile(sparkConf) -> 0)
     // kills executors
-    assert(backend.doRequestTotalExecutors(0).futureValue)
+    val defaultResourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(backend.doRequestTotalExecutors(Map(defaultResourceProfile -> 0)).futureValue)
     assert(backend.doKillExecutors(Seq("0")).futureValue)
     val taskID0 = createTaskId("0")
     verify(driver, times(1)).killTask(taskID0)
@@ -82,7 +84,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verifyDeclinedOffer(driver, createOfferId("o2"))
 
     // Launches a new task when requested executors is positive
-    backend.doRequestTotalExecutors(2)
+    backend.doRequestTotalExecutors(Map(defaultResourceProfile -> 2))
     offerResources(offers, 2)
     verifyTaskLaunched(driver, "o2")
   }
@@ -635,7 +637,12 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     assert(backend.getExecutorIds().isEmpty)
 
-    backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))
+    val defaultProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+    val defaultProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    backend.requestTotalExecutors(
+      Map(defaultProfileId -> 2),
+      Map(defaultProfileId -> 2),
+      Map(defaultProfileId -> Map("hosts10" -> 1, "hosts11" -> 1)))
 
     // Offer non-local resources, which should be rejected
     offerResourcesAndVerify(1, false)
@@ -651,7 +658,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     offerResourcesAndVerify(1, true)
 
     // Update total executors
-    backend.requestTotalExecutors(3, 3, Map("hosts10" -> 1, "hosts11" -> 1, "hosts12" -> 1))
+    backend.requestTotalExecutors(
+      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 3),
+      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 2),
+      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID ->
+        Map("hosts10" -> 1, "hosts11" -> 1, "hosts12" -> 1)))
 
     // Offer non-local resources, which should be rejected
     offerResourcesAndVerify(3, false)
@@ -660,8 +671,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     Thread.sleep(2000)
 
     // Update total executors
-    backend.requestTotalExecutors(4, 4, Map("hosts10" -> 1, "hosts11" -> 1, "hosts12" -> 1,
-      "hosts13" -> 1))
+    backend.requestTotalExecutors(
+      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 4),
+      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 4),
+      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID ->
+            Map("hosts10" -> 1, "hosts11" -> 1, "hosts12" -> 1, "hosts13" -> 1)))
 
     // Offer non-local resources, which should be rejected
     offerResourcesAndVerify(3, false)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 471ee58..f8bbc39 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -27,13 +27,13 @@ import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-import org.eclipse.jetty.servlet.{FilterHolder, FilterMapping}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -92,7 +92,7 @@ private[spark] abstract class YarnSchedulerBackend(
     try {
       // SPARK-12009: To prevent Yarn allocator from requesting backup for the executors which
       // was Stopped by SchedulerBackend.
-      requestTotalExecutors(0, 0, Map.empty)
+      requestTotalExecutors(Map.empty, Map.empty, Map.empty)
       super.stop()
     } finally {
       stopped.set(true)
@@ -123,21 +123,28 @@ private[spark] abstract class YarnSchedulerBackend(
     }
   }
 
-  private[cluster] def prepareRequestExecutors(requestedTotal: Int): RequestExecutors = {
+  private[cluster] def prepareRequestExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): RequestExecutors = {
     val nodeBlacklist: Set[String] = scheduler.nodeBlacklist()
     // For locality preferences, ignore preferences for nodes that are blacklisted
-    val filteredHostToLocalTaskCount =
-      hostToLocalTaskCount.filter { case (k, v) => !nodeBlacklist.contains(k) }
-    RequestExecutors(requestedTotal, localityAwareTasks, filteredHostToLocalTaskCount,
-      nodeBlacklist)
+    val filteredRPHostToLocalTaskCount = rpHostToLocalTaskCount.map { case (rpid, v) =>
+      (rpid, v.filter { case (host, count) => !nodeBlacklist.contains(host) })
+    }
+    // TODO - default everything to the default profile until the YARN pieces are added
+    val defaultProf = ResourceProfile.getOrCreateDefaultProfile(conf)
+    val hostToLocalTaskCount = filteredRPHostToLocalTaskCount.getOrElse(defaultProf.id, Map.empty)
+    val localityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(defaultProf.id, 0)
+    val numExecutors = resourceProfileToTotalExecs.getOrElse(defaultProf, 0)
+    RequestExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount, nodeBlacklist)
   }
 
   /**
    * Request executors from the ApplicationMaster by specifying the total number desired.
    * This includes executors already pending or running.
    */
-  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
-    yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
+  override def doRequestTotalExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
+    yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(resourceProfileToTotalExecs))
   }
 
   /**
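
The updated prepareRequestExecutors filters blacklisted hosts out of each profile's locality map before collapsing everything onto the default profile. A sketch of that filter, with invented host names and counts:

    import org.apache.spark.resource.ResourceProfile

    val nodeBlacklist = Set("badhost")
    val rpHostToLocalTaskCount: Map[Int, Map[String, Int]] =
      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> Map("goodhost" -> 3, "badhost" -> 2))

    // Drop blacklisted hosts from every profile's locality preferences.
    val filtered = rpHostToLocalTaskCount.map { case (rpId, hostCounts) =>
      (rpId, hostCounts.filter { case (host, _) => !nodeBlacklist.contains(host) })
    }
    // filtered == Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> Map("goodhost" -> 3))
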
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
index a87820b..c0c6fff 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -24,6 +24,7 @@ import org.mockito.Mockito.when
 import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.ui.TestFilter
@@ -51,7 +52,8 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
   private class TestYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
       extends YarnSchedulerBackend(scheduler, sc) {
     def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
-      this.hostToLocalTaskCount = hostToLocalTaskCount
+      this.rpHostToLocalTaskCount = Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID ->
+        hostToLocalTaskCount)
     }
   }
 
@@ -72,7 +74,8 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
     } {
       yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount)
       sched.setNodeBlacklist(blacklist)
-      val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numRequested)
+      val numReq = Map(ResourceProfile.getOrCreateDefaultProfile(sc.getConf) -> numRequested)
+      val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numReq)
       assert(req.requestedTotal === numRequested)
       assert(req.nodeBlacklist === blacklist)
       assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
index a88abc8..c09ff51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{SparkConf, TaskState}
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -89,7 +90,8 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
 
     val taskEventsTime = (0 until numStages).map { _ =>
       val stageInfo = new StageInfo(idgen.incrementAndGet(), 0, getClass().getName(),
-        numTasks, Nil, Nil, getClass().getName())
+        numTasks, Nil, Nil, getClass().getName(),
+        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
       val jobId = idgen.incrementAndGet()
       val jobStart = SparkListenerJobStart(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 55b551d..fdfd392 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -86,7 +87,8 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
       name = "",
       rddInfos = Nil,
       parentIds = Nil,
-      details = "")
+      details = "",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
   }
 
   private def createTaskInfo(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index e85a3b9..58bd56c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -23,6 +23,7 @@ import scala.util.Random
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Streaming._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, Utils}
 
@@ -111,7 +112,11 @@ private[streaming] class ExecutorAllocationManager(
     logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
     val targetTotalExecutors =
       math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
-    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
+    // Just map the targetTotalExecutors to the default ResourceProfile
+    client.requestTotalExecutors(
+      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> targetTotalExecutors),
+      Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0),
+      Map.empty)
     logInfo(s"Requested total $targetTotalExecutors executors")
   }
 
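The streaming allocation manager keeps a single scalar target and folds it onto the default resource profile when calling the updated ExecutorAllocationClient API, where totals, locality-aware task counts, and host locality maps are all keyed by profile id. For illustration only, assuming just that API (the requestOnDefaultProfile name is invented):

    import org.apache.spark.ExecutorAllocationClient
    import org.apache.spark.resource.ResourceProfile

    // Request targetTotal executors on the default profile only, with no
    // locality-aware tasks and no host preferences.
    def requestOnDefaultProfile(client: ExecutorAllocationClient, targetTotal: Int): Boolean = {
      client.requestTotalExecutors(
        Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> targetTotal),
        Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0),
        Map.empty)
    }
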
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 9121da4b..65efa10 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -27,6 +27,7 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
 import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
 import org.apache.spark.internal.config.Streaming._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase}
 import org.apache.spark.util.{ManualClock, Utils}
 
@@ -71,10 +72,15 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
         if (expectedRequestedTotalExecs.nonEmpty) {
           require(expectedRequestedTotalExecs.get > 0)
           verify(allocationClient, times(1)).requestTotalExecutors(
-            meq(expectedRequestedTotalExecs.get), meq(0), meq(Map.empty))
+              meq(Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID ->
+                expectedRequestedTotalExecs.get)),
+              meq(Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0)),
+              meq(Map.empty))
         } else {
-          verify(allocationClient, never).requestTotalExecutors(0, 0, Map.empty)
-        }
+          verify(allocationClient, never).requestTotalExecutors(
+            Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0),
+            Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0),
+            Map.empty)}
       }
 
       /** Verify that a particular executor was killed */
@@ -139,8 +145,11 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
       reset(allocationClient)
       when(allocationClient.getExecutorIds()).thenReturn((1 to numExecs).map(_.toString))
       requestExecutors(allocationManager, numNewExecs)
-      verify(allocationClient, times(1)).requestTotalExecutors(
-        meq(expectedRequestedTotalExecs), meq(0), meq(Map.empty))
+      val defaultProfId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+      verify(allocationClient, times(1)).
+        requestTotalExecutors(
+          meq(Map(defaultProfId -> expectedRequestedTotalExecs)),
+          meq(Map(defaultProfId -> 0)), meq(Map.empty))
     }
 
     withAllocationManager(numReceivers = 1) { case (_, allocationManager) =>

