Posted to commits@spark.apache.org by tg...@apache.org on 2020/11/13 22:05:44 UTC

[spark] branch master updated: [SPARK-33288][SPARK-32661][K8S] Stage level scheduling support for Kubernetes

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new acfd846  [SPARK-33288][SPARK-32661][K8S] Stage level scheduling support for Kubernetes
acfd846 is described below

commit acfd8467534fbf58c12e9f2d993b7d135fb8d32b
Author: Thomas Graves <tg...@apache.org>
AuthorDate: Fri Nov 13 16:04:13 2020 -0600

    [SPARK-33288][SPARK-32661][K8S] Stage level scheduling support for Kubernetes
    
    ### What changes were proposed in this pull request?
    
    This adds support for stage level scheduling to Kubernetes. Kubernetes can support dynamic allocation via the shuffle tracking option, which means we can support stage level scheduling by acquiring new executors.
    The main changes here are having the k8s cluster manager pass the resource profile id into the executors, and then having the ExecutorPodsAllocator request executors based on the individual resource profiles.  I tried to keep code changes here to a minimum. I specifically chose to leave the ExecutorPodsSnapshot the way it was and construct the resource-profile-to-pod-state mapping on the fly, with a fast path when not using other resource profiles, to keep the impact to a minimum.  This res [...]
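    
    As a rough usage sketch (not part of this change; the resource name, discovery script path, and RDD names below are hypothetical), an application on K8s enables dynamic allocation with shuffle tracking and attaches a custom ResourceProfile to an RDD:
    
        // required on K8s since there is no external shuffle service:
        //   spark.dynamicAllocation.enabled=true
        //   spark.dynamicAllocation.shuffleTracking.enabled=true
        import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
    
        val execReqs = new ExecutorResourceRequests()
          .cores(4)
          .resource("gpu", 1, "/opt/spark/scripts/getGpusResources.sh", "nvidia.com")
        val taskReqs = new TaskResourceRequests().resource("gpu", 1)
        val rp = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build
        // stages computed from this RDD run on executors requested for this profile
        val mlStageRdd = etlStageRdd.withResources(rp)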
    
    This also adds support for [SPARK-32661] (Spark executors on K8S should request extra memory for off-heap allocations), because the stage level scheduling API supports this and it made sense to be consistent with YARN.  This was started with PR https://github.com/apache/spark/pull/29477 but was never updated, so I just did it here.  To do this I moved a few functions that are now used by both YARN and Kubernetes, so you will see some changes in Utils.
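    
    A minimal sketch of the off-heap piece (the sizes below are example values only): with the usual off-heap configs set, the default profile now includes that memory in the container request, and a custom profile can request it explicitly via the executor resource API:
    
        // spark.memory.offHeap.enabled=true
        // spark.memory.offHeap.size=2g
        // Utils.executorOffHeapMemorySizeAsMb(conf) returns the size in MiB, or 0 when off-heap is disabled,
        // and the executor pod's total memory request includes it.
        import org.apache.spark.resource.ExecutorResourceRequests
    
        val ereqs = new ExecutorResourceRequests()
          .memory("4g")
          .offHeapMemory("2g")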
    
    ### Why are the changes needed?
    
    Add the feature to Kubernetes based on customer feedback.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the feature now works with K8s, but there are no underlying API changes.
    
    ### How was this patch tested?
    
    Tested manually on a Kubernetes cluster and with unit tests.
    
    Closes #30204 from tgravescs/stagek8sOrigSnapshotsRebase.
    
    Lead-authored-by: Thomas Graves <tg...@apache.org>
    Co-authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../apache/spark/resource/ResourceProfile.scala    | 133 +++++++++-
 .../spark/resource/ResourceProfileManager.scala    |  21 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  21 ++
 .../resource/ResourceProfileManagerSuite.scala     |  27 +-
 .../spark/resource/ResourceProfileSuite.scala      |   8 +-
 .../scala/org/apache/spark/util/UtilsSuite.scala   |  27 ++
 docs/configuration.md                              |   2 +-
 docs/running-on-kubernetes.md                      |   4 +
 docs/running-on-yarn.md                            |   1 +
 .../org/apache/spark/deploy/k8s/Constants.scala    |   3 +-
 .../apache/spark/deploy/k8s/KubernetesConf.scala   |  12 +-
 .../k8s/features/BasicDriverFeatureStep.scala      |   4 +-
 .../k8s/features/BasicExecutorFeatureStep.scala    |  89 ++++---
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 285 ++++++++++++---------
 .../k8s/KubernetesClusterSchedulerBackend.scala    |   5 +-
 .../cluster/k8s/KubernetesExecutorBuilder.scala    |   8 +-
 .../spark/deploy/k8s/KubernetesConfSuite.scala     |  15 +-
 .../k8s/features/BasicDriverFeatureStepSuite.scala |   5 +-
 .../features/BasicExecutorFeatureStepSuite.scala   | 106 ++++++--
 .../cluster/k8s/ExecutorLifecycleTestUtils.scala   |  40 +--
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 109 +++++++-
 .../KubernetesClusterSchedulerBackendSuite.scala   |   5 +-
 .../k8s/KubernetesExecutorBuilderSuite.scala       |   4 +-
 .../src/main/dockerfiles/spark/entrypoint.sh       |   1 +
 .../org/apache/spark/deploy/yarn/Client.scala      |   9 +-
 .../spark/deploy/yarn/ResourceRequestHelper.scala  |   4 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 136 +++++-----
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |  24 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     |  20 +-
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala     |  27 --
 30 files changed, 772 insertions(+), 383 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 8a37670..ac7e8e8 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -29,6 +29,7 @@ import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.util.Utils
 
 /**
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
@@ -256,6 +257,8 @@ object ResourceProfile extends Logging {
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
 
+  private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
+
   private lazy val nextProfileId = new AtomicInteger(0)
   private val DEFAULT_PROFILE_LOCK = new Object()
 
@@ -263,6 +266,7 @@ object ResourceProfile extends Logging {
   // var so that it can be reset for testing purposes.
   @GuardedBy("DEFAULT_PROFILE_LOCK")
   private var defaultProfile: Option[ResourceProfile] = None
+  private var defaultProfileExecutorResources: Option[DefaultProfileExecutorResources] = None
 
   private[spark] def getNextProfileId: Int = nextProfileId.getAndIncrement()
 
@@ -284,6 +288,14 @@ object ResourceProfile extends Logging {
     }
   }
 
+  private[spark] def getDefaultProfileExecutorResources(
+      conf: SparkConf): DefaultProfileExecutorResources = {
+    defaultProfileExecutorResources.getOrElse {
+      getOrCreateDefaultProfile(conf)
+      defaultProfileExecutorResources.get
+    }
+  }
+
   private def getDefaultTaskResources(conf: SparkConf): Map[String, TaskResourceRequest] = {
     val cpusPerTask = conf.get(CPUS_PER_TASK)
     val treqs = new TaskResourceRequests().cpus(cpusPerTask)
@@ -293,20 +305,26 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    ereqs.cores(conf.get(EXECUTOR_CORES))
-    ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
-    conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
-    conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
-    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
-      // Explicitly add suffix b as default unit of offHeapMemory is Mib
-      ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString + "b")
-    }
+    val cores = conf.get(EXECUTOR_CORES)
+    ereqs.cores(cores)
+    val memory = conf.get(EXECUTOR_MEMORY)
+    ereqs.memory(memory.toString)
+    val overheadMem = conf.get(EXECUTOR_MEMORY_OVERHEAD)
+    overheadMem.map(mem => ereqs.memoryOverhead(mem.toString))
+    val pysparkMem = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    pysparkMem.map(mem => ereqs.pysparkMemory(mem.toString))
+    val offheapMem = Utils.executorOffHeapMemorySizeAsMb(conf)
+    ereqs.offHeapMemory(offheapMem.toString)
     val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
     execReq.foreach { req =>
-      val name = req.id.resourceName
-      ereqs.resource(name, req.amount, req.discoveryScript.orElse(""),
+      ereqs.resource(req.id.resourceName, req.amount, req.discoveryScript.orElse(""),
         req.vendor.orElse(""))
     }
+    val customResourceNames = execReq.map(_.id.resourceName).toSet
+    val customResources = ereqs.requests.filter(v => customResourceNames.contains(v._1))
+    defaultProfileExecutorResources =
+      Some(DefaultProfileExecutorResources(cores, memory, offheapMem, pysparkMem,
+        overheadMem, customResources))
     ereqs.requests
   }
 
@@ -320,6 +338,7 @@ object ResourceProfile extends Logging {
   private[spark] def clearDefaultProfile(): Unit = {
     DEFAULT_PROFILE_LOCK.synchronized {
       defaultProfile = None
+      defaultProfileExecutorResources = None
     }
   }
 
@@ -342,6 +361,100 @@ object ResourceProfile extends Logging {
     rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
   }
 
+  /**
+   * Get offHeap memory size from [[ExecutorResourceRequest]]
+   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  private[spark] def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
+      execRequest: ExecutorResourceRequest): Long = {
+    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
+  }
+
+  private[spark] case class ExecutorResourcesOrDefaults(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Long,
+      memoryOverheadMiB: Long,
+      totalMemMiB: Long,
+      customResources: Map[String, ExecutorResourceRequest])
+
+  private[spark] case class DefaultProfileExecutorResources(
+      cores: Int,
+      executorMemoryMiB: Long,
+      memoryOffHeapMiB: Long,
+      pysparkMemoryMiB: Option[Long],
+      memoryOverheadMiB: Option[Long],
+      customResources: Map[String, ExecutorResourceRequest])
+
+  private[spark] def calculateOverHeadMemory(
+      overHeadMemFromConf: Option[Long],
+      executorMemoryMiB: Long,
+      overheadFactor: Double): Long = {
+    overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt,
+        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
+  }
+
+  /**
+   * Gets the full list of resources to allow a cluster manager to request the appropriate
+   * container. If the resource profile is not the default one we either get the resources
+   * specified in the profile or fall back to the default profile resource size for everything
+   * except for custom resources.
+   */
+  private[spark] def getResourcesForClusterManager(
+      rpId: Int,
+      execResources: Map[String, ExecutorResourceRequest],
+      overheadFactor: Double,
+      conf: SparkConf,
+      isPythonApp: Boolean,
+      resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults = {
+    val defaultResources = getDefaultProfileExecutorResources(conf)
+    // set all the default values, which may change for custom ResourceProfiles
+    var cores = defaultResources.cores
+    var executorMemoryMiB = defaultResources.executorMemoryMiB
+    var memoryOffHeapMiB = defaultResources.memoryOffHeapMiB
+    var pysparkMemoryMiB = defaultResources.pysparkMemoryMiB.getOrElse(0L)
+    var memoryOverheadMiB = calculateOverHeadMemory(defaultResources.memoryOverheadMiB,
+      executorMemoryMiB, overheadFactor)
+
+    val finalCustomResources = if (rpId != DEFAULT_RESOURCE_PROFILE_ID) {
+      val customResources = new mutable.HashMap[String, ExecutorResourceRequest]
+      execResources.foreach { case (r, execReq) =>
+        r match {
+          case ResourceProfile.MEMORY =>
+            executorMemoryMiB = execReq.amount
+          case ResourceProfile.OVERHEAD_MEM =>
+            memoryOverheadMiB = execReq.amount
+          case ResourceProfile.PYSPARK_MEM =>
+            pysparkMemoryMiB = execReq.amount
+          case ResourceProfile.OFFHEAP_MEM =>
+            memoryOffHeapMiB = executorOffHeapMemorySizeAsMb(conf, execReq)
+          case ResourceProfile.CORES =>
+            cores = execReq.amount.toInt
+          case rName =>
+            val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+            customResources(nameToUse) = execReq
+        }
+      }
+      customResources.toMap
+    } else {
+      defaultResources.customResources.map { case (rName, execReq) =>
+        val nameToUse = resourceMappings.get(rName).getOrElse(rName)
+        (nameToUse, execReq)
+      }
+    }
+    // only add in pyspark memory if actually a python application
+    val pysparkMemToUseMiB = if (isPythonApp) {
+      pysparkMemoryMiB
+    } else {
+      0L
+    }
+    val totalMemMiB =
+      (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
+    ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
+      pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
+  }
+
   private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory"
   private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores"
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index f365548..d538f0b 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -52,18 +52,25 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
 
   private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
   private val master = sparkConf.getOption("spark.master")
-  private val isNotYarn = master.isDefined && !master.get.equals("yarn")
-  private val errorForTesting = !isTesting || sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
+  private val isYarn = master.isDefined && master.get.equals("yarn")
+  private val isK8s = master.isDefined && master.get.startsWith("k8s://")
+  private val notRunningUnitTests = !isTesting
+  private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
   // If we use anything except the default profile, its only supported on YARN right now.
   // Throw an exception if not supported.
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnAndNotDefaultProfile = isNotDefaultProfile && isNotYarn
-    val YarnNotDynAllocAndNotDefaultProfile = isNotDefaultProfile && !isNotYarn && !dynamicEnabled
-    if (errorForTesting && (notYarnAndNotDefaultProfile || YarnNotDynAllocAndNotDefaultProfile)) {
-      throw new SparkException("ResourceProfiles are only supported on YARN with dynamic " +
-        "allocation enabled.")
+    val notYarnOrK8sAndNotDefaultProfile = isNotDefaultProfile && !(isYarn || isK8s)
+    val YarnOrK8sNotDynAllocAndNotDefaultProfile =
+      isNotDefaultProfile && (isYarn || isK8s) && !dynamicEnabled
+    // We want the exception to be thrown only when we are specifically testing for the
+    // exception or in a real application. Otherwise in all other testing scenarios we want
+    // to skip throwing the exception so that we can test in other modes to make testing easier.
+    if ((notRunningUnitTests || testExceptionThrown) &&
+        (notYarnOrK8sAndNotDefaultProfile || YarnOrK8sNotDynAllocAndNotDefaultProfile)) {
+      throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
+        "with dynamic allocation enabled.")
     }
     true
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b8b044b..7f1f3a7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2971,6 +2971,27 @@ private[spark] object Utils extends Logging {
     metadata.append("]")
     metadata.toString
   }
+
+  /**
+   * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
+    val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
+    checkOffHeapEnabled(sparkConf, sizeInMB).toInt
+  }
+
+  /**
+   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = {
+    if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
+      require(offHeapSize > 0,
+        s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
+      offHeapSize
+    } else {
+      0
+    }
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index ddfe80e..36a5620 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -47,8 +47,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val rpmanager = new ResourceProfileManager(conf, listenerBus)
     val defaultProf = rpmanager.defaultResourceProfile
     assert(defaultProf.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    assert(defaultProf.executorResources.size === 2,
-      "Executor resources should contain cores and memory by default")
+    assert(defaultProf.executorResources.size === 3,
+      "Executor resources should contain cores, heap and offheap memory by default")
     assert(defaultProf.executorResources(ResourceProfile.CORES).amount === 4,
       s"Executor resources should have 4 cores")
   }
@@ -67,7 +67,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
       rpmanager.isSupported(immrprof)
     }.getMessage()
 
-    assert(error.contains("ResourceProfiles are only supported on YARN with dynamic allocation"))
+    assert(error.contains(
+      "ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation"))
   }
 
   test("isSupported yarn with dynamic allocation") {
@@ -84,7 +85,22 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     assert(rpmanager.isSupported(immrprof) == true)
   }
 
-  test("isSupported yarn with local mode") {
+  test("isSupported k8s with dynamic allocation") {
+    val conf = new SparkConf().setMaster("k8s://foo").set(EXECUTOR_CORES, 4)
+    conf.set(DYN_ALLOCATION_ENABLED, true)
+    conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
+    conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
+    // default profile should always work
+    val defaultProf = rpmanager.defaultResourceProfile
+    val rprof = new ResourceProfileBuilder()
+    val gpuExecReq =
+      new ExecutorResourceRequests().resource("gpu", 2, "someScript", "nvidia")
+    val immrprof = rprof.require(gpuExecReq).build
+    assert(rpmanager.isSupported(immrprof) == true)
+  }
+
+  test("isSupported with local mode") {
     val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
     val rpmanager = new ResourceProfileManager(conf, listenerBus)
@@ -98,7 +114,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
       rpmanager.isSupported(immrprof)
     }.getMessage()
 
-    assert(error.contains("ResourceProfiles are only supported on YARN with dynamic allocation"))
+    assert(error.contains(
+      "ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation"))
   }
 
   test("ResourceProfileManager has equivalent profile") {
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index f8c4a3a..27cc44a 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -43,8 +43,8 @@ class ResourceProfileSuite extends SparkFunSuite {
   test("Default ResourceProfile") {
     val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
     assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    assert(rprof.executorResources.size === 2,
-      "Executor resources should contain cores and memory by default")
+    assert(rprof.executorResources.size === 3,
+      "Executor resources should contain cores, heap and offheap memory by default")
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 1,
       "Executor resources should have 1 core")
     assert(rprof.getExecutorCores.get === 1,
@@ -55,8 +55,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "pyspark memory empty if not specified")
     assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None,
       "overhead memory empty if not specified")
-    assert(rprof.executorResources.get(ResourceProfile.OFFHEAP_MEM) == None,
-      "offHeap memory empty if not specified")
+    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 0,
+      "Executor resources should have 0 offheap memory")
     assert(rprof.taskResources.size === 1,
       "Task resources should just contain cpus by default")
     assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 7ec7c5a..857749e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1406,6 +1406,33 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(hostnamePort._1.equals("localhost"))
     assert(hostnamePort._2 === 0)
   }
+
+  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
+    val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(new SparkConf())
+    assert(executorOffHeapMemory == 0)
+  }
+
+  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
+    val offHeapMemoryInMB = 50
+    val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
+    val sparkConf = new SparkConf()
+      .set(MEMORY_OFFHEAP_ENABLED, true)
+      .set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
+    val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)
+    assert(executorOffHeapMemory == offHeapMemoryInMB)
+  }
+
+  test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
+    "but MEMORY_OFFHEAP_SIZE not config scene") {
+    val sparkConf = new SparkConf()
+      .set(MEMORY_OFFHEAP_ENABLED, true)
+    val expected =
+      s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
+    val message = intercept[IllegalArgumentException] {
+      Utils.executorOffHeapMemorySizeAsMb(sparkConf)
+    }.getMessage
+    assert(message.contains(expected))
+  }
 }
 
 private class SimpleExtension
diff --git a/docs/configuration.md b/docs/configuration.md
index d4738f1..14ff38d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3051,6 +3051,6 @@ See your cluster manager specific page for requirements and details on each of -
 # Stage Level Scheduling Overview
 
 The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at th [...]
-This is only available for the RDD API in Scala, Java, and Python and requires dynamic allocation to be enabled.  It is only available on YARN at this time. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page for more implementation details.
+This is only available for the RDD API in Scala, Java, and Python.  It is available on YARN and Kubernetes when dynamic allocation is enabled. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page for more implementation details.
 
 See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this feature. The current implementation acquires new executors for each `ResourceProfile`  created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one Resour [...]
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 4714e35..5ec7a2c 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1399,3 +1399,7 @@ Spark automatically handles translating the Spark configs <code>spark.{driver/ex
 
 Kubernetes does not tell Spark the addresses of the resources allocated to each container. For that reason, the user must specify a discovery script that gets run by the executor on startup to discover what resources are available to that executor. You can find an example scripts in `examples/src/main/scripts/getGpusResources.sh`. The script must have execute permissions set and the user should setup permissions to not allow malicious users to modify it. The script should write to STDOUT [...]
 
+### Stage Level Scheduling Overview
+
+Stage level scheduling is supported on Kubernetes when dynamic allocation is enabled. This also requires <code>spark.dynamicAllocation.shuffleTracking.enabled</code> to be enabled since Kubernetes doesn't support an external shuffle service at this time. The order in which containers for different profiles are requested from Kubernetes is not guaranteed. Note that since dynamic allocation on Kubernetes requires the shuffle tracking feature, this means that executors from previous stages t [...]
+Note, there is a difference in the way pod template resources are handled between the base default profile and custom ResourceProfiles. Any resources specified in the pod template file will only be used with the base default profile. If you create custom ResourceProfiles be sure to include all necessary resources there since the resources from the template file will not be propagated to custom ResourceProfiles.
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index f19ce3d..73c4930 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -644,6 +644,7 @@ YARN does not tell Spark the addresses of the resources allocated to each contai
 # Stage Level Scheduling Overview
 
 Stage level scheduling is supported on YARN when dynamic allocation is enabled. One thing to note that is YARN specific is that each ResourceProfile requires a different container priority on YARN. The mapping is simply the ResourceProfile id becomes the priority, on YARN lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter as Spark finishes one stage before starting another one, the only case this mig [...]
+Note there is a difference in the way custom resources are handled between the base default profile and custom ResourceProfiles. To allow for the user to request YARN containers with extra resources without Spark scheduling on them, the user can specify resources via the <code>spark.yarn.executor.resource.</code> config. Those configs are only used in the base default profile though and do not get propagated into any other custom ResourceProfiles. This is because there would be no way to [...]
 
 # Important notes
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 991205a..7d9e494 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -21,6 +21,7 @@ private[spark] object Constants {
   // Labels
   val SPARK_APP_ID_LABEL = "spark-app-selector"
   val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
+  val SPARK_RESOURCE_PROFILE_ID_LABEL = "spark-exec-resourceprofile-id"
   val SPARK_ROLE_LABEL = "spark-role"
   val SPARK_POD_DRIVER_ROLE = "driver"
   val SPARK_POD_EXECUTOR_ROLE = "executor"
@@ -63,6 +64,7 @@ private[spark] object Constants {
   val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
   val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
   val ENV_SPARK_USER = "SPARK_USER"
+  val ENV_RESOURCE_PROFILE_ID = "SPARK_RESOURCE_PROFILE_ID"
   // Spark app configs for containers
   val SPARK_CONF_VOLUME = "spark-conf-volume"
   val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
@@ -84,7 +86,6 @@ private[spark] object Constants {
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
   val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
-  val MEMORY_OVERHEAD_MIN_MIB = 384L
   val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
 
   // Hadoop Configuration
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index f3e492e..087eeee 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.util.Utils
 
 /**
@@ -132,7 +133,8 @@ private[spark] class KubernetesExecutorConf(
     sparkConf: SparkConf,
     val appId: String,
     val executorId: String,
-    val driverPod: Option[Pod])
+    val driverPod: Option[Pod],
+    val resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID)
   extends KubernetesConf(sparkConf) with Logging {
 
   override val resourceNamePrefix: String = {
@@ -144,7 +146,8 @@ private[spark] class KubernetesExecutorConf(
     val presetLabels = Map(
       SPARK_EXECUTOR_ID_LABEL -> executorId,
       SPARK_APP_ID_LABEL -> appId,
-      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
+      SPARK_RESOURCE_PROFILE_ID_LABEL -> resourceProfileId.toString)
 
     val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
@@ -217,8 +220,9 @@ private[spark] object KubernetesConf {
       sparkConf: SparkConf,
       executorId: String,
       appId: String,
-      driverPod: Option[Pod]): KubernetesExecutorConf = {
-    new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod)
+      driverPod: Option[Pod],
+      resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID): KubernetesExecutorConf = {
+    new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod, resourceProfileId)
   }
 
   def getResourceNamePrefix(appName: String): String = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 63f1812..6503bc8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
@@ -66,7 +67,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
 
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt,
+      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 82e00d5..8c75162 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -26,14 +26,15 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.Python._
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
 private[spark] class BasicExecutorFeatureStep(
     kubernetesConf: KubernetesExecutorConf,
-    secMgr: SecurityManager)
+    secMgr: SecurityManager,
+    resourceProfile: ResourceProfile)
   extends KubernetesFeatureConfigStep with Logging {
 
   // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
@@ -50,33 +51,43 @@ private[spark] class BasicExecutorFeatureStep(
     kubernetesConf.get(DRIVER_HOST_ADDRESS),
     kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = kubernetesConf.get(
-    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
-
-  private val memoryOverheadMiB = kubernetesConf
-    .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max(
-      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
-  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
-  private val executorMemoryTotal =
-    if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) {
-      executorMemoryWithOverhead +
-        kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
-    } else {
-      executorMemoryWithOverhead
-    }
 
-  private val executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
+  private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+  private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
+
+  val execResources = ResourceProfile.getResourcesForClusterManager(
+    resourceProfile.id,
+    resourceProfile.executorResources,
+    kubernetesConf.get(MEMORY_OVERHEAD_FACTOR),
+    kubernetesConf.sparkConf,
+    isPythonApp,
+    Map.empty)
+
+  private val executorMemoryString = s"${execResources.executorMemoryMiB}m"
+  // we don't include any kubernetes conf specific requests or limits when using custom
+  // ResourceProfiles because we don't have a way of overriding them if needed
   private val executorCoresRequest =
-    if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
+    if (isDefaultProfile && kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
       kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
     } else {
-      executorCores.toString
+      execResources.cores.toString
     }
   private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
+  private def buildExecutorResourcesQuantities(
+      customResources: Set[ExecutorResourceRequest]): Map[String, Quantity] = {
+    customResources.map { request =>
+      val vendorDomain = if (request.vendor.nonEmpty) {
+        request.vendor
+      } else {
+        throw new SparkException(s"Resource: ${request.resourceName} was requested, " +
+          "but vendor was not specified.")
+      }
+      val quantity = new Quantity(request.amount.toString)
+      (KubernetesConf.buildKubernetesResourceName(vendorDomain, request.resourceName), quantity)
+    }.toMap
+  }
+
   override def configurePod(pod: SparkPod): SparkPod = {
     val name = s"$executorPodNamePrefix-exec-${kubernetesConf.executorId}"
 
@@ -89,22 +100,21 @@ private[spark] class BasicExecutorFeatureStep(
       // Replace dangerous characters in the remaining string with a safe alternative.
       .replaceAll("[^\\w-]+", "_")
 
-    val executorMemoryQuantity = new Quantity(s"${executorMemoryTotal}Mi")
+    val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
     val executorCpuQuantity = new Quantity(executorCoresRequest)
-
     val executorResourceQuantities =
-      KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_PREFIX,
-        kubernetesConf.sparkConf)
+      buildExecutorResourcesQuantities(execResources.customResources.values.toSet)
 
     val executorEnv: Seq[EnvVar] = {
         (Seq(
           (ENV_DRIVER_URL, driverUrl),
-          (ENV_EXECUTOR_CORES, executorCores.toString),
+          (ENV_EXECUTOR_CORES, execResources.cores.toString),
           (ENV_EXECUTOR_MEMORY, executorMemoryString),
           (ENV_APPLICATION_ID, kubernetesConf.appId),
           // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
           (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-          (ENV_EXECUTOR_ID, kubernetesConf.executorId)
+          (ENV_EXECUTOR_ID, kubernetesConf.executorId),
+          (ENV_RESOURCE_PROFILE_ID, resourceProfile.id.toString)
         ) ++ kubernetesConf.environment).map { case (k, v) =>
           new EnvVarBuilder()
             .withName(k)
@@ -166,6 +176,13 @@ private[spark] class BasicExecutorFeatureStep(
           .build()
       }
 
+    if (!isDefaultProfile) {
+      if (pod.container != null && pod.container.getResources() != null) {
+        logDebug("NOT using the default profile and removing template resources")
+        pod.container.setResources(new ResourceRequirements())
+      }
+    }
+
     val executorContainer = new ContainerBuilder(pod.container)
       .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
       .withImage(executorContainerImage)
@@ -184,14 +201,18 @@ private[spark] class BasicExecutorFeatureStep(
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
       .build()
-    val containerWithLimitCores = executorLimitCores.map { limitCores =>
-      val executorCpuLimitQuantity = new Quantity(limitCores)
-      new ContainerBuilder(executorContainer)
-        .editResources()
+    val containerWithLimitCores = if (isDefaultProfile) {
+      executorLimitCores.map { limitCores =>
+        val executorCpuLimitQuantity = new Quantity(limitCores)
+        new ContainerBuilder(executorContainer)
+          .editResources()
           .addToLimits("cpu", executorCpuLimitQuantity)
           .endResources()
-        .build()
-    }.getOrElse(executorContainer)
+          .build()
+      }.getOrElse(executorContainer)
+    } else {
+      executorContainer
+    }
     val containerWithLifecycle =
       if (!kubernetesConf.workerDecommissioning) {
         logInfo("Decommissioning not enabled, skipping shutdown script")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 4e8ca47..c029b24 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -17,13 +17,14 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
-import java.time.format.DateTimeParseException
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, PodBuilder}
+import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -33,6 +34,8 @@ import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
 import org.apache.spark.util.{Clock, Utils}
 
 private[spark] class ExecutorPodsAllocator(
@@ -45,7 +48,11 @@ private[spark] class ExecutorPodsAllocator(
 
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
 
-  private val totalExpectedExecutors = new AtomicInteger(0)
+  // ResourceProfile id -> total expected executors per profile, currently we don't remove
+  // any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749
+  private val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()
+
+  private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
 
   private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
 
@@ -73,8 +80,8 @@ private[spark] class ExecutorPodsAllocator(
           s"namespace $namespace (this was supposed to be the driver pod.).")))
 
   // Executor IDs that have been requested from Kubernetes but have not been detected in any
-  // snapshot yet. Mapped to the timestamp when they were created.
-  private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, Long]
+  // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created.
+  private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, (Int, Long)]
 
   private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
 
@@ -93,9 +100,12 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
 
-  def setTotalExpectedExecutors(total: Int): Unit = {
-    logDebug(s"Set totalExpectedExecutors to $total")
-    totalExpectedExecutors.set(total)
+  def setTotalExpectedExecutors(resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+    resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
+      rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
+      totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs)
+    }
+    logDebug(s"Set total expected execs to $totalExpectedExecutorsPerResourceProfileId")
     if (!hasPendingPods.get()) {
       snapshotsStore.notifySubscribers()
     }
@@ -114,7 +124,7 @@ private[spark] class ExecutorPodsAllocator(
     // both the creation and deletion events. In either case, delete the missing pod
     // if possible, and mark such a pod to be rescheduled below.
     val currentTime = clock.getTimeMillis()
-    val timedOut = newlyCreatedExecutors.flatMap { case (execId, timeCreated) =>
+    val timedOut = newlyCreatedExecutors.flatMap { case (execId, (_, timeCreated)) =>
       if (currentTime - timeCreated > podCreationTimeout) {
         Some(execId)
       } else {
@@ -147,136 +157,171 @@ private[spark] class ExecutorPodsAllocator(
       lastSnapshot = snapshots.last
     }
 
-    val currentRunningCount = lastSnapshot.executorPods.values.count {
-      case PodRunning(_) => true
-      case _ => false
-    }
-
-    val currentPendingExecutors = lastSnapshot.executorPods
-      .filter {
-        case (_, PodPending(_)) => true
-        case _ => false
-      }
-
     // Make a local, non-volatile copy of the reference since it's used multiple times. This
     // is the only method that modifies the list, so this is safe.
     var _deletedExecutorIds = deletedExecutorIds
-
     if (snapshots.nonEmpty) {
-      logDebug(s"Pod allocation status: $currentRunningCount running, " +
-        s"${currentPendingExecutors.size} pending, " +
-        s"${newlyCreatedExecutors.size} unacknowledged.")
-
       val existingExecs = lastSnapshot.executorPods.keySet
       _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }
 
-    val currentTotalExpectedExecutors = totalExpectedExecutors.get
-
-    // This variable is used later to print some debug logs. It's updated when cleaning up
-    // excess pod requests, since currentPendingExecutors is immutable.
-    var knownPendingCount = currentPendingExecutors.size
-
-    // It's possible that we have outstanding pods that are outdated when dynamic allocation
-    // decides to downscale the application. So check if we can release any pending pods early
-    // instead of waiting for them to time out. Drop them first from the unacknowledged list,
-    // then from the pending. However, in order to prevent too frequent frunctuation, newly
-    // requested pods are protected during executorIdleTimeout period.
-    //
-    // TODO: with dynamic allocation off, handle edge cases if we end up with more running
-    // executors than expected.
-    val knownPodCount = currentRunningCount + currentPendingExecutors.size +
-      newlyCreatedExecutors.size
-    if (knownPodCount > currentTotalExpectedExecutors) {
-      val excess = knownPodCount - currentTotalExpectedExecutors
-      val knownPendingToDelete = currentPendingExecutors
-        .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
-        .map { case (id, _) => id }
-        .take(excess - newlyCreatedExecutors.size)
-      val toDelete = newlyCreatedExecutors
-        .filter(x => currentTime - x._2 > executorIdleTimeout)
-        .keys.take(excess).toList ++ knownPendingToDelete
-
-      if (toDelete.nonEmpty) {
-        logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
-        _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+    // Map the pods into per ResourceProfile id so we can check per ResourceProfile,
+    // add a fast path if not using other ResourceProfiles.
+    val rpIdToExecsAndPodState =
+      mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]()
+    if (totalExpectedExecutorsPerResourceProfileId.size <= 1) {
+      rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) =
+        mutable.HashMap.empty ++= lastSnapshot.executorPods
+    } else {
+      lastSnapshot.executorPods.foreach { case (execId, execPodState) =>
+        val rpId = execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt
+        val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId,
+          mutable.HashMap[Long, ExecutorPodState]())
+        execPods(execId) = execPodState
+      }
+    }
 
-        Utils.tryLogNonFatalError {
-          kubernetesClient
-            .pods()
-            .withField("status.phase", "Pending")
-            .withLabel(SPARK_APP_ID_LABEL, applicationId)
-            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
-            .delete()
-          newlyCreatedExecutors --= toDelete
-          knownPendingCount -= knownPendingToDelete.size
+    var totalPendingCount = 0
+    // The order we request executors for each ResourceProfile is not guaranteed.
+    totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, targetNum) =>
+      val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)
+
+      val currentRunningCount = podsForRpId.values.count {
+        case PodRunning(_) => true
+        case _ => false
+      }
+
+      val currentPendingExecutors = podsForRpId.filter {
+        case (_, PodPending(_)) => true
+        case _ => false
+      }
+      // This variable is used later to print some debug logs. It's updated when cleaning up
+      // excess pod requests, since currentPendingExecutors is immutable.
+      var knownPendingCount = currentPendingExecutors.size
+
+      val newlyCreatedExecutorsForRpId =
+        newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
+          rpId == waitingRpId
         }
+
+      if (podsForRpId.nonEmpty) {
+        logDebug(s"ResourceProfile Id: $rpId " +
+          s"pod allocation status: $currentRunningCount running, " +
+          s"${currentPendingExecutors.size} pending. " +
+          s"${newlyCreatedExecutorsForRpId.size} unacknowledged.")
       }
-    }
 
-    if (newlyCreatedExecutors.isEmpty
-        && knownPodCount < currentTotalExpectedExecutors) {
-      val numExecutorsToAllocate = math.min(
-        currentTotalExpectedExecutors - knownPodCount, podAllocationSize)
-      logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
-      for ( _ <- 0 until numExecutorsToAllocate) {
-        val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
-        val executorConf = KubernetesConf.createExecutorConf(
-          conf,
-          newExecutorId.toString,
-          applicationId,
-          driverPod)
-        val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
-          kubernetesClient)
-        val executorPod = resolvedExecutorSpec.pod
-        val podWithAttachedContainer = new PodBuilder(executorPod.pod)
-          .editOrNewSpec()
-          .addToContainers(executorPod.container)
-          .endSpec()
-          .build()
-        val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
-        try {
-          val resources = resolvedExecutorSpec.executorKubernetesResources
-          addOwnerReference(createdExecutorPod, resources)
-          resources
-            .filter(_.getKind == "PersistentVolumeClaim")
-            .foreach { resource =>
-              val pvc = resource.asInstanceOf[PersistentVolumeClaim]
-              logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
-                s"StorageClass ${pvc.getSpec.getStorageClassName}")
-              kubernetesClient.persistentVolumeClaims().create(pvc)
-            }
-          newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
-          logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
-        } catch {
-          case NonFatal(e) =>
-            kubernetesClient.pods().delete(createdExecutorPod)
-            throw e
+      // It's possible that we have outstanding pods that are outdated when dynamic allocation
+      // decides to downscale the application. So check if we can release any pending pods early
+      // instead of waiting for them to time out. Drop them first from the unacknowledged list,
+      // then from the pending. However, in order to prevent too frequent fluctuation, newly
+      // requested pods are protected during executorIdleTimeout period.
+      //
+      // TODO: with dynamic allocation off, handle edge cases if we end up with more running
+      // executors than expected.
+      val knownPodCount = currentRunningCount + currentPendingExecutors.size +
+        newlyCreatedExecutorsForRpId.size
+
+      if (knownPodCount > targetNum) {
+        val excess = knownPodCount - targetNum
+        val knownPendingToDelete = currentPendingExecutors
+          .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
+          .map { case (id, _) => id }
+          .take(excess - newlyCreatedExecutorsForRpId.size)
+        val toDelete = newlyCreatedExecutorsForRpId
+          .filter { case (_, (_, createTime)) =>
+            currentTime - createTime > executorIdleTimeout
+          }.keys.take(excess).toList ++ knownPendingToDelete
+
+        if (toDelete.nonEmpty) {
+          logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
+          _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+
+          Utils.tryLogNonFatalError {
+            kubernetesClient
+              .pods()
+              .withField("status.phase", "Pending")
+              .withLabel(SPARK_APP_ID_LABEL, applicationId)
+              .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+              .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
+              .delete()
+            newlyCreatedExecutors --= toDelete
+            knownPendingCount -= knownPendingToDelete.size
+          }
         }
       }
-    }
 
+      if (newlyCreatedExecutorsForRpId.isEmpty
+        && knownPodCount < targetNum) {
+        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId)
+      }
+      totalPendingCount += knownPendingCount
+
+      // The code below just prints debug messages, which are only useful when there's a change
+      // in the snapshot state. Since the messages are a little spammy, avoid them when we know
+      // there are no useful updates.
+      if (log.isDebugEnabled && snapshots.nonEmpty) {
+        val outstanding = knownPendingCount + newlyCreatedExecutorsForRpId.size
+        if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
+          logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
+            "equal to the number of requested executors. Not scaling up further.")
+        } else {
+          if (outstanding > 0) {
+            logDebug(s"Still waiting for $outstanding executors for ResourceProfile " +
+              s"Id $rpId before requesting more.")
+          }
+        }
+      }
+    }
     deletedExecutorIds = _deletedExecutorIds
 
     // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
     // update method when not needed.
-    hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)
-
-    // The code below just prints debug messages, which are only useful when there's a change
-    // in the snapshot state. Since the messages are a little spammy, avoid them when we know
-    // there are no useful updates.
-    if (!log.isDebugEnabled || snapshots.isEmpty) {
-      return
-    }
+    hasPendingPods.set(totalPendingCount + newlyCreatedExecutors.size > 0)
+  }
 
-    if (currentRunningCount >= currentTotalExpectedExecutors && !dynamicAllocationEnabled) {
-      logDebug("Current number of running executors is equal to the number of requested" +
-        " executors. Not scaling up further.")
-    } else {
-      val outstanding = knownPendingCount + newlyCreatedExecutors.size
-      if (outstanding > 0) {
-        logDebug(s"Still waiting for $outstanding executors before requesting more.")
+  private def requestNewExecutors(
+      expected: Int,
+      running: Int,
+      applicationId: String,
+      resourceProfileId: Int): Unit = {
+    val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
+    logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
+      s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
+    for ( _ <- 0 until numExecutorsToAllocate) {
+      val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
+      val executorConf = KubernetesConf.createExecutorConf(
+        conf,
+        newExecutorId.toString,
+        applicationId,
+        driverPod,
+        resourceProfileId)
+      val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
+        kubernetesClient, rpIdToResourceProfile(resourceProfileId))
+      val executorPod = resolvedExecutorSpec.pod
+      val podWithAttachedContainer = new PodBuilder(executorPod.pod)
+        .editOrNewSpec()
+        .addToContainers(executorPod.container)
+        .endSpec()
+        .build()
+      val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
+      try {
+        val resources = resolvedExecutorSpec.executorKubernetesResources
+        addOwnerReference(createdExecutorPod, resources)
+        resources
+          .filter(_.getKind == "PersistentVolumeClaim")
+          .foreach { resource =>
+            val pvc = resource.asInstanceOf[PersistentVolumeClaim]
+            logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
+              s"StorageClass ${pvc.getSpec.getStorageClassName}")
+            kubernetesClient.persistentVolumeClaims().create(pvc)
+          }
+        newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
+        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+      } catch {
+        case NonFatal(e) =>
+          kubernetesClient.pods().delete(createdExecutorPod)
+          throw e
       }
     }
   }
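
For context on how the allocator changes above get exercised: the per-profile targets come from
the stage level scheduling API. A minimal sketch, assuming an existing SparkContext named sc and
dynamic allocation with shuffle tracking enabled (the memory/core numbers are illustrative only):

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // Build a custom ResourceProfile: 2 cores and 4g heap per executor, 1 cpu per task.
    val ereq = new ExecutorResourceRequests().cores(2).memory("4g")
    val treq = new TaskResourceRequests().cpus(1)
    val rp = new ResourceProfileBuilder().require(ereq).require(treq).build

    // Stages computing this RDD now target executors matching rp; on K8s that flows through
    // setTotalExpectedExecutors(Map(rp -> n)) and into requestNewExecutors above.
    sc.parallelize(1 to 1000, 8).withResources(rp).map(_ * 2).count()
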
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 4ea22eb..7d1565d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -78,7 +78,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   override def start(): Unit = {
     super.start()
-    podAllocator.setTotalExpectedExecutors(initialExecutors)
+    val initExecs = Map(defaultProfile -> initialExecutors)
+    podAllocator.setTotalExpectedExecutors(initExecs)
     lifecycleEventHandler.start(this)
     podAllocator.start(applicationId())
     watchEvents.start(applicationId())
@@ -121,7 +122,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   override def doRequestTotalExecutors(
       resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
-    podAllocator.setTotalExpectedExecutors(resourceProfileToTotalExecs(defaultProfile))
+    podAllocator.setTotalExpectedExecutors(resourceProfileToTotalExecs)
     Future.successful(true)
   }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index b5f21fe..5388d18 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -23,13 +23,15 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.spark.SecurityManager
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
+import org.apache.spark.resource.ResourceProfile
 
 private[spark] class KubernetesExecutorBuilder {
 
   def buildFromFeatures(
       conf: KubernetesExecutorConf,
       secMgr: SecurityManager,
-      client: KubernetesClient): KubernetesExecutorSpec = {
+      client: KubernetesClient,
+      resourceProfile: ResourceProfile): KubernetesExecutorSpec = {
     val initialPod = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
       .map { file =>
         KubernetesUtils.loadPodFromTemplate(
@@ -40,7 +42,7 @@ private[spark] class KubernetesExecutorBuilder {
       .getOrElse(SparkPod.initialPod())
 
     val features = Seq(
-      new BasicExecutorFeatureStep(conf, secMgr),
+      new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
       new ExecutorKubernetesCredentialsFeatureStep(conf),
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
@@ -51,6 +53,8 @@ private[spark] class KubernetesExecutorBuilder {
       initialPod,
       executorKubernetesResources = Seq.empty)
 
+    // If a pod template is used, the resources always come from the template first and are
+    // then combined with any Spark conf or ResourceProfile resources.
     features.foldLeft(spec) { case (spec, feature) =>
       val configuredPod = feature.configurePod(spec.pod)
       val addedResources = feature.getAdditionalKubernetesResources()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 1ca4dbc..0b97322 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 
 class KubernetesConfSuite extends SparkFunSuite {
 
@@ -96,6 +97,17 @@ class KubernetesConfSuite extends SparkFunSuite {
       Some(DRIVER_POD))
     assert(conf.executorId === EXECUTOR_ID)
     assert(conf.driverPod.get === DRIVER_POD)
+    assert(conf.resourceProfileId === DEFAULT_RESOURCE_PROFILE_ID)
+  }
+
+  test("resource profile not default.") {
+    val conf = KubernetesConf.createExecutorConf(
+      new SparkConf(false),
+      EXECUTOR_ID,
+      KubernetesTestConf.APP_ID,
+      Some(DRIVER_POD),
+      10)
+    assert(conf.resourceProfileId === 10)
   }
 
   test("Image pull secrets.") {
@@ -134,7 +146,8 @@ class KubernetesConfSuite extends SparkFunSuite {
     assert(conf.labels === Map(
       SPARK_EXECUTOR_ID_LABEL -> EXECUTOR_ID,
       SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
-      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
+      SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS)
     assert(conf.annotations === CUSTOM_ANNOTATIONS)
     assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
     assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index c8c934b..858b4f1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
-import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.{ResourceID, ResourceProfile}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.util.Utils
 
@@ -191,7 +191,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   ).foreach { case (name, resource, factor, expectedFactor) =>
     test(s"memory overhead factor: $name") {
       // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
-      val driverMem = MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+      val driverMem =
+        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
 
       // main app resource, overhead factor
       val sparkConf = new SparkConf(false)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index d56ffe1..92031c6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
-import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEndpointAddress
@@ -55,6 +55,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   private val RESOURCE_NAME_PREFIX = "base"
   private val EXECUTOR_IMAGE = "executor-image"
   private val LABELS = Map("label1key" -> "label1value")
+  private var defaultProfile: ResourceProfile = _
   private val TEST_IMAGE_PULL_SECRETS = Seq("my-1secret-1", "my-secret-2")
   private val TEST_IMAGE_PULL_SECRET_OBJECTS =
     TEST_IMAGE_PULL_SECRETS.map { secret =>
@@ -84,6 +85,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       .set(config.DRIVER_PORT, DRIVER_PORT)
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
       .set("spark.kubernetes.resource.type", "java")
+    initDefaultProfile(baseConf)
   }
 
   private def newExecutorConf(
@@ -95,10 +97,17 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       environment = environment)
   }
 
+  private def initDefaultProfile(baseConf: SparkConf): Unit = {
+    ResourceProfile.clearDefaultProfile()
+    defaultProfile = ResourceProfile.getOrCreateDefaultProfile(baseConf)
+  }
+
   test("test spark resource missing vendor") {
     baseConf.set(EXECUTOR_GPU_ID.amountConf, "2")
-    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val error = intercept[SparkException] {
+      initDefaultProfile(baseConf)
+      val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+        defaultProfile)
       val executor = step.configurePod(SparkPod.initialPod())
     }.getMessage()
     assert(error.contains("Resource: gpu was requested, but vendor was not specified"))
@@ -106,9 +115,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("test spark resource missing amount") {
     baseConf.set(EXECUTOR_GPU_ID.vendorConf, "nvidia.com")
-
-    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val error = intercept[SparkException] {
+      initDefaultProfile(baseConf)
+      val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+        defaultProfile)
       val executor = step.configurePod(SparkPod.initialPod())
     }.getMessage()
     assert(error.contains("You must specify an amount for gpu"))
@@ -124,7 +134,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       baseConf.set(testRInfo.rId.amountConf, testRInfo.count)
       baseConf.set(testRInfo.rId.vendorConf, testRInfo.vendor)
     }
-    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    initDefaultProfile(baseConf)
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+      defaultProfile)
     val executor = step.configurePod(SparkPod.initialPod())
 
     assert(executor.container.getResources.getLimits.size() === 3)
@@ -137,7 +149,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("basic executor pod has reasonable defaults") {
-    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+      defaultProfile)
     val executor = step.configurePod(SparkPod.initialPod())
 
     // The executor pod name and default labels.
@@ -167,7 +180,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple"
 
     baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, longPodNamePrefix)
-    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    initDefaultProfile(baseConf)
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+      defaultProfile)
     assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
   }
 
@@ -175,7 +190,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     val invalidPrefix = "abcdef-*_/[]{}+==.,;'\"-----------------------------------------------"
 
     baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalidPrefix)
-    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    initDefaultProfile(baseConf)
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+      defaultProfile)
     val hostname = step.configurePod(SparkPod.initialPod()).pod.getSpec().getHostname()
     assert(hostname.length <= 63)
     assert(InternetDomainName.isValid(hostname))
@@ -184,8 +201,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   test("classpath and extra java options get translated into environment variables") {
     baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
     baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz")
+    initDefaultProfile(baseConf)
     val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
-    val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
+    val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf),
+      defaultProfile)
     val executor = step.configurePod(SparkPod.initialPod())
 
     checkEnv(executor, baseConf,
@@ -198,7 +217,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   test("SPARK-32655 Support appId/execId placeholder in SPARK_EXECUTOR_DIRS") {
     val kconf = newExecutorConf(environment = Map(ENV_EXECUTOR_DIRS ->
       "/p1/SPARK_APPLICATION_ID/SPARK_EXECUTOR_ID,/p2/SPARK_APPLICATION_ID/SPARK_EXECUTOR_ID"))
-    val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
+    val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf),
+      defaultProfile)
     val executor = step.configurePod(SparkPod.initialPod())
 
     checkEnv(executor, baseConf, Map(ENV_EXECUTOR_DIRS ->
@@ -208,8 +228,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   test("test executor pyspark memory") {
     baseConf.set("spark.kubernetes.resource.type", "python")
     baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)
-
-    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    initDefaultProfile(baseConf)
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+      defaultProfile)
     val executor = step.configurePod(SparkPod.initialPod())
     // This is checking that basic executor memory (1408) + pyspark memory (42) = 1450
     assert(amountAndFormat(executor.container.getResources.getRequests.get("memory")) === "1450Mi")
@@ -224,7 +245,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     secMgr.initializeAuth()
 
     val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
-      secMgr)
+      secMgr, defaultProfile)
 
     val executor = step.configurePod(SparkPod.initialPod())
     checkEnv(executor, conf, Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey()))
@@ -240,15 +261,65 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       .set("spark.master", "k8s://127.0.0.1")
     val secMgr = new SecurityManager(conf)
     secMgr.initializeAuth()
-
     val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
-      secMgr)
+      secMgr, defaultProfile)
 
     val executor = step.configurePod(SparkPod.initialPod())
     assert(!KubernetesFeaturesTestUtils.containerHasEnvVar(
       executor.container, SecurityManager.ENV_AUTH_SECRET))
   }
 
+  test("SPARK-32661 test executor offheap memory") {
+    baseConf.set(MEMORY_OFFHEAP_ENABLED, true)
+    baseConf.set("spark.memory.offHeap.size", "42m")
+    initDefaultProfile(baseConf)
+
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+      defaultProfile)
+    val executor = step.configurePod(SparkPod.initialPod())
+    // This is checking that basic executor memory (1408) + offHeap memory (42) = 1450
+    assert(amountAndFormat(executor.container.getResources.getRequests.get("memory")) === "1450Mi")
+  }
+
+  test("basic resourceprofile") {
+    baseConf.set("spark.kubernetes.resource.type", "python")
+    initDefaultProfile(baseConf)
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g").memoryOverhead("1g").pysparkMemory("3g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    assert(amountAndFormat(executor.container.getResources
+      .getRequests.get("cpu")) === "4")
+    assert(amountAndFormat(executor.container.getResources
+      .getLimits.get("memory")) === "6144Mi")
+  }
+
+  test("resourceprofile with gpus") {
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(2).resource("gpu", 2, "/path/getGpusResources.sh", "nvidia.com")
+    treq.cpus(1)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    assert(amountAndFormat(executor.container.getResources
+      .getLimits.get("memory")) === "1408Mi")
+    assert(amountAndFormat(executor.container.getResources
+      .getRequests.get("cpu")) === "2")
+
+    assert(executor.container.getResources.getLimits.size() === 2)
+    assert(amountAndFormat(executor.container.getResources.getLimits.get("nvidia.com/gpu")) === "2")
+  }
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
@@ -265,11 +336,12 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ENV_EXECUTOR_ID -> "1",
       ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
       ENV_EXECUTOR_CORES -> "1",
-      ENV_EXECUTOR_MEMORY -> "1g",
+      ENV_EXECUTOR_MEMORY -> "1024m",
       ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
       ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
       ENV_EXECUTOR_POD_IP -> null,
-      ENV_SPARK_USER -> Utils.getCurrentUserName())
+      ENV_SPARK_USER -> Utils.getCurrentUserName(),
+      ENV_RESOURCE_PROFILE_ID -> "0")
 
     val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
     val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index 62c79e6..ad79e3a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -22,13 +22,15 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
 
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.SparkPod
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 
 object ExecutorLifecycleTestUtils {
 
   val TEST_SPARK_APP_ID = "spark-app-id"
 
-  def failedExecutorWithoutDeletion(executorId: Long): Pod = {
-    new PodBuilder(podWithAttachedContainerForId(executorId))
+  def failedExecutorWithoutDeletion(
+      executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
       .editOrNewStatus()
         .withPhase("failed")
         .withStartTime(Instant.now.toString)
@@ -58,8 +60,8 @@ object ExecutorLifecycleTestUtils {
       .build()
   }
 
-  def pendingExecutor(executorId: Long): Pod = {
-    new PodBuilder(podWithAttachedContainerForId(executorId))
+  def pendingExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
       .editOrNewStatus()
         .withPhase("pending")
         .withStartTime(Instant.now.toString)
@@ -67,8 +69,8 @@ object ExecutorLifecycleTestUtils {
       .build()
   }
 
-  def runningExecutor(executorId: Long): Pod = {
-    new PodBuilder(podWithAttachedContainerForId(executorId))
+  def runningExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
       .editOrNewStatus()
         .withPhase("running")
         .withStartTime(Instant.now.toString)
@@ -82,8 +84,9 @@ object ExecutorLifecycleTestUtils {
    * state (terminated with non-zero exit code). This pod is used for unit-testing the
    * spark.kubernetes.executor.checkAllContainers Spark Conf.
    */
-  def runningExecutorWithFailedContainer(executorId: Long): Pod = {
-    new PodBuilder(podWithAttachedContainerForId(executorId))
+  def runningExecutorWithFailedContainer(
+      executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
       .editOrNewStatus()
         .withPhase("running")
         .addNewContainerStatus()
@@ -103,32 +106,34 @@ object ExecutorLifecycleTestUtils {
       .build()
   }
 
-  def succeededExecutor(executorId: Long): Pod = {
-    new PodBuilder(podWithAttachedContainerForId(executorId))
+  def succeededExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
       .editOrNewStatus()
         .withPhase("succeeded")
         .endStatus()
       .build()
   }
 
-  def deletedExecutor(executorId: Long): Pod = {
-    new PodBuilder(podWithAttachedContainerForId(executorId))
+  def deletedExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
       .editOrNewMetadata()
         .withDeletionTimestamp("523012521")
         .endMetadata()
       .build()
   }
 
-  def unknownExecutor(executorId: Long): Pod = {
-    new PodBuilder(podWithAttachedContainerForId(executorId))
+  def unknownExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
       .editOrNewStatus()
         .withPhase("unknown")
         .endStatus()
       .build()
   }
 
-  def podWithAttachedContainerForId(executorId: Long): Pod = {
-    val sparkPod = executorPodWithId(executorId)
+  def podWithAttachedContainerForId(
+      executorId: Long,
+      rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    val sparkPod = executorPodWithId(executorId, rpId)
     val podWithAttachedContainer = new PodBuilder(sparkPod.pod)
       .editOrNewSpec()
         .addToContainers(sparkPod.container)
@@ -137,13 +142,14 @@ object ExecutorLifecycleTestUtils {
     podWithAttachedContainer
   }
 
-  def executorPodWithId(executorId: Long): SparkPod = {
+  def executorPodWithId(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): SparkPod = {
     val pod = new PodBuilder()
       .withNewMetadata()
         .withName(s"spark-executor-$executorId")
         .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
         .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
         .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
+        .addToLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, rpId.toString)
       .endMetadata()
       .editOrNewSpec()
         .withRestartPolicy("Never")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 37f9cae..528b755 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.resource._
 import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
 import org.apache.spark.util.ManualClock
 
@@ -54,6 +55,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
     .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s")
 
+  private val defaultProfile: ResourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
   private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
   private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
   private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000
@@ -89,7 +91,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
     when(driverPodOperations.get).thenReturn(driverPod)
     when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
-      meq(kubernetesClient))).thenAnswer(executorPodAnswer())
+      meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer())
     snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
     waitForExecutorPodsClock = new ManualClock(0L)
     podsAllocatorUnderTest = new ExecutorPodsAllocator(
@@ -99,7 +101,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("Initially request executors in batches. Do not request another batch if the" +
     " first has not finished.") {
-    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
     for (nextId <- 1 to podAllocationSize) {
       verify(podOperations).create(podWithAttachedContainerForId(nextId))
     }
@@ -108,7 +110,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("Request executors in batches. Allow another batch to be requested if" +
     " all pending executors start running.") {
-    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
     for (execId <- 1 until podAllocationSize) {
       snapshotsStore.updatePod(runningExecutor(execId))
     }
@@ -124,7 +126,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("When a current batch reaches error states immediately, re-request" +
     " them on the next batch.") {
-    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> podAllocationSize))
     for (execId <- 1 until podAllocationSize) {
       snapshotsStore.updatePod(runningExecutor(execId))
     }
@@ -145,7 +147,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(podOperations
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
       .thenReturn(labeledPods)
-    podsAllocatorUnderTest.setTotalExpectedExecutors(1)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
     verify(podOperations).create(podWithAttachedContainerForId(1))
     waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
     snapshotsStore.notifySubscribers()
@@ -171,7 +173,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     waitForExecutorPodsClock.setTime(startTime)
 
     // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
-    podsAllocatorUnderTest.setTotalExpectedExecutors(1)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
     verify(podOperations).create(podWithAttachedContainerForId(1))
 
     // Mark executor as running, verify that subsequent allocation cycle is a no-op.
@@ -181,7 +183,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(podOperations, never()).delete()
 
     // Request 3 more executors, make sure all are requested.
-    podsAllocatorUnderTest.setTotalExpectedExecutors(4)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4))
     snapshotsStore.notifySubscribers()
     verify(podOperations).create(podWithAttachedContainerForId(2))
     verify(podOperations).create(podWithAttachedContainerForId(3))
@@ -196,7 +198,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
     // Scale down to 1. Pending executors (both acknowledged and not) should be deleted.
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
-    podsAllocatorUnderTest.setTotalExpectedExecutors(1)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
     snapshotsStore.notifySubscribers()
     verify(podOperations, times(4)).create(any())
     verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
@@ -231,7 +233,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     val startTime = Instant.now.toEpochMilli
     waitForExecutorPodsClock.setTime(startTime)
 
-    podsAllocatorUnderTest.setTotalExpectedExecutors(5)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5))
     verify(podOperations).create(podWithAttachedContainerForId(1))
     verify(podOperations).create(podWithAttachedContainerForId(2))
     verify(podOperations).create(podWithAttachedContainerForId(3))
@@ -243,7 +245,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(pendingExecutor(2))
 
     // Newly created executors (both acknowledged and not) are protected by executorIdleTimeout
-    podsAllocatorUnderTest.setTotalExpectedExecutors(0)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 0))
     snapshotsStore.notifySubscribers()
     verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5")
     verify(podOperations, never()).delete()
@@ -255,6 +257,88 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(podOperations).delete()
   }
 
+  test("SPARK-33288: multiple resource profiles") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build
+
+    // Target 1 executor for default profile, 2 for other profile,
+    // make sure it's requested, even with an empty initial snapshot.
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 2))
+    verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(2, rp.id))
+    verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+
+    // Mark executor as running, verify that subsequent allocation cycle is a no-op.
+    snapshotsStore.updatePod(runningExecutor(1, defaultProfile.id))
+    snapshotsStore.updatePod(runningExecutor(2, rp.id))
+    snapshotsStore.updatePod(runningExecutor(3, rp.id))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations, times(3)).create(any())
+    verify(podOperations, never()).delete()
+
+    // Request 3 more executors for default profile and 1 more for other profile,
+    // make sure all are requested.
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, rp -> 3))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations).create(podWithAttachedContainerForId(4, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(5, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(6, defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(7, rp.id))
+
+    // Mark 4 as running, 5 and 7 as pending. Allocation cycle should do nothing.
+    snapshotsStore.updatePod(runningExecutor(4, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(5, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(7, rp.id))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations, times(7)).create(any())
+    verify(podOperations, never()).delete()
+
+    // Scale down to 1 for both resource profiles. Pending executors
+    // (both acknowledged and not) should be deleted.
+    waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations, times(7)).create(any())
+    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6")
+    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7")
+    verify(podOperations, times(2)).delete()
+    assert(podsAllocatorUnderTest.isDeleted("5"))
+    assert(podsAllocatorUnderTest.isDeleted("6"))
+    assert(podsAllocatorUnderTest.isDeleted("7"))
+
+    // Update the snapshot to not contain the deleted executors, make sure the
+    // allocator cleans up internal state.
+    snapshotsStore.updatePod(deletedExecutor(5))
+    snapshotsStore.updatePod(deletedExecutor(6))
+    snapshotsStore.updatePod(deletedExecutor(7))
+    snapshotsStore.removeDeletedExecutors()
+    snapshotsStore.notifySubscribers()
+    assert(!podsAllocatorUnderTest.isDeleted("5"))
+    assert(!podsAllocatorUnderTest.isDeleted("6"))
+    assert(!podsAllocatorUnderTest.isDeleted("7"))
+  }
+
   test("SPARK-33262: pod allocator does not stall with pending pods") {
     when(podOperations
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
@@ -269,7 +353,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "3", "4", "5", "6"))
       .thenReturn(podOperations)
 
-    podsAllocatorUnderTest.setTotalExpectedExecutors(6)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6))
     // Initial request of pods
     verify(podOperations).create(podWithAttachedContainerForId(1))
     verify(podOperations).create(podWithAttachedContainerForId(2))
@@ -292,6 +376,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
-      KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt), Seq.empty)
+      KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
+        k8sConf.resourceProfileId.toInt), Seq.empty)
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 894e1e4..a7c6ffa 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
-import org.apache.spark.resource.ResourceProfileManager
+import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -89,6 +89,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   private val listenerBus = new LiveListenerBus(new SparkConf())
   private val resourceProfileManager = new ResourceProfileManager(sparkConf, listenerBus)
+  private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
 
   before {
     MockitoAnnotations.initMocks(this)
@@ -118,7 +119,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   test("Start all components") {
     schedulerBackendUnderTest.start()
-    verify(podAllocator).setTotalExpectedExecutors(3)
+    verify(podAllocator).setTotalExpectedExecutors(Map(defaultProfile -> 3))
     verify(podAllocator).start(TEST_SPARK_APP_ID)
     verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
     verify(watchEvents).start(TEST_SPARK_APP_ID)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index 796e212..c64b733 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -21,6 +21,7 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.resource.ResourceProfile
 
 class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
 
@@ -32,7 +33,8 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
     sparkConf.set("spark.driver.host", "https://driver.host.com")
     val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
     val secMgr = new SecurityManager(sparkConf)
-    new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client).pod
+    val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, defaultProfile).pod
   }
 
 }
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index d605ae4..fd66a5d 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -85,6 +85,7 @@ case "$1" in
       --cores $SPARK_EXECUTOR_CORES
       --app-id $SPARK_APPLICATION_ID
       --hostname $SPARK_EXECUTOR_POD_IP
+      --resourceProfileId $SPARK_RESOURCE_PROFILE_ID
     )
     ;;
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 30ca4a6..552167c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -63,6 +63,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper}
 
@@ -93,7 +94,8 @@ private[spark] class Client(
   private val amMemoryOverhead = {
     val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
     sparkConf.get(amMemoryOverheadEntry).getOrElse(
-      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong,
+        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
   }
   private val amCores = if (isClusterMode) {
     sparkConf.get(DRIVER_CORES)
@@ -104,9 +106,10 @@ private[spark] class Client(
   // Executor related configurations
   private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
   // Executor offHeap memory in MiB.
-  protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
+  protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong,
+      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
 
   private val isPython = sparkConf.get(IS_PYTHON_APP)
   private val pysparkWorkerMemory: Int = if (isPython) {
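
A quick worked example of the overhead formula used above, based on the constants visible
elsewhere in this diff (MEMORY_OVERHEAD_FACTOR = 0.10, ResourceProfile.MEMORY_OVERHEAD_MIN_MIB
= 384); the memory sizes are illustrative only:

    // Sketch of the max(factor * mem, minimum) behavior; values are in MiB.
    val factor = 0.10
    val minMiB = 384L
    def overhead(memMiB: Long): Int = math.max((factor * memMiB).toLong, minMiB).toInt

    overhead(1024)  // 384: 10% (102) is below the 384 MiB floor
    overhead(4096)  // 409: 10% of 4g wins once the container is large enough
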
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 3d800be..3aabc46 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -43,6 +43,8 @@ private object ResourceRequestHelper extends Logging {
   private val RESOURCE_NOT_FOUND = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
   val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
   val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
+  private[yarn] val resourceNameMapping =
+    Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG)
   @volatile private var numResourceErrors: Int = 0
 
   private[yarn] def getYarnResourcesAndAmounts(
@@ -76,7 +78,7 @@ private object ResourceRequestHelper extends Logging {
       confPrefix: String,
       sparkConf: SparkConf
   ): Map[String, String] = {
-    Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map {
+    resourceNameMapping.map {
       case (rName, yarnName) =>
         (yarnName -> sparkConf.get(new ResourceID(confPrefix, rName).amountConf, "0"))
     }.filter { case (_, count) => count.toLong > 0 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ef01a2a..c3b7cc0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -37,7 +37,6 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.Python._
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
@@ -162,34 +161,7 @@ private[yarn] class YarnAllocator(
   private val allocatorNodeHealthTracker =
     new YarnAllocatorNodeHealthTracker(sparkConf, amClient, failureTracker)
 
-  // Executor memory in MiB.
-  protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
-  // Executor offHeap memory in MiB.
-  protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
-  // Additional memory overhead.
-  protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
-  protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) {
-    sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
-  } else {
-    0
-  }
-  // Number of cores per executor for the default profile
-  protected val defaultExecutorCores = sparkConf.get(EXECUTOR_CORES)
-
-  private val executorResourceRequests =
-    getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
-    getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
-
-  // Resource capability requested for each executor for the default profile
-  private[yarn] val defaultResource: Resource = {
-    val resource: Resource = Resource.newInstance(
-      executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory,
-      defaultExecutorCores)
-    ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
-    logDebug(s"Created resource capability: $resource")
-    resource
-  }
+  private val isPythonApp = sparkConf.get(IS_PYTHON_APP)
 
   private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
     "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
@@ -211,11 +183,10 @@ private[yarn] class YarnAllocator(
       new HashMap[String, mutable.Set[ContainerId]]()
     runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, mutable.HashSet[String]())
     numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = new AtomicInteger(0)
-    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
-      SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
-    rpIdToYarnResource.put(DEFAULT_RESOURCE_PROFILE_ID, defaultResource)
-    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
-      ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val initTargetExecNum = SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initTargetExecNum
+    val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    createYarnResourceForResourceProfile(defaultProfile)
   }
 
   initDefaultProfile()
@@ -302,48 +273,55 @@ private[yarn] class YarnAllocator(
   }
 
   // if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it
-  private def createYarnResourceForResourceProfile(
-      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = synchronized {
-    resourceProfileToTotalExecs.foreach { case (rp, num) =>
-      if (!rpIdToYarnResource.contains(rp.id)) {
-        // Start with the application or default settings
-        var heapMem = executorMemory.toLong
-        var offHeapMem = executorOffHeapMemory.toLong
-        var overheadMem = memoryOverhead.toLong
-        var pysparkMem = pysparkWorkerMemory.toLong
-        var cores = defaultExecutorCores
-        val customResources = new mutable.HashMap[String, String]
-        // track the resource profile if not already there
-        getOrUpdateRunningExecutorForRPId(rp.id)
-        logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
-        val execResources = rp.executorResources
-        execResources.foreach { case (r, execReq) =>
-          r match {
-            case ResourceProfile.MEMORY =>
-              heapMem = execReq.amount
-            case ResourceProfile.OVERHEAD_MEM =>
-              overheadMem = execReq.amount
-            case ResourceProfile.PYSPARK_MEM =>
-              pysparkMem = execReq.amount
-            case ResourceProfile.OFFHEAP_MEM =>
-              offHeapMem = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf, execReq)
-            case ResourceProfile.CORES =>
-              cores = execReq.amount.toInt
-            case "gpu" =>
-              customResources(YARN_GPU_RESOURCE_CONFIG) = execReq.amount.toString
-            case "fpga" =>
-              customResources(YARN_FPGA_RESOURCE_CONFIG) = execReq.amount.toString
-            case rName =>
-              customResources(rName) = execReq.amount.toString
-          }
+  private def createYarnResourceForResourceProfile(rp: ResourceProfile): Unit = synchronized {
+    if (!rpIdToYarnResource.contains(rp.id)) {
+      // track the resource profile if not already there
+      getOrUpdateRunningExecutorForRPId(rp.id)
+      logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
+      val resourcesWithDefaults =
+        ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
+          MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp,
+          ResourceRequestHelper.resourceNameMapping)
+      val customSparkResources =
+        resourcesWithDefaults.customResources.map { case (name, execReq) =>
+          (name, execReq.amount.toString)
         }
-        val totalMem = (heapMem + offHeapMem + overheadMem + pysparkMem).toInt
-        val resource = Resource.newInstance(totalMem, cores)
-        ResourceRequestHelper.setResourceRequests(customResources.toMap, resource)
-        logDebug(s"Created resource capability: $resource")
-        rpIdToYarnResource.putIfAbsent(rp.id, resource)
-        rpIdToResourceProfile(rp.id) = rp
+      // There is a difference in the way custom resources are handled between
+      // the base default profile and custom ResourceProfiles. To allow the user
+      // to request YARN containers with extra resources without Spark scheduling on
+      // them, the user can specify resources via the <code>spark.yarn.executor.resource.</code>
+      // config. Those configs are only used in the base default profile and do
+      // not get propagated into any other custom ResourceProfile, because
+      // there would be no way to remove them if you wanted a stage to not have them.
+      // This results in the default profile getting the custom resources defined in
+      // <code>spark.yarn.executor.resource.</code> plus the Spark-defined GPU and FPGA
+      // resources. Spark converts GPU and FPGA resources into the YARN built-in
+      // types <code>yarn.io/gpu</code> and <code>yarn.io/fpga</code>, but does not
+      // know the mapping of any other resources. Any other Spark custom resources
+      // are not propagated to YARN for the default profile. So if you want Spark
+      // to schedule based on a custom resource and have it requested from YARN, you
+      // must specify it in both YARN (<code>spark.yarn.{driver/executor}.resource.</code>)
+      // and Spark (<code>spark.{driver/executor}.resource.</code>) configs. Leave the Spark
+      // config off if you only want YARN containers with the extra resources but Spark not to
+      // schedule using them. Custom ResourceProfiles currently have no way to specify
+      // YARN-only resources without Spark scheduling on them, so for custom
+      // ResourceProfiles we propagate all the resources defined in the ResourceProfile
+      // to YARN. We still convert GPU and FPGA to the YARN built-in types as well. This requires
+      // that the name of any custom resource you specify match its name as defined in YARN.
+      val customResources = if (rp.id == DEFAULT_RESOURCE_PROFILE_ID) {
+        getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
+          customSparkResources.filterKeys { r =>
+            (r == YARN_GPU_RESOURCE_CONFIG || r == YARN_FPGA_RESOURCE_CONFIG)
+          }
+      } else {
+        customSparkResources
       }
+      val resource =
+        Resource.newInstance(resourcesWithDefaults.totalMemMiB, resourcesWithDefaults.cores)
+      ResourceRequestHelper.setResourceRequests(customResources, resource)
+      logDebug(s"Created resource capability: $resource")
+      rpIdToYarnResource.putIfAbsent(rp.id, resource)
+      rpIdToResourceProfile(rp.id) = rp
     }
   }
 
@@ -370,9 +348,8 @@ private[yarn] class YarnAllocator(
     this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
     this.hostToLocalTaskCountPerResourceProfileId = hostToLocalTaskCountPerResourceProfileId
 
-    createYarnResourceForResourceProfile(resourceProfileToTotalExecs)
-
     val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
+      createYarnResourceForResourceProfile(rp)
       if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
         logInfo(s"Driver requested a total number of $numExecs executor(s) " +
           s"for resource profile id: ${rp.id}.")
@@ -477,7 +454,7 @@ private[yarn] class YarnAllocator(
           var requestContainerMessage = s"Will request $missing executor container(s) for " +
             s" ResourceProfile Id: $rpId, each with " +
             s"${resource.getVirtualCores} core(s) and " +
-            s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
+            s"${resource.getMemory} MB memory."
           if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
             ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) {
             requestContainerMessage ++= s" with custom resources: " + resource.toString
@@ -723,9 +700,10 @@ private[yarn] class YarnAllocator(
       }
 
       val rp = rpIdToResourceProfile(rpId)
+      val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
       val containerMem = rp.executorResources.get(ResourceProfile.MEMORY).
-        map(_.amount.toInt).getOrElse(executorMemory)
-      val containerCores = rp.getExecutorCores.getOrElse(defaultExecutorCores)
+        map(_.amount).getOrElse(defaultResources.executorMemoryMiB).toInt
+      val containerCores = rp.getExecutorCores.getOrElse(defaultResources.cores)
       val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
       if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
         getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
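
To make the long comment added to createYarnResourceForResourceProfile above concrete, here is a
hedged sketch of the two configuration paths it describes. The resource name "acceleratorX" is a
hypothetical YARN resource type and the discovery script path is illustrative only:

    import org.apache.spark.SparkConf
    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

    // Default profile: a YARN-only resource (no Spark scheduling) plus a GPU that Spark
    // does schedule on and that is translated to the built-in yarn.io/gpu type.
    val conf = new SparkConf()
      .set("spark.yarn.executor.resource.acceleratorX.amount", "1")  // YARN container only
      .set("spark.executor.resource.gpu.amount", "1")                // Spark + yarn.io/gpu
      .set("spark.task.resource.gpu.amount", "1")

    // Custom ResourceProfile: every resource named here is propagated to YARN, so any name
    // other than gpu/fpga must match the resource type configured in YARN itself.
    val ereq = new ExecutorResourceRequests()
      .cores(4)
      .memory("8g")
      .resource("gpu", 2, "/opt/getGpusResources.sh")
    val rp = new ResourceProfileBuilder().require(ereq).build
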
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index fe8990b..0273de1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -38,7 +38,6 @@ object YarnSparkHadoopUtil {
   // the common cases. Memory overhead tends to grow with container size.
 
   val MEMORY_OVERHEAD_FACTOR = 0.10
-  val MEMORY_OVERHEAD_MIN = 384L
 
   val ANY_HOST = "*"
 
@@ -185,32 +184,11 @@ object YarnSparkHadoopUtil {
   }
 
   /**
-   * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
-   */
-  def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
-    val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
-    checkOffHeapEnabled(sparkConf, sizeInMB).toInt
-  }
-
-  /**
    * Get offHeap memory size from [[ExecutorResourceRequest]]
    * return 0 if MEMORY_OFFHEAP_ENABLED is false.
    */
   def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
     execRequest: ExecutorResourceRequest): Long = {
-    checkOffHeapEnabled(sparkConf, execRequest.amount)
-  }
-
-  /**
-   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
-   */
-  def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = {
-    if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
-      require(offHeapSize > 0,
-        s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
-      offHeapSize
-    } else {
-      0
-    }
+    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
   }
 }
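
The helpers removed above were moved into org.apache.spark.util.Utils (the commit summary
shows the matching Utils.scala and UtilsSuite.scala changes). Based on the bodies deleted
here, the consolidated logic presumably looks like this sketch:

    // Sketch only, mirroring the removed bodies; the real code now lives in Utils.
    def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = {
      if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
        require(offHeapSize > 0,
          s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
        offHeapSize
      } else {
        0
      }
    }

    def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
      val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
      checkOffHeapEnabled(sparkConf, sizeInMB).toInt
    }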
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6b5c72a..825bdd9 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -75,7 +75,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   // priority has to be 0 to match default profile id
   val RM_REQUEST_PRIORITY = Priority.newInstance(0)
   val defaultRPId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-  val defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+  var defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -114,6 +114,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     for ((name, value) <- additionalConfigs) {
       sparkConfClone.set(name, value)
     }
+    // different Spark confs mean we need to reinit the default profile

+    ResourceProfile.clearDefaultProfile()
+    defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConfClone)
 
     val allocator = new YarnAllocator(
       "not used",
@@ -268,12 +271,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
 
     handler.updateResourceRequests()
-    val container = createContainer("host1", resource = handler.defaultResource)
+    val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+    val container = createContainer("host1", resource = defaultResource)
     handler.handleAllocatedContainers(Array(container))
 
     // get amount of memory and vcores from resource, so effectively skipping their validation
-    val expectedResources = Resource.newInstance(handler.defaultResource.getMemory(),
-      handler.defaultResource.getVirtualCores)
+    val expectedResources = Resource.newInstance(defaultResource.getMemory(),
+      defaultResource.getVirtualCores)
     setResourceRequests(Map("gpu" -> "2G"), expectedResources)
     val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
 
@@ -296,7 +300,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
 
     handler.updateResourceRequests()
-    val yarnRInfo = ResourceRequestTestHelper.getResources(handler.defaultResource)
+    val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+    val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource)
     val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) ).toMap
     assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
     assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
@@ -656,9 +661,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
       val (handler, _) = createAllocator(maxExecutors = 1,
         additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
-      val memory = handler.defaultResource.getMemory
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemory
       assert(memory ==
-        executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
+        executorMemory + offHeapMemoryInMB + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)
     } finally {
       sparkConf.set(MEMORY_OFFHEAP_ENABLED, originalOffHeapEnabled)
       sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
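
To spell out the assertion above: the container size is expected to cover the heap, the
off-heap allocation, and the memory overhead, where the overhead here is the 384 MiB floor
(MEMORY_OVERHEAD_FACTOR of 0.10 stays below the floor for a small heap). A worked example
with purely illustrative values, since the test's actual executorMemory and off-heap size
are defined outside this hunk:

    // Assumed values, not the test's real ones.
    val executorMemory = 1024L      // MiB, spark.executor.memory
    val offHeapMemoryInMB = 512L    // MiB, spark.memory.offHeap.size converted to MiB
    val overhead = math.max((0.10 * executorMemory).toLong,
      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)                                // max(102, 384) = 384
    val expectedContainerMem = executorMemory + offHeapMemoryInMB + overhead  // 1920 MiB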
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 9cd3747..7f8dd59 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -142,31 +142,4 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     }
 
   }
-
-  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
-    val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(new SparkConf())
-    assert(executorOffHeapMemory == 0)
-  }
-
-  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
-    val offHeapMemoryInMB = 50
-    val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
-    val sparkConf = new SparkConf()
-      .set(MEMORY_OFFHEAP_ENABLED, true)
-      .set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
-    val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
-    assert(executorOffHeapMemory == offHeapMemoryInMB)
-  }
-
-  test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
-    "but MEMORY_OFFHEAP_SIZE not config scene") {
-    val sparkConf = new SparkConf()
-      .set(MEMORY_OFFHEAP_ENABLED, true)
-    val expected =
-      s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
-    val message = intercept[IllegalArgumentException] {
-      YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
-    }.getMessage
-    assert(message.contains(expected))
-  }
 }

