You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2020/02/28 21:23:59 UTC

[spark] branch master updated: [SPARK-29149][YARN] Update YARN cluster manager For Stage Level Scheduling

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e2ca11  [SPARK-29149][YARN] Update YARN cluster manager For Stage Level Scheduling
0e2ca11 is described below

commit 0e2ca11d80c3921387d7b077cb64c3a0c06b08d7
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Fri Feb 28 15:23:33 2020 -0600

    [SPARK-29149][YARN] Update YARN cluster manager For Stage Level Scheduling
    
    ### What changes were proposed in this pull request?
    
    Yarn side changes for Stage level scheduling.  The previous PR for dynamic allocation changes was https://github.com/apache/spark/pull/27313
    
    Modified the data structures to store things on a per ResourceProfile basis.
     I tried to keep the code changes to a minimum: the main loop that requests containers just goes through each ResourceProfile, and the logic inside for each one stayed very close to the same.
    On submission we now have to give each ResourceProfile a separate yarn Priority because yarn doesn't support asking for containers with different resources at the same Priority. We just use the profile id as the priority level.
    Using a different Priority actually makes it easier, when the containers come back, to match them to the ResourceProfile they were requested for.
    The expectation is that yarn will only give you a container with resource amounts you requested or more. It should never give you a container if it doesn't satisfy your resource requests.
    
    If you want to see the full feature changes you can look at https://github.com/apache/spark/pull/27053/files for reference
    
    ### Why are the changes needed?
    
    For stage level scheduling YARN support.
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Tested manually on YARN cluster and then unit tests.
    
    Closes #27583 from tgravescs/SPARK-29149.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../cluster/CoarseGrainedClusterMessage.scala      |   6 +-
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |  30 +-
 .../spark/deploy/yarn/ApplicationMaster.scala      |   9 +-
 .../deploy/yarn/ApplicationMasterSource.scala      |   4 +-
 ...calityPreferredContainerPlacementStrategy.scala |  25 +-
 .../spark/deploy/yarn/ResourceRequestHelper.scala  |  11 +
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 556 ++++++++++++++-------
 .../scheduler/cluster/YarnSchedulerBackend.scala   |   8 +-
 .../yarn/ContainerPlacementStrategySuite.scala     |  46 +-
 .../yarn/LocalityPlacementStrategySuite.scala      |   6 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 258 +++++++---
 .../cluster/YarnSchedulerBackendSuite.scala        |  20 +-
 12 files changed, 663 insertions(+), 316 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 8db0122..465c0d2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -117,9 +117,9 @@ private[spark] object CoarseGrainedClusterMessages {
   // Request executors by specifying the new total number of executors desired
   // This includes executors already pending or running
   case class RequestExecutors(
-      requestedTotal: Int,
-      localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int],
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int],
+      numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
+      hostToLocalTaskCount: Map[Int, Map[String, Int]],
       nodeBlacklist: Set[String])
     extends CoarseGrainedClusterMessage
 
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index a929695..3126913 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
-import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -61,6 +61,7 @@ class HeartbeatReceiverSuite
     PrivateMethod[collection.Map[String, Long]](Symbol("executorLastSeen"))
   private val _executorTimeoutMs = PrivateMethod[Long](Symbol("executorTimeoutMs"))
   private val _killExecutorThread = PrivateMethod[ExecutorService](Symbol("killExecutorThread"))
+  var conf: SparkConf = _
 
   /**
    * Before each test, set up the SparkContext and a custom [[HeartbeatReceiver]]
@@ -68,7 +69,7 @@ class HeartbeatReceiverSuite
    */
   override def beforeEach(): Unit = {
     super.beforeEach()
-    val conf = new SparkConf()
+    conf = new SparkConf()
       .setMaster("local[2]")
       .setAppName("test")
       .set(DYN_ALLOCATION_TESTING, true)
@@ -76,7 +77,6 @@ class HeartbeatReceiverSuite
     scheduler = mock(classOf[TaskSchedulerImpl])
     when(sc.taskScheduler).thenReturn(scheduler)
     when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
-    when(scheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
     when(scheduler.sc).thenReturn(sc)
     heartbeatReceiverClock = new ManualClock
     heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@@ -164,9 +164,10 @@ class HeartbeatReceiverSuite
   test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
     // Set up a fake backend and cluster manager to simulate killing executors
     val rpcEnv = sc.env.rpcEnv
-    val fakeClusterManager = new FakeClusterManager(rpcEnv)
+    val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
     val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
-    val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
+    val fakeSchedulerBackend =
+      new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef, sc.resourceProfileManager)
     when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
 
     // Register fake executors with our fake scheduler backend
@@ -282,18 +283,16 @@ private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpo
 private class FakeSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     rpcEnv: RpcEnv,
-    clusterManagerEndpoint: RpcEndpointRef)
+    clusterManagerEndpoint: RpcEndpointRef,
+    resourceProfileManager: ResourceProfileManager)
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
   protected override def doRequestTotalExecutors(
       resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
     clusterManagerEndpoint.ask[Boolean](
-      RequestExecutors(
-        resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf)),
-        numLocalityAwareTasksPerResourceProfileId(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
-        rpHostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
-        Set.empty))
-  }
+      RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId,
+        rpHostToLocalTaskCount, Set.empty))
+}
 
   protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
     clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
@@ -303,7 +302,7 @@ private class FakeSchedulerBackend(
 /**
  * Dummy cluster manager to simulate responses to executor allocation requests.
  */
-private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+private class FakeClusterManager(override val rpcEnv: RpcEnv, conf: SparkConf) extends RpcEndpoint {
   private var targetNumExecutors = 0
   private val executorIdsToKill = new mutable.HashSet[String]
 
@@ -311,8 +310,9 @@ private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoin
   def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case RequestExecutors(requestedTotal, _, _, _) =>
-      targetNumExecutors = requestedTotal
+    case RequestExecutors(resourceProfileToTotalExecs, _, _, _) =>
+      targetNumExecutors =
+        resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf))
       context.reply(true)
     case KillExecutors(executorIds) =>
       executorIdsToKill ++= executorIds
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1e8f408..43cd745 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -593,7 +593,7 @@ private[spark] class ApplicationMaster(
           }
       }
       try {
-        val numPendingAllocate = allocator.getPendingAllocate.size
+        val numPendingAllocate = allocator.getNumContainersPendingAllocate
         var sleepStartNs = 0L
         var sleepInterval = 200L // ms
         allocatorLock.synchronized {
@@ -778,8 +778,11 @@ private[spark] class ApplicationMaster(
       case r: RequestExecutors =>
         Option(allocator) match {
           case Some(a) =>
-            if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
-              r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
+            if (a.requestTotalExecutorsWithPreferredLocalities(
+              r.resourceProfileToTotalExecs,
+              r.numLocalityAwareTasksPerResourceProfileId,
+              r.hostToLocalTaskCount,
+              r.nodeBlacklist)) {
               resetAllocatorInterval()
             }
             context.reply(true)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
index 0fec916..62ac17c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
@@ -40,11 +40,11 @@ private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: Yarn
   })
 
   metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
-    override def getValue: Int = yarnAllocator.numLocalityAwareTasks
+    override def getValue: Int = yarnAllocator.getNumLocalityAwareTasks
   })
 
   metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
