Posted to commits@spark.apache.org by do...@apache.org on 2021/11/05 21:20:52 UTC

[spark] branch branch-3.2 updated: [SPARK-37208][YARN] Support mapping Spark gpu/fpga resource types to custom YARN resource types

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 966c90c  [SPARK-37208][YARN] Support mapping Spark gpu/fpga resource types to custom YARN resource types
966c90c is described below

commit 966c90c0b5e66b3da332bd3183b9809df8959960
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Fri Nov 5 14:19:16 2021 -0700

    [SPARK-37208][YARN] Support mapping Spark gpu/fpga resource types to custom YARN resource types
    
    ### What changes were proposed in this pull request?
    Add configs to allow mapping the Spark gpu/fpga resource type to a custom YARN resource type.
    Currently Spark hardcodes the mapping of resource "gpu" to "yarn.io/gpu" and "fpga" to "yarn.io/fpga". This PR allows the user to customize the YARN ("yarn.io/*") side of that mapping.
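    
    As an illustrative sketch (not part of this patch), an end user whose YARN
    cluster exposes GPUs under a custom resource type would combine the new
    config with the existing generic request; the type name "company.com/gpu"
    is hypothetical:
    
        import org.apache.spark.SparkConf
    
        val conf = new SparkConf()
          // Remap Spark's generic "gpu" resource to the custom YARN type:
          .set("spark.yarn.resourceGpuDeviceName", "company.com/gpu")
          // The generic Spark-side request itself is unchanged:
          .set("spark.executor.resource.gpu.amount", "2")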
    
    Note: it would be nice to put this in 3.2.1 as well; let me know if there are any objections.
    
    ### Why are the changes needed?
    YARN supports custom resource types, and Hadoop 3.3.1 made it easier for users to plug in custom resource types. This means users may create a custom resource type that represents a GPU or FPGA because they want additional logic that the built-in YARN versions don't have. Ideally, Spark users would still just use the generic "gpu" or "fpga" types in Spark, so this change adds that ability and Spark end users don't need to know about changes to YARN resource types.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, two configs were added: spark.yarn.resourceGpuDeviceName and spark.yarn.resourceFpgaDeviceName.
    
    ### How was this patch tested?
    
    Tested manually with a Hadoop 3.3.1 device plugin
    (https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/DevelopYourOwnDevicePlugin.html)
    and with the unit tests added here.
    
    Closes #34485 from tgravescs/migyarnlatest.
    
    Lead-authored-by: Thomas Graves <tg...@nvidia.com>
    Co-authored-by: Thomas Graves <tg...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 8e6f636260a1cee242e0953973b695e1a717929c)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 docs/running-on-yarn.md                            | 25 +++++++++-
 .../spark/deploy/yarn/ResourceRequestHelper.scala  | 18 +++----
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  9 ++--
 .../org/apache/spark/deploy/yarn/config.scala      | 18 +++++++
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 55 +++++++++++++++++-----
 .../deploy/yarn/ResourceRequestHelperSuite.scala   |  4 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 46 +++++++++++++++---
 7 files changed, 141 insertions(+), 34 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 9930f3e..4a1dddf 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -206,6 +206,28 @@ To use a custom metrics.properties for the application master and executors, upd
   <td>3.0.0</td>
 </tr>
 <tr>
+  <td><code>spark.yarn.resourceGpuDeviceName</code></td>
+  <td><code>yarn.io/gpu</code></td>
+  <td>
+    Specify the mapping of the Spark resource type of <code>gpu</code> to the YARN resource
+    representing a GPU. By default YARN uses <code>yarn.io/gpu</code> but if YARN has been
+    configured with a custom resource type, this allows remapping it.
+    Applies when using the <code>spark.{driver/executor}.resource.gpu.*</code> configs.
+  </td>
+  <td>3.2.1</td>
+</tr>
+<tr>
+  <td><code>spark.yarn.resourceFpgaDeviceName</code></td>
+  <td><code>yarn.io/fpga</code></td>
+  <td>
+    Specify the mapping of the Spark resource type of <code>fpga</code> to the YARN resource
+    representing an FPGA. By default YARN uses <code>yarn.io/fpga</code> but if YARN has been
+    configured with a custom resource type, this allows remapping it.
+    Applies when using the <code>spark.{driver/executor}.resource.fpga.*</code> configs.
+  </td>
+  <td>3.2.1</td>
+</tr>
+<tr>
   <td><code>spark.yarn.am.cores</code></td>
   <td><code>1</code></td>
   <td>
@@ -635,7 +657,8 @@ Please make sure to have read the Custom Resource Scheduling and Configuration O
 
 YARN needs to be configured to support any resources the user wants to use with Spark. Resource scheduling on YARN was added in YARN 3.1.0. See the YARN documentation for more information on configuring resources and properly setting up isolation. Ideally the resources are setup isolated so that an executor can only see the resources it was allocated. If you do not have isolation enabled, the user is responsible for creating a discovery script that ensures the resource is not shared betw [...]
 
-YARN currently supports any user defined resource type but has built in types for GPU (<code>yarn.io/gpu</code>) and FPGA (<code>yarn.io/fpga</code>). For that reason, if you are using either of those resources, Spark can translate your request for spark resources into YARN resources and you only have to specify the <code>spark.{driver/executor}.resource.</code> configs. If you are using a resource other then FPGA or GPU, the user is responsible for specifying the configs for both YARN ( [...]
+YARN supports user-defined resource types but has built-in types for GPU (<code>yarn.io/gpu</code>) and FPGA (<code>yarn.io/fpga</code>). For that reason, if you are using either of those resources, Spark can translate your request for spark resources into YARN resources and you only have to specify the <code>spark.{driver/executor}.resource.</code> configs. Note, if you are using a custom resource type for GPUs or FPGAs with YARN you can change the Spark mapping using <code>spark.yarn.r [...]
+ If you are using a resource other than FPGA or GPU, the user is responsible for specifying the configs for both YARN (<code>spark.yarn.{driver/executor}.resource.</code>) and Spark (<code>spark.{driver/executor}.resource.</code>).
 
 For example, the user wants to request 2 GPUs for each executor. The user can just specify <code>spark.executor.resource.gpu.amount=2</code> and Spark will handle requesting <code>yarn.io/gpu</code> resource type from YARN.
 
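
(Illustration, not part of the patch: the "resource other than FPGA or GPU"
case described in the doc change above needs both the YARN-side and the
Spark-side config; "madeup" mirrors the made-up resource used in the tests
later in this patch.)

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // YARN side: the full YARN resource type name:
      .set("spark.yarn.executor.resource.yarn.io/madeup.amount", "1")
      // Spark side: the generic resource name the application uses:
      .set("spark.executor.resource.madeup.amount", "1")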
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 3aabc46..50e8225 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -41,10 +41,6 @@ private object ResourceRequestHelper extends Logging {
   private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
   private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
   private val RESOURCE_NOT_FOUND = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
-  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
-  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
-  private[yarn] val resourceNameMapping =
-    Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG)
   @volatile private var numResourceErrors: Int = 0
 
   private[yarn] def getYarnResourcesAndAmounts(
@@ -68,6 +64,10 @@ private object ResourceRequestHelper extends Logging {
     }.toMap
   }
 
+  private[yarn] def getResourceNameMapping(sparkConf: SparkConf): Map[String, String] = {
+    Map(GPU -> sparkConf.get(YARN_GPU_DEVICE), FPGA -> sparkConf.get(YARN_FPGA_DEVICE))
+  }
+
   /**
    * Convert Spark resources into YARN resources.
    * The only resources we know how to map from spark configs to yarn configs are
@@ -78,7 +78,7 @@ private object ResourceRequestHelper extends Logging {
       confPrefix: String,
       sparkConf: SparkConf
   ): Map[String, String] = {
-    resourceNameMapping.map {
+    getResourceNameMapping(sparkConf).map {
       case (rName, yarnName) =>
         (yarnName -> sparkConf.get(new ResourceID(confPrefix, rName).amountConf, "0"))
     }.filter { case (_, count) => count.toLong > 0 }
@@ -113,13 +113,13 @@ private object ResourceRequestHelper extends Logging {
       (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (new ResourceID(SPARK_EXECUTOR_PREFIX, "fpga").amountConf,
-        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
+        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${sparkConf.get(YARN_FPGA_DEVICE)}"),
       (new ResourceID(SPARK_DRIVER_PREFIX, "fpga").amountConf,
-        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
+        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${sparkConf.get(YARN_FPGA_DEVICE)}"),
       (new ResourceID(SPARK_EXECUTOR_PREFIX, "gpu").amountConf,
-        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
+        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${sparkConf.get(YARN_GPU_DEVICE)}"),
       (new ResourceID(SPARK_DRIVER_PREFIX, "gpu").amountConf,
-        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))
+        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${sparkConf.get(YARN_GPU_DEVICE)}"))
 
     val errorMessage = new mutable.StringBuilder()
 
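
(Illustration, not part of the patch: the behavior of the new
getResourceNameMapping helper, assuming the GPU/FPGA constants resolve to
"gpu"/"fpga" as in org.apache.spark.resource.ResourceUtils.)

    import org.apache.spark.SparkConf

    // With defaults:
    //   getResourceNameMapping(new SparkConf())
    //     == Map("gpu" -> "yarn.io/gpu", "fpga" -> "yarn.io/fpga")
    // After remapping (the custom type name is hypothetical):
    val conf = new SparkConf().set("spark.yarn.resourceGpuDeviceName", "company.com/gpu")
    //   getResourceNameMapping(conf)
    //     == Map("gpu" -> "company.com/gpu", "fpga" -> "yarn.io/fpga")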
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index f236fc3..54ab643 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -171,6 +171,8 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
+  private val resourceNameMapping = ResourceRequestHelper.getResourceNameMapping(sparkConf)
+
   // A container placement strategy based on pending tasks' locality preference
   private[yarn] val containerPlacementStrategy =
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
@@ -280,8 +282,7 @@ private[yarn] class YarnAllocator(
       logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
       val resourcesWithDefaults =
         ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
-          MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp,
-          ResourceRequestHelper.resourceNameMapping)
+          MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp, resourceNameMapping)
       val customSparkResources =
         resourcesWithDefaults.customResources.map { case (name, execReq) =>
           (name, execReq.amount.toString)
@@ -309,9 +310,11 @@ private[yarn] class YarnAllocator(
      // to YARN. We still convert GPU and FPGA to the YARN built-in types as well. This requires
       // that the name of any custom resources you specify match what they are defined as in YARN.
       val customResources = if (rp.id == DEFAULT_RESOURCE_PROFILE_ID) {
+        val gpuResource = sparkConf.get(YARN_GPU_DEVICE)
+        val fpgaResource = sparkConf.get(YARN_FPGA_DEVICE)
         getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
           customSparkResources.filterKeys { r =>
-            (r == YARN_GPU_RESOURCE_CONFIG || r == YARN_FPGA_RESOURCE_CONFIG)
+            (r == gpuResource || r == fpgaResource)
           }
       } else {
         customSparkResources
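
(Illustration, not part of the patch: what the filter above does for the
default resource profile, with hypothetical amounts.)

    // Suppose the resource profile produced:
    //   customSparkResources = Map("yarn.io/gpu" -> "3", "yarn.io/madeup" -> "1")
    // Only the configured gpu/fpga names pass the filter:
    //   customSparkResources.filterKeys(r => r == gpuResource || r == fpgaResource)
    //     == Map("yarn.io/gpu" -> "3")
    // Anything else (e.g. "yarn.io/madeup") must instead be requested through
    // the spark.yarn.executor.resource.* configs merged in by
    // getYarnResourcesAndAmounts.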
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 89a4af2..dc4b694 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -403,6 +403,24 @@ package object config extends Logging {
     .toSequence
     .createWithDefault(Nil)
 
+  private[spark] val YARN_GPU_DEVICE = ConfigBuilder("spark.yarn.resourceGpuDeviceName")
+    .version("3.2.1")
+    .doc("Specify the mapping of the Spark resource type of gpu to the YARN resource "
+      + "representing a GPU. By default YARN uses yarn.io/gpu but if YARN has been "
+      + "configured with a custom resource type, this allows remapping it. "
+      + "Applies when using the <code>spark.{driver/executor}.resource.gpu.*</code> configs.")
+    .stringConf
+    .createWithDefault("yarn.io/gpu")
+
+  private[spark] val YARN_FPGA_DEVICE = ConfigBuilder("spark.yarn.resourceFpgaDeviceName")
+    .version("3.2.1")
+    .doc("Specify the mapping of the Spark resource type of fpga to the YARN resource "
+      + "representing a FPGA. By default YARN uses yarn.io/fpga but if YARN has been "
+      + "configured with a custom resource type, this allows remapping it. "
+      + "Applies when using the <code>spark.{driver/executor}.resource.fpga.*</code> configs.")
+    .stringConf
+    .createWithDefault("yarn.io/fpga")
+
   private[yarn] val YARN_EXECUTOR_RESOURCE_TYPES_PREFIX = "spark.yarn.executor.resource."
   private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource."
   private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."
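
(Illustration, not part of the patch: reading the new entries, as the rest of
this change does; note both entries are private[spark], so this only compiles
from within Spark itself.)

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    val conf = new SparkConf()
    conf.get(YARN_GPU_DEVICE)                        // "yarn.io/gpu" (default)
    conf.get(YARN_FPGA_DEVICE)                       // "yarn.io/fpga" (default)
    conf.set(YARN_GPU_DEVICE.key, "company.com/gpu") // hypothetical custom type
    conf.get(YARN_GPU_DEVICE)                        // now "company.com/gpu"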
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index ea3acec..32dab6d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -474,10 +474,10 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
   test("custom driver resource request yarn config and spark config fails") {
     assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
-    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
-    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
 
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", conf.get(YARN_FPGA_DEVICE) -> "fpga")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
     resources.keys.foreach { yarnName =>
       conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
     }
@@ -497,10 +497,9 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
   test("custom executor resource request yarn config and spark config fails") {
     assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
-    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
-    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
-
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", conf.get(YARN_FPGA_DEVICE) -> "fpga")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
     resources.keys.foreach { yarnName =>
       conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
     }
@@ -521,13 +520,14 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
   test("custom resources spark config mapped to yarn config") {
     assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     val yarnMadeupResource = "yarn.io/madeup"
-    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu",
-      YARN_FPGA_RESOURCE_CONFIG -> "fpga",
+    val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu",
+      conf.get(YARN_FPGA_DEVICE) -> "fpga",
       yarnMadeupResource -> "madeup")
+
     ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
 
-    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     resources.values.foreach { rName =>
       conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
     }
@@ -544,14 +544,45 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
     val yarnRInfo = ResourceRequestTestHelper.getResources(newContext.getResource)
     val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap
-    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
-    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
-    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).nonEmpty)
-    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).nonEmpty)
+    assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).get === 3)
+    assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).nonEmpty)
+    assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).get === 3)
     assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
     assert(allResourceInfo.get(yarnMadeupResource).get === 5)
   }
 
+  test("gpu/fpga spark resources mapped to custom yarn resources") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    val gpuCustomName = "custom/gpu"
+    val fpgaCustomName = "custom/fpga"
+    conf.set(YARN_GPU_DEVICE.key, gpuCustomName)
+    conf.set(YARN_FPGA_DEVICE.key, fpgaCustomName)
+    val resources = Map(gpuCustomName -> "gpu",
+      fpgaCustomName -> "fpga")
+
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+    resources.values.foreach { rName =>
+      conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
+    }
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(new ClientArguments(Array()), conf, null)
+    val newContext = client.createApplicationSubmissionContext(
+      new YarnClientApplication(getNewApplicationResponse, appContext),
+      containerLaunchContext)
+
+    val yarnRInfo = ResourceRequestTestHelper.getResources(newContext.getResource)
+    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap
+    assert(allResourceInfo.get(gpuCustomName).nonEmpty)
+    assert(allResourceInfo.get(gpuCustomName).get === 3)
+    assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
+    assert(allResourceInfo.get(fpgaCustomName).get === 3)
+  }
+
   test("test yarn jars path not exists") {
     withTempDir { dir =>
       val conf = new SparkConf().set(SPARK_JARS, Seq(dir.getAbsolutePath + "/test"))
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 3e54dd0..874cc08 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -60,8 +60,8 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
 
   test("get yarn resources from configs") {
     val sparkConf = new SparkConf()
-    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "2G",
-      YARN_FPGA_RESOURCE_CONFIG -> "3G", "custom" -> "4")
+    val resources = Map(sparkConf.get(YARN_GPU_DEVICE) -> "2G",
+      sparkConf.get(YARN_FPGA_DEVICE) -> "3G", "custom" -> "4")
     resources.foreach { case (name, value) =>
       sparkConf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value)
       sparkConf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 9a7eed6..db65d12 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -181,7 +181,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   test("single container allocated with ResourceProfile") {
     assume(isYarnResourceTypesAvailable())
-    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG)
+    val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE))
     ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
     // create default profile so we get a different id to test below
     val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
@@ -216,7 +216,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   test("multiple containers allocated with ResourceProfiles") {
     assume(isYarnResourceTypesAvailable())
-    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG)
+    val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), sparkConf.get(YARN_FPGA_DEVICE))
     ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
     // create default profile so we get a different id to test below
     val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
@@ -293,7 +293,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   test("custom spark resource mapped to yarn resource configs") {
     assume(isYarnResourceTypesAvailable())
     val yarnMadeupResource = "yarn.io/madeup"
-    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource)
+    val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), sparkConf.get(YARN_FPGA_DEVICE),
+      yarnMadeupResource)
     ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
     val madeupConfigName = s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}"
@@ -307,14 +308,45 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
     val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource)
     val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) ).toMap
-    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
-    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
-    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).nonEmpty)
-    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).get === 2)
+    assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).nonEmpty)
+    assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).get === 3)
+    assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).nonEmpty)
+    assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).get === 2)
     assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
     assert(allResourceInfo.get(yarnMadeupResource).get === 5)
   }
 
+  test("gpu/fpga spark resource mapped to custom yarn resource") {
+    assume(isYarnResourceTypesAvailable())
+    val gpuCustomName = "custom/gpu"
+    val fpgaCustomName = "custom/fpga"
+    val originalGpu = sparkConf.get(YARN_GPU_DEVICE)
+    val originalFpga = sparkConf.get(YARN_FPGA_DEVICE)
+    try {
+      sparkConf.set(YARN_GPU_DEVICE.key, gpuCustomName)
+      sparkConf.set(YARN_FPGA_DEVICE.key, fpgaCustomName)
+      val yarnResources = Seq(gpuCustomName, fpgaCustomName)
+      ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+      val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+      val sparkResources =
+        Map(EXECUTOR_GPU_ID.amountConf -> "3",
+          EXECUTOR_FPGA_ID.amountConf -> "2")
+      val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
+
+      handler.updateResourceRequests()
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource)
+      val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap
+      assert(allResourceInfo.get(gpuCustomName).nonEmpty)
+      assert(allResourceInfo.get(gpuCustomName).get === 3)
+      assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
+      assert(allResourceInfo.get(fpgaCustomName).get === 2)
+    } finally {
+      sparkConf.set(YARN_GPU_DEVICE.key, originalGpu)
+      sparkConf.set(YARN_FPGA_DEVICE.key, originalFpga)
+    }
+  }
+
   test("container should not be created if requested number if met") {
     // request a single container and receive it
     val (handler, _) = createAllocator(1)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org