-    override def getValue: Int = yarnAllocator.numContainersPendingAllocate
+    override def getValue: Int = yarnAllocator.getNumContainersPendingAllocate
   })
 
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 2288bb5..a6380ab 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceProfile
 
 private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
 
@@ -82,7 +82,6 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
 private[yarn] class LocalityPreferredContainerPlacementStrategy(
     val sparkConf: SparkConf,
     val yarnConf: Configuration,
-    val resource: Resource,
     resolver: SparkRackResolver) {
 
   /**
@@ -96,6 +95,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
    *                                     containers
    * @param localityMatchedPendingAllocations A sequence of pending container request which
    *                                          matches the localities of current required tasks.
+   * @param rp The ResourceProfile associated with this container.
    * @return node localities and rack localities, each locality is an array of string,
    *         the length of localities is the same as number of containers
    */
@@ -104,11 +104,12 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
       numLocalityAwareTasks: Int,
       hostToLocalTaskCount: Map[String, Int],
       allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
-      localityMatchedPendingAllocations: Seq[ContainerRequest]
+      localityMatchedPendingAllocations: Seq[ContainerRequest],
+      rp: ResourceProfile
     ): Array[ContainerLocalityPreferences] = {
     val updatedHostToContainerCount = expectedHostToContainerCount(
       numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
-        localityMatchedPendingAllocations)
+        localityMatchedPendingAllocations, rp)
     val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
 
     // The number of containers to allocate, divided into two groups, one with preferred locality,
@@ -152,11 +153,14 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
   }
 
   /**
-   * Calculate the number of executors need to satisfy the given number of pending tasks.
+   * Calculate the number of executors needed to satisfy the given number of pending tasks for
+   * the ResourceProfile.
    */
-  private def numExecutorsPending(numTasksPending: Int): Int = {
-    val coresPerExecutor = resource.getVirtualCores
-    (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
+  private def numExecutorsPending(
+      numTasksPending: Int,
+      rp: ResourceProfile): Int = {
+    val tasksPerExec = rp.maxTasksPerExecutor(sparkConf)
+    math.ceil(numTasksPending / tasksPerExec.toDouble).toInt
   }
 
   /**
@@ -175,14 +179,15 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
       localityAwareTasks: Int,
       hostToLocalTaskCount: Map[String, Int],
       allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
-      localityMatchedPendingAllocations: Seq[ContainerRequest]
+      localityMatchedPendingAllocations: Seq[ContainerRequest],
+      rp: ResourceProfile
     ): Map[String, Int] = {
     val totalLocalTaskNum = hostToLocalTaskCount.values.sum
     val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations)
 
     hostToLocalTaskCount.map { case (host, count) =>
       val expectedCount =
-        count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
+        count.toDouble * numExecutorsPending(localityAwareTasks, rp) / totalLocalTaskNum
       // Take the locality of pending containers into consideration
       val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
         pendingHostToContainersMap.getOrElse(host, 0.0)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index ae316b0..3d800be 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -227,6 +227,17 @@ private object ResourceRequestHelper extends Logging {
     resourceInformation
   }
 
+  def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = {
+    try {
+      // Use reflection as this uses APIs only available in Hadoop 3
+      val getResourcesMethod = resource.getClass().getMethod("getResources")
+      val resources = getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]]
+      if (resources.nonEmpty) true else false
+    } catch {
+      case  _: NoSuchMethodException => false
+    }
+  }
+
   /**
    * Checks whether Hadoop 2.x or 3 is used as a dependency.
    * In case of Hadoop 3 and later, the ResourceInformation class
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 09414cb..cd0e7d5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.util.Collections
-import java.util.concurrent._
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -39,6 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -75,19 +76,69 @@ private[yarn] class YarnAllocator(
   import YarnAllocator._
 
   // Visible for testing.
-  val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
+  @GuardedBy("this")
+  val allocatedHostToContainersMapPerRPId =
+    new HashMap[Int, HashMap[String, collection.mutable.Set[ContainerId]]]
+
+  @GuardedBy("this")
   val allocatedContainerToHostMap = new HashMap[ContainerId, String]
 
   // Containers that we no longer care about. We've either already told the RM to release them or
   // will on the next heartbeat. Containers get removed from this map after the RM tells us they've
   // completed.
-  private val releasedContainers = Collections.newSetFromMap[ContainerId](
-    new ConcurrentHashMap[ContainerId, java.lang.Boolean])
+  @GuardedBy("this")
+  private val releasedContainers = collection.mutable.HashSet[ContainerId]()
+
+  @GuardedBy("this")
+  private val runningExecutorsPerResourceProfileId = new HashMap[Int, mutable.Set[String]]()
 
-  private val runningExecutors = Collections.newSetFromMap[String](
-    new ConcurrentHashMap[String, java.lang.Boolean]())
+  @GuardedBy("this")
+  private val numExecutorsStartingPerResourceProfileId = new HashMap[Int, AtomicInteger]
 
-  private val numExecutorsStarting = new AtomicInteger(0)
+  @GuardedBy("this")
+  private val targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int]
+
+  // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
+  // list of requesters that should be responded to once we find out why the given executor
+  // was lost.
+  @GuardedBy("this")
+  private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+
+  // Maintain loss reasons for already released executors, it will be added when executor loss
+  // reason is got from AM-RM call, and be removed after querying this loss reason.
+  @GuardedBy("this")
+  private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
+
+  // Keep track of which container is running which executor to remove the executors later
+  // Visible for testing.
+  @GuardedBy("this")
+  private[yarn] val executorIdToContainer = new HashMap[String, Container]
+
+  @GuardedBy("this")
+  private var numUnexpectedContainerRelease = 0L
+
+  @GuardedBy("this")
+  private val containerIdToExecutorIdAndResourceProfileId = new HashMap[ContainerId, (String, Int)]
+
+  // Use a ConcurrentHashMap because this is used in matchContainerToRequest, which is called
+  // from the rack resolver thread where synchronize(this) on this would cause a deadlock.
+  @GuardedBy("ConcurrentHashMap")
+  private[yarn] val rpIdToYarnResource = new ConcurrentHashMap[Int, Resource]()
+
+  // note currently we don't remove ResourceProfiles
+  @GuardedBy("this")
+  private[yarn] val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
+
+  // A map of ResourceProfile id to a map of preferred hostname and possible
+  // task numbers running on it.
+  @GuardedBy("this")
+  private var hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]] =
+    Map(DEFAULT_RESOURCE_PROFILE_ID -> Map.empty)
+
+  // ResourceProfile Id to number of tasks that have locality preferences in active stages
+  @GuardedBy("this")
+  private[yarn] var numLocalityAwareTasksPerResourceProfileId: Map[Int, Int] =
+    Map(DEFAULT_RESOURCE_PROFILE_ID -> 0)
 
   /**
    * Used to generate a unique ID per executor
@@ -102,6 +153,7 @@ private[yarn] class YarnAllocator(
    *
    * @see SPARK-12864
    */
+  @GuardedBy("this")
   private var executorIdCounter: Int =
     driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
 
@@ -110,26 +162,6 @@ private[yarn] class YarnAllocator(
   private val allocatorBlacklistTracker =
     new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)
 
-  @volatile private var targetNumExecutors =
-    SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
-
-
-  // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
-  // list of requesters that should be responded to once we find out why the given executor
-  // was lost.
-  private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
-
-  // Maintain loss reasons for already released executors, it will be added when executor loss
-  // reason is got from AM-RM call, and be removed after querying this loss reason.
-  private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
-
-  // Keep track of which container is running which executor to remove the executors later
-  // Visible for testing.
-  private[yarn] val executorIdToContainer = new HashMap[String, Container]
-
-  private var numUnexpectedContainerRelease = 0L
-  private val containerIdToExecutorId = new HashMap[ContainerId, String]
-
   // Executor memory in MiB.
   protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
   // Executor offHeap memory in MiB.
@@ -142,17 +174,18 @@ private[yarn] class YarnAllocator(
   } else {
     0
   }
-  // Number of cores per executor.
-  protected val executorCores = sparkConf.get(EXECUTOR_CORES)
+  // Number of cores per executor for the default profile
+  protected val defaultExecutorCores = sparkConf.get(EXECUTOR_CORES)
 
   private val executorResourceRequests =
     getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
     getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
 
-  // Resource capability requested for each executor
-  private[yarn] val resource: Resource = {
-    val resource = Resource.newInstance(
-      executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
+  // Resource capability requested for each executor for the default profile
+  private[yarn] val defaultResource: Resource = {
+    val resource: Resource = Resource.newInstance(
+      executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory,
+      defaultExecutorCores)
     ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
     logDebug(s"Created resource capability: $resource")
     resource
@@ -166,19 +199,42 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
-  // A map to store preferred hostname and possible task numbers running on it.
-  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
-
-  // Number of tasks that have locality preferences in active stages
-  private[yarn] var numLocalityAwareTasks: Int = 0
-
   // A container placement strategy based on pending tasks' locality preference
   private[yarn] val containerPlacementStrategy =
-    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
+    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
+
+  // The default profile is always present so we need to initialize the data structures keyed by
+  // ResourceProfile id to ensure it's present if things start running before a request for
+  // executors could add it. This approach is easier than going and special casing everywhere.
+  private def initDefaultProfile(): Unit = synchronized {
+    allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
+      new HashMap[String, mutable.Set[ContainerId]]()
+    runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, mutable.HashSet[String]())
+    numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = new AtomicInteger(0)
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
+      SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
+    rpIdToYarnResource.put(DEFAULT_RESOURCE_PROFILE_ID, defaultResource)
+    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
+      ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+  }
+
+  initDefaultProfile()
 
-  def getNumExecutorsRunning: Int = runningExecutors.size()
+  def getNumExecutorsRunning: Int = synchronized {
+    runningExecutorsPerResourceProfileId.values.map(_.size).sum
+  }
+
+  def getNumLocalityAwareTasks: Int = synchronized {
+    numLocalityAwareTasksPerResourceProfileId.values.sum
+  }
 
-  def getNumReleasedContainers: Int = releasedContainers.size()
+  def getNumExecutorsStarting: Int = synchronized {
+    numExecutorsStartingPerResourceProfileId.values.map(_.get()).sum
+  }
+
+  def getNumReleasedContainers: Int = synchronized {
+    releasedContainers.size
+  }
 
   def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
 
@@ -186,49 +242,147 @@ private[yarn] class YarnAllocator(
 
   /**
    * A sequence of pending container requests that have not yet been fulfilled.
+   * ResourceProfile id -> pendingAllocate container request
    */
-  def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
+  def getPendingAllocate: Map[Int, Seq[ContainerRequest]] = getPendingAtLocation(ANY_HOST)
 
-  def numContainersPendingAllocate: Int = synchronized {
-    getPendingAllocate.size
+  def getNumContainersPendingAllocate: Int = synchronized {
+    getPendingAllocate.values.flatten.size
+  }
+
+  // YARN priorities are such that lower number is higher priority.
+  // We need to allocate a different priority for each ResourceProfile because YARN
+  // won't allow different container resource requirements within a Priority.
+  // We could allocate per Stage to make sure earlier stages get priority but Spark
+  // always finishes a stage before starting a later one and if we have 2 running in parallel
+  // the priority doesn't matter.
+  // We are using the ResourceProfile id as the priority.
+  private def getContainerPriority(rpId: Int): Priority = {
+    Priority.newInstance(rpId)
+  }
+
+  // The ResourceProfile id is the priority
+  private def getResourceProfileIdFromPriority(priority: Priority): Int = {
+    priority.getPriority()
+  }
+
+  private def getOrUpdateAllocatedHostToContainersMapForRPId(
+      rpId: Int): HashMap[String, collection.mutable.Set[ContainerId]] = synchronized {
+    allocatedHostToContainersMapPerRPId.getOrElseUpdate(rpId,
+      new HashMap[String, mutable.Set[ContainerId]]())
+  }
+
+  private def getOrUpdateRunningExecutorForRPId(rpId: Int): mutable.Set[String] = synchronized {
+    runningExecutorsPerResourceProfileId.getOrElseUpdate(rpId, mutable.HashSet[String]())
+  }
+
+  private def getOrUpdateNumExecutorsStartingForRPId(rpId: Int): AtomicInteger = synchronized {
+    numExecutorsStartingPerResourceProfileId.getOrElseUpdate(rpId, new AtomicInteger(0))
+  }
+
+  private def getOrUpdateTargetNumExecutorsForRPId(rpId: Int): Int = synchronized {
+    targetNumExecutorsPerResourceProfileId.getOrElseUpdate(rpId,
+      SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf))
   }
 
   /**
-   * A sequence of pending container requests at the given location that have not yet been
-   * fulfilled.
+   * A sequence of pending container requests at the given location for each ResourceProfile id
+   * that have not yet been fulfilled.
    */
-  private def getPendingAtLocation(location: String): Seq[ContainerRequest] =
-    amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala
-      .flatMap(_.asScala)
+  private def getPendingAtLocation(
+      location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
+    val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
+    rpIdToResourceProfile.keys.map { id =>
+      val profResource = rpIdToYarnResource.get(id)
+      val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
+        .asScala.flatMap(_.asScala)
+      allContainerRequests(id) = result
+    }
+    allContainerRequests.toMap
+  }
+
+  // if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it
+  private def createYarnResourceForResourceProfile(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = synchronized {
+    resourceProfileToTotalExecs.foreach { case (rp, num) =>
+      if (!rpIdToYarnResource.contains(rp.id)) {
+        // Start with the application or default settings
+        var heapMem = executorMemory.toLong
+        // Note we currently don't support off heap memory in ResourceProfile - SPARK-30794
+        var offHeapMem = executorOffHeapMemory.toLong
+        var overheadMem = memoryOverhead.toLong
+        var pysparkMem = pysparkWorkerMemory.toLong
+        var cores = defaultExecutorCores
+        val customResources = new mutable.HashMap[String, String]
+        // track the resource profile if not already there
+        getOrUpdateRunningExecutorForRPId(rp.id)
+        logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
+        val execResources = rp.executorResources
+        execResources.foreach { case (r, execReq) =>
+          r match {
+            case ResourceProfile.MEMORY =>
+              heapMem = execReq.amount
+            case ResourceProfile.OVERHEAD_MEM =>
+              overheadMem = execReq.amount
+            case ResourceProfile.PYSPARK_MEM =>
+              pysparkMem = execReq.amount
+            case ResourceProfile.CORES =>
+              cores = execReq.amount.toInt
+            case "gpu" =>
+              customResources(YARN_GPU_RESOURCE_CONFIG) = execReq.amount.toString
+            case "fpga" =>
+              customResources(YARN_FPGA_RESOURCE_CONFIG) = execReq.amount.toString
+            case rName =>
+              customResources(rName) = execReq.amount.toString
+          }
+        }
+        val totalMem = (heapMem + offHeapMem + overheadMem + pysparkMem).toInt
+        val resource = Resource.newInstance(totalMem, cores)
+        ResourceRequestHelper.setResourceRequests(customResources.toMap, resource)
+        logDebug(s"Created resource capability: $resource")
+        rpIdToYarnResource.putIfAbsent(rp.id, resource)
+        rpIdToResourceProfile(rp.id) = rp
+      }
+    }
+  }
 
   /**
    * Request as many executors from the ResourceManager as needed to reach the desired total. If
    * the requested total is smaller than the current number of running executors, no executors will
    * be killed.
-   * @param requestedTotal total number of containers requested
-   * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
-   * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
-   *                             container placement hint.
+   * @param resourceProfileToTotalExecs total number of containers requested for each
+   *                                    ResourceProfile
+   * @param numLocalityAwareTasksPerResourceProfileId number of locality aware tasks for each
+   *                                                  ResourceProfile id to be used as container
+   *                                                  placement hint.
+   * @param hostToLocalTaskCount a map of preferred hostname to possible task counts for each
+   *                             ResourceProfile id to be used as container placement hint.
    * @param nodeBlacklist blacklisted nodes, which is passed in to avoid allocating new containers
    *                      on them. It will be used to update the application master's blacklist.
    * @return Whether the new requested total is different than the old value.
    */
   def requestTotalExecutorsWithPreferredLocalities(
-      requestedTotal: Int,
-      localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int],
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int],
+      numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
+      hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]],
       nodeBlacklist: Set[String]): Boolean = synchronized {
-    this.numLocalityAwareTasks = localityAwareTasks
-    this.hostToLocalTaskCounts = hostToLocalTaskCount
-
-    if (requestedTotal != targetNumExecutors) {
-      logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
-      targetNumExecutors = requestedTotal
-      allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
-      true
-    } else {
-      false
+    this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
+    this.hostToLocalTaskCountPerResourceProfileId = hostToLocalTaskCountPerResourceProfileId
+
+    createYarnResourceForResourceProfile(resourceProfileToTotalExecs)
+
+    val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
+      if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
+        logInfo(s"Driver requested a total number of $numExecs executor(s) " +
+          s"for resource profile id: ${rp.id}.")
+        targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
+        allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
+        true
+      } else {
+        false
+      }
     }
+    res.exists(_ == true)
   }
 
   /**
@@ -237,8 +391,9 @@ private[yarn] class YarnAllocator(
   def killExecutor(executorId: String): Unit = synchronized {
     executorIdToContainer.get(executorId) match {
       case Some(container) if !releasedContainers.contains(container.getId) =>
+        val (_, rpId) = containerIdToExecutorIdAndResourceProfileId(container.getId)
         internalReleaseContainer(container)
-        runningExecutors.remove(executorId)
+        getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
       case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
     }
   }
@@ -267,8 +422,8 @@ private[yarn] class YarnAllocator(
         "Launching executor count: %d. Cluster resources: %s.")
         .format(
           allocatedContainers.size,
-          runningExecutors.size,
-          numExecutorsStarting.get,
+          getNumExecutorsRunning,
+          getNumExecutorsStarting,
           allocateResponse.getAvailableResources))
 
       handleAllocatedContainers(allocatedContainers.asScala)
@@ -279,108 +434,122 @@ private[yarn] class YarnAllocator(
       logDebug("Completed %d containers".format(completedContainers.size))
       processCompletedContainers(completedContainers.asScala)
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
-        .format(completedContainers.size, runningExecutors.size))
+        .format(completedContainers.size, getNumExecutorsRunning))
     }
   }
 
   /**
    * Update the set of container requests that we will sync with the RM based on the number of
-   * executors we have currently running and our target number of executors.
+   * executors we have currently running and our target number of executors for each
+   * ResourceProfile.
    *
    * Visible for testing.
    */
-  def updateResourceRequests(): Unit = {
-    val pendingAllocate = getPendingAllocate
-    val numPendingAllocate = pendingAllocate.size
-    val missing = targetNumExecutors - numPendingAllocate -
-      numExecutorsStarting.get - runningExecutors.size
-    logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
-      s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
-      s"executorsStarting: ${numExecutorsStarting.get}")
-
-    // Split the pending container request into three groups: locality matched list, locality
-    // unmatched list and non-locality list. Take the locality matched container request into
-    // consideration of container placement, treat as allocated containers.
-    // For locality unmatched and locality free container requests, cancel these container
-    // requests, since required locality preference has been changed, recalculating using
-    // container placement strategy.
-    val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
-      hostToLocalTaskCounts, pendingAllocate)
-
-    if (missing > 0) {
-      if (log.isInfoEnabled()) {
-        var requestContainerMessage = s"Will request $missing executor container(s), each with " +
+  def updateResourceRequests(): Unit = synchronized {
+    val pendingAllocatePerResourceProfileId = getPendingAllocate
+    val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case (rpId, targetNum) =>
+      val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get
+      val pending = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty).size
+      val running = getOrUpdateRunningExecutorForRPId(rpId).size
+      logDebug(s"Updating resource requests for ResourceProfile id: $rpId, target: " +
+        s"$targetNum, pending: $pending, running: $running, executorsStarting: $starting")
+      (rpId, targetNum - pending - running - starting)
+    }.toMap
+
+    missingPerProfile.foreach { case (rpId, missing) =>
+      val hostToLocalTaskCount =
+        hostToLocalTaskCountPerResourceProfileId.getOrElse(rpId, Map.empty)
+      val pendingAllocate = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty)
+      val numPendingAllocate = pendingAllocate.size
+      // Split the pending container requests into three groups: locality matched, locality
+      // unmatched (stale) and locality free (any host). Locality matched requests are taken
+      // into consideration for container placement and treated as allocated containers.
+      // Locality unmatched requests are cancelled because the required locality preference has
+      // changed; locality free requests may be cancelled too, so that they can be re-requested
+      // with the locality recomputed by the container placement strategy.
+      val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
+        hostToLocalTaskCount, pendingAllocate)
+
+      if (missing > 0) {
+        val resource = rpIdToYarnResource.get(rpId)
+        if (log.isInfoEnabled()) {
+          var requestContainerMessage = s"Will request $missing executor container(s) for " +
+            s" ResourceProfile Id: $rpId, each with " +
             s"${resource.getVirtualCores} core(s) and " +
             s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
-        if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
-            executorResourceRequests.nonEmpty) {
-          requestContainerMessage ++= s" with custom resources: " + resource.toString
+          if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
+            ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) {
+            requestContainerMessage ++= s" with custom resources: " + resource.toString
+          }
+          logInfo(requestContainerMessage)
         }
-        logInfo(requestContainerMessage)
-      }
-
-      // cancel "stale" requests for locations that are no longer needed
-      staleRequests.foreach { stale =>
-        amClient.removeContainerRequest(stale)
-      }
-      val cancelledContainers = staleRequests.size
-      if (cancelledContainers > 0) {
-        logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
-      }
 
-      // consider the number of new containers and cancelled stale containers available
-      val availableContainers = missing + cancelledContainers
+        // cancel "stale" requests for locations that are no longer needed
+        staleRequests.foreach { stale =>
+          amClient.removeContainerRequest(stale)
+        }
+        val cancelledContainers = staleRequests.size
+        if (cancelledContainers > 0) {
+          logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
+        }
 
-      // to maximize locality, include requests with no locality preference that can be cancelled
-      val potentialContainers = availableContainers + anyHostRequests.size
+        // consider the number of new containers and cancelled stale containers available
+        val availableContainers = missing + cancelledContainers
 
-      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
-        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
-          allocatedHostToContainersMap, localRequests)
+        // to maximize locality, include requests with no locality preference that can be cancelled
+        val potentialContainers = availableContainers + anyHostRequests.size
 
-      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
-      containerLocalityPreferences.foreach {
-        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
-          newLocalityRequests += createContainerRequest(resource, nodes, racks)
-        case _ =>
-      }
+        val allocatedHostToContainer = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
+        val numLocalityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(rpId, 0)
+        val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
+          potentialContainers, numLocalityAwareTasks, hostToLocalTaskCount,
+          allocatedHostToContainer, localRequests, rpIdToResourceProfile(rpId))
 
-      if (availableContainers >= newLocalityRequests.size) {
-        // more containers are available than needed for locality, fill in requests for any host
-        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
-          newLocalityRequests += createContainerRequest(resource, null, null)
-        }
-      } else {
-        val numToCancel = newLocalityRequests.size - availableContainers
-        // cancel some requests without locality preferences to schedule more local containers
-        anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
-          amClient.removeContainerRequest(nonLocal)
-        }
-        if (numToCancel > 0) {
-          logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
+        val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
+        containerLocalityPreferences.foreach {
+          case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
+            newLocalityRequests += createContainerRequest(resource, nodes, racks, rpId)
+          case _ =>
         }
-      }
 
-      newLocalityRequests.foreach { request =>
-        amClient.addContainerRequest(request)
-      }
+        if (availableContainers >= newLocalityRequests.size) {
+          // more containers are available than needed for locality, fill in requests for any host
+          for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
+            newLocalityRequests += createContainerRequest(resource, null, null, rpId)
+          }
+        } else {
+          val numToCancel = newLocalityRequests.size - availableContainers
+          // cancel some requests without locality preferences to schedule more local containers
+          anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
+            amClient.removeContainerRequest(nonLocal)
+          }
+          if (numToCancel > 0) {
+            logInfo(s"Canceled $numToCancel unlocalized container requests to " +
+              s"resubmit with locality")
+          }
+        }
 
-      if (log.isInfoEnabled()) {
-        val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
-        if (anyHost.nonEmpty) {
-          logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+        newLocalityRequests.foreach { request =>
+          amClient.addContainerRequest(request)
         }
-        localized.foreach { request =>
-          logInfo(s"Submitted container request for host ${hostStr(request)}.")
+
+        if (log.isInfoEnabled()) {
+          val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
+          if (anyHost.nonEmpty) {
+            logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+          }
+          localized.foreach { request =>
+            logInfo(s"Submitted container request for host ${hostStr(request)}.")
+          }
         }
+      } else if (numPendingAllocate > 0 && missing < 0) {
+        val numToCancel = math.min(numPendingAllocate, -missing)
+        logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new " +
+          s"desired total ${getOrUpdateTargetNumExecutorsForRPId(rpId)} executors.")
+        // cancel pending allocate requests by taking locality preference into account
+        val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
+        cancelRequests.foreach(amClient.removeContainerRequest)
       }
-    } else if (numPendingAllocate > 0 && missing < 0) {
-      val numToCancel = math.min(numPendingAllocate, -missing)
-      logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
-        s"total $targetNumExecutors executors.")
-      // cancel pending allocate requests by taking locality preference into account
-      val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
-      cancelRequests.foreach(amClient.removeContainerRequest)
     }
   }
 
@@ -405,8 +574,10 @@ private[yarn] class YarnAllocator(
   private def createContainerRequest(
       resource: Resource,
       nodes: Array[String],
-      racks: Array[String]): ContainerRequest = {
-    new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull)
+      racks: Array[String],
+      rpId: Int): ContainerRequest = {
+    new ContainerRequest(resource, nodes, racks, getContainerPriority(rpId),
+      true, labelExpression.orNull)
   }
 
   /**
@@ -499,20 +670,17 @@ private[yarn] class YarnAllocator(
       location: String,
       containersToUse: ArrayBuffer[Container],
       remaining: ArrayBuffer[Container]): Unit = {
-    // SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the
-    // request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested
-    // memory, but use the asked vcore count for matching, effectively disabling matching on vcore
-    // count.
-    val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
-      resource.getVirtualCores)
-
-    ResourceRequestHelper.setResourceRequests(executorResourceRequests, matchingResource)
+    // Match on the exact resource we requested so there shouldn't be a mismatch;
+    // we are relying on YARN to return a container with resources no less than we requested.
+    // If we change this, or start validating the container, be sure the logic covers SPARK-6050.
+    val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
+    val resourceForRP = rpIdToYarnResource.get(rpId)
 
     logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
         s"priority: ${allocatedContainer.getPriority}, " +
-        s"location: $location, resource: $matchingResource")
+        s"location: $location, resource: $resourceForRP")
     val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
-      matchingResource)
+      resourceForRP)
 
     // Match the allocation to a request
     if (!matchingRequests.isEmpty) {
@@ -528,30 +696,38 @@ private[yarn] class YarnAllocator(
   /**
    * Launches executors in the allocated containers.
    */
-  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
+  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
     for (container <- containersToUse) {
+      val rpId = getResourceProfileIdFromPriority(container.getPriority)
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
       val executorId = executorIdCounter.toString
-      assert(container.getResource.getMemory >= resource.getMemory)
+      val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
+      assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
       logInfo(s"Launching container $containerId on host $executorHostname " +
-        s"for executor with ID $executorId")
+        s"for executor with ID $executorId for ResourceProfile Id $rpId")
 
       def updateInternalState(): Unit = synchronized {
-        runningExecutors.add(executorId)
-        numExecutorsStarting.decrementAndGet()
+        getOrUpdateRunningExecutorForRPId(rpId).add(executorId)
+        getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
         executorIdToContainer(executorId) = container
-        containerIdToExecutorId(container.getId) = executorId
+        containerIdToExecutorIdAndResourceProfileId(container.getId) = (executorId, rpId)
 
-        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+        val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
+        val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname,
           new HashSet[ContainerId])
         containerSet += containerId
         allocatedContainerToHostMap.put(containerId, executorHostname)
       }
 
-      if (runningExecutors.size() < targetNumExecutors) {
-        numExecutorsStarting.incrementAndGet()
+      val rp = rpIdToResourceProfile(rpId)
+      val containerMem = rp.executorResources.get(ResourceProfile.MEMORY).
+        map(_.amount.toInt).getOrElse(executorMemory)
+      val containerCores = rp.getExecutorCores.getOrElse(defaultExecutorCores)
+      val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
+      if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
+        getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
         if (launchContainers) {
           launcherPool.execute(() => {
             try {
@@ -562,17 +738,17 @@ private[yarn] class YarnAllocator(
                 driverUrl,
                 executorId,
                 executorHostname,
-                executorMemory,
-                executorCores,
+                containerMem,
+                containerCores,
                 appAttemptId.getApplicationId.toString,
                 securityMgr,
                 localResources,
-                ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
+                rp.id
               ).run()
               updateInternalState()
             } catch {
               case e: Throwable =>
-                numExecutorsStarting.decrementAndGet()
+                getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
                 if (NonFatal(e)) {
                   logError(s"Failed to launch executor $executorId on container $containerId", e)
                   // Assigned container should be released immediately
@@ -589,24 +765,28 @@ private[yarn] class YarnAllocator(
         }
       } else {
         logInfo(("Skip launching executorRunnable as running executors count: %d " +
-          "reached target executors count: %d.").format(
-          runningExecutors.size, targetNumExecutors))
+          "reached target executors count: %d.").format(rpRunningExecs,
+          getOrUpdateTargetNumExecutorsForRPId(rpId)))
       }
     }
   }
 
   // Visible for testing.
-  private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+  private[yarn] def processCompletedContainers(
+      completedContainers: Seq[ContainerStatus]): Unit = synchronized {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
+      val (_, rpId) = containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId,
+        ("", DEFAULT_RESOURCE_PROFILE_ID))
       val alreadyReleased = releasedContainers.remove(containerId)
       val hostOpt = allocatedContainerToHostMap.get(containerId)
       val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
       val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
-        containerIdToExecutorId.get(containerId) match {
-          case Some(executorId) => runningExecutors.remove(executorId)
+        containerIdToExecutorIdAndResourceProfileId.get(containerId) match {
+          case Some((executorId, _)) =>
+            getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
           case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}")
         }
 
@@ -679,19 +859,19 @@ private[yarn] class YarnAllocator(
 
       for {
         host <- hostOpt
-        containerSet <- allocatedHostToContainersMap.get(host)
+        containerSet <- getOrUpdateAllocatedHostToContainersMapForRPId(rpId).get(host)
       } {
         containerSet.remove(containerId)
         if (containerSet.isEmpty) {
-          allocatedHostToContainersMap.remove(host)
+          getOrUpdateAllocatedHostToContainersMapForRPId(rpId).remove(host)
         } else {
-          allocatedHostToContainersMap.update(host, containerSet)
+          getOrUpdateAllocatedHostToContainersMapForRPId(rpId).update(host, containerSet)
         }
 
         allocatedContainerToHostMap.remove(containerId)
       }
 
-      containerIdToExecutorId.remove(containerId).foreach { eid =>
+      containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
         executorIdToContainer.remove(eid)
         pendingLossReasonRequests.remove(eid) match {
           case Some(pendingRequests) =>
@@ -737,12 +917,14 @@ private[yarn] class YarnAllocator(
     }
   }
 
-  private def internalReleaseContainer(container: Container): Unit = {
+  private def internalReleaseContainer(container: Container): Unit = synchronized {
     releasedContainers.add(container.getId())
     amClient.releaseAssignedContainer(container.getId())
   }
 
-  private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+  private[yarn] def getNumUnexpectedContainerRelease: Long = synchronized {
+    numUnexpectedContainerRelease
+  }
 
   private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
     pendingLossReasonRequests.size
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index f8bbc39..e428bab 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -130,12 +130,8 @@ private[spark] abstract class YarnSchedulerBackend(
     val filteredRPHostToLocalTaskCount = rpHostToLocalTaskCount.map { case (rpid, v) =>
       (rpid, v.filter { case (host, count) => !nodeBlacklist.contains(host) })
     }
-    // TODO - default everything to default profile until YARN pieces
-    val defaultProf = ResourceProfile.getOrCreateDefaultProfile(conf)
-    val hostToLocalTaskCount = filteredRPHostToLocalTaskCount.getOrElse(defaultProf.id, Map.empty)
-    val localityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(defaultProf.id, 0)
-    val numExecutors = resourceProfileToTotalExecs.getOrElse(defaultProf, 0)
-    RequestExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount, nodeBlacklist)
+    RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId,
+      filteredRPHostToLocalTaskCount, nodeBlacklist)
   }
 
   /**
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
index 29f1c05..d83a0d2 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.deploy.yarn
 
+import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.resource.ResourceProfile
 
 class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
 
@@ -28,7 +31,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
   import yarnAllocatorSuite._
 
   def createContainerRequest(nodes: Array[String]): ContainerRequest =
-    new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
+    new ContainerRequest(containerResource, nodes, null, Priority.newInstance(1))
 
   override def beforeEach(): Unit = {
     yarnAllocatorSuite.beforeEach()
@@ -38,18 +41,22 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
     yarnAllocatorSuite.afterEach()
   }
 
+  val defaultResourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+
   test("allocate locality preferred containers with enough resource and no matched existed " +
     "containers") {
     // 1. All the locations of current containers cannot satisfy the new requirements
     // 2. Current requested container number can fully satisfy the pending tasks.
 
-    val handler = createAllocator(2)
+    val (handler, allocatorConf) = createAllocator(2)
     handler.updateResourceRequests()
     handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
 
+    ResourceProfile.clearDefaultProfile
+    val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
     val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
       3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10),
-        handler.allocatedHostToContainersMap, Seq.empty)
+      handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
 
     assert(localities.map(_.nodes) === Array(
       Array("host3", "host4", "host5"),
@@ -62,7 +69,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
     // 1. Parts of current containers' locations can satisfy the new requirements
     // 2. Current requested container number can fully satisfy the pending tasks.
 
-    val handler = createAllocator(3)
+    val (handler, allocatorConf) = createAllocator(3)
     handler.updateResourceRequests()
     handler.handleAllocatedContainers(Array(
       createContainer("host1"),
@@ -70,9 +77,12 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
       createContainer("host2")
     ))
 
+    ResourceProfile.clearDefaultProfile
+    val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
+
     val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
       3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
-        handler.allocatedHostToContainersMap, Seq.empty)
+      handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
 
     assert(localities.map(_.nodes) ===
       Array(null, Array("host2", "host3"), Array("host2", "host3")))
@@ -83,7 +93,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
     // 1. Parts of current containers' locations can satisfy the new requirements
     // 2. Current requested container number cannot fully satisfy the pending tasks.
 
-    val handler = createAllocator(3)
+    val (handler, allocatorConf) = createAllocator(3)
     handler.updateResourceRequests()
     handler.handleAllocatedContainers(Array(
       createContainer("host1"),
@@ -91,9 +101,11 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
       createContainer("host2")
     ))
 
+    ResourceProfile.clearDefaultProfile
+    val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
     val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
       1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
-        handler.allocatedHostToContainersMap, Seq.empty)
+      handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
 
     assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
   }
@@ -101,7 +113,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
   test("allocate locality preferred containers with fully matched containers") {
     // Current containers' locations can fully satisfy the new requirements
 
-    val handler = createAllocator(5)
+    val (handler, allocatorConf) = createAllocator(5)
     handler.updateResourceRequests()
     handler.handleAllocatedContainers(Array(
       createContainer("host1"),
@@ -111,9 +123,11 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
       createContainer("host3")
     ))
 
+    ResourceProfile.clearDefaultProfile
+    val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
     val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
       3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
-        handler.allocatedHostToContainersMap, Seq.empty)
+      handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
 
     assert(localities.map(_.nodes) === Array(null, null, null))
   }
@@ -121,18 +135,21 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
   test("allocate containers with no locality preference") {
     // Request new container without locality preference
 
-    val handler = createAllocator(2)
+    val (handler, allocatorConf) = createAllocator(2)
     handler.updateResourceRequests()
     handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
 
+    ResourceProfile.clearDefaultProfile
+    val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
     val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
-      1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty)
+      1, 0, Map.empty,
+      handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
 
     assert(localities.map(_.nodes) === Array(null))
   }
 
   test("allocate locality preferred containers by considering the localities of pending requests") {
-    val handler = createAllocator(3)
+    val (handler, allocatorConf) = createAllocator(3)
     handler.updateResourceRequests()
     handler.handleAllocatedContainers(Array(
       createContainer("host1"),
@@ -144,9 +161,12 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
       createContainerRequest(Array("host2", "host3")),
       createContainerRequest(Array("host1", "host4")))
 
+    ResourceProfile.clearDefaultProfile
+    val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
     val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
       1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
-        handler.allocatedHostToContainersMap, pendingAllocationRequests)
+      handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId),
+      pendingAllocationRequests, rp)
 
     assert(localities.map(_.nodes) === Array(Array("host3")))
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
index b7f2565..7278517 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Mockito._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.resource.ResourceProfile
 
 class LocalityPlacementStrategySuite extends SparkFunSuite {
 
@@ -58,7 +59,7 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
 
     val resource = Resource.newInstance(8 * 1024, 4)
     val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
-      yarnConf, resource, new MockResolver())
+      yarnConf, new MockResolver())
 
     val totalTasks = 32 * 1024
     val totalContainers = totalTasks / 16
@@ -75,9 +76,10 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
       containers.drop(count * i).take(i).foreach { c => hostContainers += c }
       hostToContainerMap(host) = hostContainers
     }
+    val rp = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
 
     strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
-      hostToContainerMap, Nil)
+      hostToContainerMap, Nil, rp)
   }
 
 }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6216d47..2003d0b 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.util.Collections
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
@@ -32,9 +33,9 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.ResourceRequestHelper._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils.{AMOUNT, GPU}
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEndpointRef
@@ -69,6 +70,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   var containerNum = 0
 
+  // priority has to be 0 to match default profile id
+  val RM_REQUEST_PRIORITY = Priority.newInstance(0)
+  val defaultRPId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+  val defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     rmClient = AMRMClient.createAMRMClient()
@@ -93,7 +99,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   def createAllocator(
       maxExecutors: Int = 5,
       rmClient: AMRMClient[ContainerRequest] = rmClient,
-      additionalConfigs: Map[String, String] = Map()): YarnAllocator = {
+      additionalConfigs: Map[String, String] = Map()): (YarnAllocator, SparkConf) = {
     val args = Array(
       "--jar", "somejar.jar",
       "--class", "SomeClass")
@@ -107,7 +113,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConfClone.set(name, value)
     }
 
-    new YarnAllocator(
+    val allocator = new YarnAllocator(
       "not used",
       mock(classOf[RpcEndpointRef]),
       conf,
@@ -118,16 +124,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(),
       new MockResolver(),
       clock)
+    (allocator, sparkConfClone)
   }
 
   def createContainer(
       host: String,
       containerNumber: Int = containerNum,
-      resource: Resource = containerResource): Container = {
+      resource: Resource = containerResource,
+      priority: Priority = RM_REQUEST_PRIORITY): Container = {
     val  containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
-    Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
+    Container.newInstance(containerId, nodeId, "", resource, priority, null)
   }
 
   def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
@@ -145,20 +153,108 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   test("single container allocated") {
     // request a single container and receive it
-    val handler = createAllocator(1)
+    val (handler, _) = createAllocator(1)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
 
     val container = createContainer("host1")
     handler.handleAllocatedContainers(Array(container))
 
     handler.getNumExecutorsRunning should be (1)
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container.getId)
+
+    val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
+    size should be (0)
+  }
+
+  test("single container allocated with ResourceProfile") {
+    assume(isYarnResourceTypesAvailable())
+    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG)
+    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+    // create default profile so we get a different id to test below
+    val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val execReq = new ExecutorResourceRequests().resource("gpu", 6)
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
+    // request a single container and receive it
+    val (handler, _) = createAllocator(0)
+
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
+
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumContainersPendingAllocate should be (1)
+
+    val container = createContainer("host1", priority = Priority.newInstance(rprof.id))
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
+    hostTocontainer.get("host1").get should contain(container.getId)
 
     val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
     size should be (0)
+
+    ResourceProfile.reInitDefaultProfile(sparkConf)
+  }
+
+  test("multiple containers allocated with ResourceProfiles") {
+    assume(isYarnResourceTypesAvailable())
+    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG)
+    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+    // create default profile so we get a different id to test below
+    val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val execReq = new ExecutorResourceRequests().resource("gpu", 6)
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
+
+    val execReq2 = new ExecutorResourceRequests().memory("8g").resource("fpga", 2)
+    val taskReq2 = new TaskResourceRequests().resource("fpga", 1)
+    val rprof2 = new ResourceProfile(execReq2.requests, taskReq2.requests)
+
+
+    // request one container for rprof and two for rprof2, then receive all three
+    val (handler, _) = createAllocator(1)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1, rprof2 -> 2)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0, rprof2.id -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
+
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumContainersPendingAllocate should be (3)
+
+    val containerResourcerp2 = Resource.newInstance(10240, 5)
+
+    val container = createContainer("host1", priority = Priority.newInstance(rprof.id))
+    val container2 = createContainer("host2", resource = containerResourcerp2,
+      priority = Priority.newInstance(rprof2.id))
+    val container3 = createContainer("host3", resource = containerResourcerp2,
+      priority = Priority.newInstance(rprof2.id))
+    handler.handleAllocatedContainers(Array(container, container2, container3))
+
+    handler.getNumExecutorsRunning should be (3)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
+    handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host3")
+
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
+    hostTocontainer.get("host1").get should contain(container.getId)
+    val hostTocontainer2 = handler.allocatedHostToContainersMapPerRPId(rprof2.id)
+    hostTocontainer2.get("host2").get should contain(container2.getId)
+    hostTocontainer2.get("host3").get should contain(container3.getId)
+
+    val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
+    size should be (0)
+
+    ResourceProfile.reInitDefaultProfile(sparkConf)
   }
 
   test("custom resource requested from yarn") {
@@ -166,16 +262,16 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
 
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val handler = createAllocator(1, mockAmClient,
+    val (handler, _) = createAllocator(1, mockAmClient,
       Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
 
     handler.updateResourceRequests()
-    val container = createContainer("host1", resource = handler.resource)
+    val container = createContainer("host1", resource = handler.defaultResource)
     handler.handleAllocatedContainers(Array(container))
 
     // get amount of memory and vcores from resource, so effectively skipping their validation
-    val expectedResources = Resource.newInstance(handler.resource.getMemory(),
-      handler.resource.getVirtualCores)
+    val expectedResources = Resource.newInstance(handler.defaultResource.getMemory(),
+      handler.defaultResource.getVirtualCores)
     setResourceRequests(Map("gpu" -> "2G"), expectedResources)
     val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
 
@@ -195,10 +291,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(EXECUTOR_GPU_ID.amountConf -> "3",
         EXECUTOR_FPGA_ID.amountConf -> "2",
         madeupConfigName -> "5")
-    val handler = createAllocator(1, mockAmClient, sparkResources)
+    val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
 
     handler.updateResourceRequests()
-    val yarnRInfo = ResourceRequestTestHelper.getResources(handler.resource)
+    val yarnRInfo = ResourceRequestTestHelper.getResources(handler.defaultResource)
     val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) ).toMap
     assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
     assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
@@ -210,17 +306,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   test("container should not be created if requested number if met") {
     // request a single container and receive it
-    val handler = createAllocator(1)
+    val (handler, _) = createAllocator(1)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
 
     val container = createContainer("host1")
     handler.handleAllocatedContainers(Array(container))
 
     handler.getNumExecutorsRunning should be (1)
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container.getId)
 
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container2))
@@ -229,10 +326,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   test("some containers allocated") {
     // request a few containers and receive some of them
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host1")
@@ -243,16 +340,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
     handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
     handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
-    handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container1.getId)
+    hostTocontainer.get("host1").get should contain (container2.getId)
+    hostTocontainer.get("host2").get should contain (container3.getId)
   }
 
   test("receive more containers than requested") {
-    val handler = createAllocator(2)
+    val (handler, _) = createAllocator(2)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
+    handler.getNumContainersPendingAllocate should be (2)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
@@ -263,42 +361,52 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
     handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
     handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
-    handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
-    handler.allocatedHostToContainersMap.contains("host4") should be (false)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container1.getId)
+    hostTocontainer.get("host2").get should contain (container2.getId)
+    hostTocontainer.contains("host4") should be (false)
   }
 
   test("decrease total requested executors") {
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 3)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (3)
+    handler.getNumContainersPendingAllocate should be (3)
 
     val container = createContainer("host1")
     handler.handleAllocatedContainers(Array(container))
 
     handler.getNumExecutorsRunning should be (1)
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container.getId)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
+    resourceProfileToTotalExecs(defaultRP) = 2
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
   }
 
   test("decrease total requested executors to less than currently running") {
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 3)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (3)
+    handler.getNumContainersPendingAllocate should be (3)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
@@ -306,23 +414,28 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
     handler.getNumExecutorsRunning should be (2)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
+    resourceProfileToTotalExecs(defaultRP) = 1
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (0)
+    handler.getNumContainersPendingAllocate should be (0)
     handler.getNumExecutorsRunning should be (2)
   }
 
   test("kill executors") {
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
 
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 1)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) }
 
     val statuses = Seq(container1, container2).map { c =>
@@ -331,20 +444,20 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.updateResourceRequests()
     handler.processCompletedContainers(statuses)
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
   }
 
   test("kill same executor multiple times") {
-    val handler = createAllocator(2)
+    val (handler, _) = createAllocator(2)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
+    handler.getNumContainersPendingAllocate should be (2)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
     handler.getNumExecutorsRunning should be (2)
-    handler.getPendingAllocate.size should be (0)
+    handler.getNumContainersPendingAllocate should be (0)
 
     val executorToKill = handler.executorIdToContainer.keys.head
     handler.killExecutor(executorToKill)
@@ -353,22 +466,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.killExecutor(executorToKill)
     handler.killExecutor(executorToKill)
     handler.getNumExecutorsRunning should be (1)
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
   }
 
   test("process same completed container multiple times") {
-    val handler = createAllocator(2)
+    val (handler, _) = createAllocator(2)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
+    handler.getNumContainersPendingAllocate should be (2)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
     handler.getNumExecutorsRunning should be (2)
-    handler.getPendingAllocate.size should be (0)
+    handler.getNumContainersPendingAllocate should be (0)
 
     val statuses = Seq(container1, container1, container2).map { c =>
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
@@ -379,16 +495,19 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }
 
   test("lost executor removed from backend") {
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
 
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
 
     val statuses = Seq(container1, container2).map { c =>
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
@@ -397,7 +516,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.processCompletedContainers(statuses)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
+    handler.getNumContainersPendingAllocate should be (2)
     handler.getNumExecutorsFailed should be (2)
     handler.getNumUnexpectedContainerRelease should be (2)
   }
@@ -406,28 +525,35 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     // Internally we track the set of blacklisted nodes, but yarn wants us to send *changes*
     // to the blacklist.  This makes sure we are sending the right updates.
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val handler = createAllocator(4, mockAmClient)
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
+    val (handler, _) = createAllocator(4, mockAmClient)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 1)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set("hostA"))
     verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq[String]().asJava)
 
     val blacklistedNodes = Set(
       "hostA",
       "hostB"
     )
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), blacklistedNodes)
-    verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq[String]().asJava)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set.empty)
+    resourceProfileToTotalExecs(defaultRP) = 2
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map(), blacklistedNodes)
+    verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq[String]().asJava)
+    resourceProfileToTotalExecs(defaultRP) = 3
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
     verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava)
   }
 
   test("window based failure executor counting") {
     sparkConf.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100 * 1000L)
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
 
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
     val containers = Seq(
       createContainer("host1"),
@@ -468,7 +594,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val rmClientSpy = spy(rmClient)
     val maxExecutors = 11
 
-    val handler = createAllocator(
+    val (handler, _) = createAllocator(
       maxExecutors,
       rmClientSpy,
       Map(
@@ -525,9 +651,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     try {
       sparkConf.set(MEMORY_OFFHEAP_ENABLED, true)
       sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
-      val allocator = createAllocator(maxExecutors = 1,
+      val (handler, _) = createAllocator(maxExecutors = 1,
         additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
-      val memory = allocator.resource.getMemory
+      val memory = handler.defaultResource.getMemory
       assert(memory ==
         executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
     } finally {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
index c0c6fff..9003c2f 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -51,9 +51,8 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
 
   private class TestYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
       extends YarnSchedulerBackend(scheduler, sc) {
-    def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
-      this.rpHostToLocalTaskCount = Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID ->
-        hostToLocalTaskCount)
+    def setHostToLocalTaskCount(hostToLocalTaskCount: Map[Int, Map[String, Int]]): Unit = {
+      this.rpHostToLocalTaskCount = hostToLocalTaskCount
     }
   }
 
@@ -64,21 +63,24 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
     val yarnSchedulerBackendExtended = new TestYarnSchedulerBackend(sched, sc)
     yarnSchedulerBackend = yarnSchedulerBackendExtended
     val ser = new JavaSerializer(sc.conf).newInstance()
+    val defaultResourceProf = ResourceProfile.getOrCreateDefaultProfile(sc.getConf)
     for {
       blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
       numRequested <- 0 until 10
       hostToLocalCount <- IndexedSeq(
-        Map[String, Int](),
-        Map("a" -> 1, "b" -> 2)
+        Map(defaultResourceProf.id -> Map.empty[String, Int]),
+        Map(defaultResourceProf.id -> Map("a" -> 1, "b" -> 2))
       )
     } {
       yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount)
       sched.setNodeBlacklist(blacklist)
-      val numReq = Map(ResourceProfile.getOrCreateDefaultProfile(sc.getConf) -> numRequested)
-      val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numReq)
-      assert(req.requestedTotal === numRequested)
+      val request = Map(defaultResourceProf -> numRequested)
+      val req = yarnSchedulerBackendExtended.prepareRequestExecutors(request)
+      assert(req.resourceProfileToTotalExecs(defaultResourceProf) === numRequested)
       assert(req.nodeBlacklist === blacklist)
-      assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
+      val hosts =
+        req.hostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID).keySet
+      assert(hosts.intersect(blacklist).isEmpty)
       // Serialize to make sure serialization doesn't throw an error
       ser.serialize(req)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org