You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/07/16 17:56:56 UTC

[spark] branch master updated: [SPARK-27959][YARN] Change YARN resource configs to use .amount

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 43d68cd  [SPARK-27959][YARN] Change YARN resource configs to use .amount
43d68cd is described below

commit 43d68cd4ff84530c3d597f07352984225ab1db7c
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Tue Jul 16 10:56:07 2019 -0700

    [SPARK-27959][YARN] Change YARN resource configs to use .amount
    
    ## What changes were proposed in this pull request?
    
    We are adding generic resource support to Spark, where the configs have a suffix for the amount of the resource so that we can support other configs.
    
    Spark on YARN had already added configs to request resources via spark.yarn.{executor/driver/am}.resource=<some amount>, where <some amount> is the value and unit together.  We should change those configs to have a `.amount` suffix to match the Spark configs and to allow future configs to be added more easily. YARN itself already supports tags and attributes, so if we want the user to be able to pass those from Spark at some point, having a suffix makes sense. it wou [...]
    
    ## How was this patch tested?
    
    Tested via unit tests and manually on a YARN 3.x cluster with GPU resources configured.
    
    Closes #24989 from tgravescs/SPARK-27959-yarn-resourceconfigs.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 docs/running-on-yarn.md                            | 14 ++--
 .../org/apache/spark/deploy/yarn/Client.scala      | 14 ++--
 .../spark/deploy/yarn/ResourceRequestHelper.scala  | 46 ++++++++++++-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  5 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    | 18 -----
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 19 +++---
 .../deploy/yarn/ResourceRequestHelperSuite.scala   | 77 +++++++++++++++++-----
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 13 ++--
 8 files changed, 137 insertions(+), 69 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index dc93e9c..9d9b253 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -142,20 +142,20 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.am.resource.{resource-type}</code></td>
+  <td><code>spark.yarn.am.resource.{resource-type}.amount</code></td>
   <td><code>(none)</code></td>
   <td>
     Amount of resource to use for the YARN Application Master in client mode.
-    In cluster mode, use <code>spark.yarn.driver.resource.&lt;resource-type&gt;</code> instead.
+    In cluster mode, use <code>spark.yarn.driver.resource.&lt;resource-type&gt;.amount</code> instead.
     Please note that this feature can be used only with YARN 3.0+
     For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
     <p/>
     Example: 
-    To request GPU resources from YARN, use: <code>spark.yarn.am.resource.yarn.io/gpu</code>
+    To request GPU resources from YARN, use: <code>spark.yarn.am.resource.yarn.io/gpu.amount</code>
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.driver.resource.{resource-type}</code></td>
+  <td><code>spark.yarn.driver.resource.{resource-type}.amount</code></td>
   <td><code>(none)</code></td>
   <td>
     Amount of resource to use for the YARN Application Master in cluster mode.
@@ -163,11 +163,11 @@ To use a custom metrics.properties for the application master and executors, upd
     For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
     <p/>
     Example: 
-    To request GPU resources from YARN, use: <code>spark.yarn.driver.resource.yarn.io/gpu</code>
+    To request GPU resources from YARN, use: <code>spark.yarn.driver.resource.yarn.io/gpu.amount</code>
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.executor.resource.{resource-type}</code></td>
+  <td><code>spark.yarn.executor.resource.{resource-type}.amount</code></td>
   <td><code>(none)</code></td>
  <td>
      Amount of resource to use per executor process.
@@ -175,7 +175,7 @@ To use a custom metrics.properties for the application master and executors, upd
      For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
      <p/>
      Example: 
-     To request GPU resources from YARN, use: <code>spark.yarn.executor.resource.yarn.io/gpu</code>
+     To request GPU resources from YARN, use: <code>spark.yarn.executor.resource.yarn.io/gpu.amount</code>
  </td>
 </tr>
 <tr>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5b361d1..651e706 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -51,7 +51,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -241,12 +241,12 @@ private[spark] class Client(
       newApp: YarnClientApplication,
       containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
 
-    val yarnAMResources =
-      if (isClusterMode) {
-        sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap
-      } else {
-        sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
-      }
+    val componentName = if (isClusterMode) {
+      config.YARN_DRIVER_RESOURCE_TYPES_PREFIX
+    } else {
+      config.YARN_AM_RESOURCE_TYPES_PREFIX
+    }
+    val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)
     val amResources = yarnAMResources ++
       getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
     logDebug(s"AM resources: $amResources")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index cb0c68d..522c16b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -26,11 +26,11 @@ import scala.util.Try
 import org.apache.hadoop.yarn.api.records.Resource
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU}
 import org.apache.spark.util.{CausedBy, Utils}
 
 /**
@@ -40,6 +40,45 @@ import org.apache.spark.util.{CausedBy, Utils}
 private object ResourceRequestHelper extends Logging {
   private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
   private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
+  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
+
+  private[yarn] def getYarnResourcesAndAmounts(
+      sparkConf: SparkConf,
+      componentName: String): Map[String, String] = {
+    sparkConf.getAllWithPrefix(s"$componentName").map { case (key, value) =>
+      val splitIndex = key.lastIndexOf('.')
+      if (splitIndex == -1) {
+        val errorMessage = s"Missing suffix for ${componentName}${key}, you must specify" +
+          s" a suffix - $AMOUNT is currently the only supported suffix."
+        throw new IllegalArgumentException(errorMessage.toString())
+      }
+      val resourceName = key.substring(0, splitIndex)
+      val resourceSuffix = key.substring(splitIndex + 1)
+      if (!AMOUNT.equals(resourceSuffix)) {
+        val errorMessage = s"Unsupported suffix: $resourceSuffix in: ${componentName}${key}, " +
+          s"only .$AMOUNT is supported."
+        throw new IllegalArgumentException(errorMessage.toString())
+      }
+      (resourceName, value)
+    }.toMap
+  }
+
+  /**
+   * Convert Spark resources into YARN resources.
+   * The only resources we know how to map from spark configs to yarn configs are
+   * gpus and fpgas, everything else the user has to specify them in both the
+   * spark.yarn.*.resource and the spark.*.resource configs.
+   */
+  private[yarn] def getYarnResourcesFromSparkResources(
+      confPrefix: String,
+      sparkConf: SparkConf
+  ): Map[String, String] = {
+    Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map {
+      case (rName, yarnName) =>
+        (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0"))
+    }.filter { case (_, count) => count.toLong > 0 }
+  }
 
   /**
    * Validates sparkConf and throws a SparkException if any of standard resources (memory or cores)
@@ -81,8 +120,9 @@ private object ResourceRequestHelper extends Logging {
     val errorMessage = new mutable.StringBuilder()
 
     resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
-      if (sparkConf.contains(resourceRequest)) {
-        errorMessage.append(s"Error: Do not use $resourceRequest, " +
+      val resourceRequestAmount = s"${resourceRequest}.${AMOUNT}"
+      if (sparkConf.contains(resourceRequestAmount)) {
+        errorMessage.append(s"Error: Do not use $resourceRequestAmount, " +
             s"please use $sparkName instead!\n")
       }
     }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 6e634b9..8ec7bd6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
@@ -142,8 +143,8 @@ private[yarn] class YarnAllocator(
   protected val executorCores = sparkConf.get(EXECUTOR_CORES)
 
   private val executorResourceRequests =
-    sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++
-      getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
+    getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
+    getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
 
   // Resource capability requested for each executor
   private[yarn] val resource: Resource = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 6b87eec..1103552 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -41,24 +41,6 @@ object YarnSparkHadoopUtil {
   val MEMORY_OVERHEAD_MIN = 384L
 
   val ANY_HOST = "*"
-  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
-  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
-
-  /**
-   * Convert Spark resources into YARN resources.
-   * The only resources we know how to map from spark configs to yarn configs are
-   * gpus and fpgas, everything else the user has to specify them in both the
-   * spark.yarn.*.resource and the spark.*.resource configs.
-   */
-  private[yarn] def getYarnResourcesFromSparkResources(
-      confPrefix: String,
-      sparkConf: SparkConf
-      ): Map[String, String] = {
-    Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map {
-      case (rName, yarnName) =>
-        (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0"))
-    }.filter { case (_, count) => count.toLong > 0 }
-  }
 
   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index d5f1992..847fc37 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -38,10 +38,11 @@ import org.mockito.Mockito.{spy, verify}
 import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils.AMOUNT
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class ClientSuite extends SparkFunSuite with Matchers {
@@ -372,7 +373,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
       val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
       resources.foreach { case (name, v) =>
-        conf.set(prefix + name, v.toString)
+        conf.set(s"${prefix}${name}.${AMOUNT}", v.toString)
       }
 
       val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
@@ -397,7 +398,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     resources.keys.foreach { yarnName =>
-      conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+      conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
     }
     resources.values.foreach { rName =>
       conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
@@ -407,9 +408,9 @@ class ClientSuite extends SparkFunSuite with Matchers {
       ResourceRequestHelper.validateResources(conf)
     }.getMessage()
 
-    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," +
+    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga.amount," +
       " please use spark.driver.resource.fpga.amount"))
-    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," +
+    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu.amount," +
       " please use spark.driver.resource.gpu.amount"))
   }
 
@@ -420,7 +421,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     resources.keys.foreach { yarnName =>
-      conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+      conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
     }
     resources.values.foreach { rName =>
       conf.set(ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3")
@@ -430,9 +431,9 @@ class ClientSuite extends SparkFunSuite with Matchers {
       ResourceRequestHelper.validateResources(conf)
     }.getMessage()
 
-    assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga," +
+    assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga.amount," +
       " please use spark.executor.resource.fpga.amount"))
-    assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu," +
+    assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu.amount," +
       " please use spark.executor.resource.gpu.amount"))
   }
 
@@ -450,7 +451,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
     }
     // also just set yarn one that we don't convert
-    conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
+    conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}", "5")
     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
     val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
     val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 9e3cc6e..f5ec531 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -22,9 +22,11 @@ import org.apache.hadoop.yarn.util.Records
 import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, EXECUTOR_CORES, EXECUTOR_MEMORY}
+import org.apache.spark.resource.ResourceUtils.AMOUNT
 
 class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
 
@@ -32,16 +34,18 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
   private val CUSTOM_RES_2 = "custom-resource-type-2"
   private val MEMORY = "memory"
   private val CORES = "cores"
-  private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
-  private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
-  private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
-  private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
-  private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
-  private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+  private val NEW_CONFIG_EXECUTOR_MEMORY =
+    s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}"
+  private val NEW_CONFIG_EXECUTOR_CORES =
+    s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}"
+  private val NEW_CONFIG_AM_MEMORY = s"${YARN_AM_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}"
+  private val NEW_CONFIG_AM_CORES = s"${YARN_AM_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}"
+  private val NEW_CONFIG_DRIVER_MEMORY = s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}"
+  private val NEW_CONFIG_DRIVER_CORES = s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}"
 
   test("empty SparkConf should be valid") {
     val sparkConf = new SparkConf()
-    ResourceRequestHelper.validateResources(sparkConf)
+    validateResources(sparkConf)
   }
 
   test("just normal resources are defined") {
@@ -50,7 +54,44 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
     sparkConf.set(DRIVER_CORES.key, "4")
     sparkConf.set(EXECUTOR_MEMORY.key, "4G")
     sparkConf.set(EXECUTOR_CORES.key, "2")
-    ResourceRequestHelper.validateResources(sparkConf)
+    validateResources(sparkConf)
+  }
+
+  test("get yarn resources from configs") {
+    val sparkConf = new SparkConf()
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "2G",
+      YARN_FPGA_RESOURCE_CONFIG -> "3G", "custom" -> "4")
+    resources.foreach { case (name, value) =>
+      sparkConf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value)
+      sparkConf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value)
+      sparkConf.set(s"${YARN_AM_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value)
+    }
+    var parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX)
+    assert(parsedResources === resources)
+    parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_DRIVER_RESOURCE_TYPES_PREFIX)
+    assert(parsedResources === resources)
+    parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_AM_RESOURCE_TYPES_PREFIX)
+    assert(parsedResources === resources)
+  }
+
+  test("get invalid yarn resources from configs") {
+    val sparkConf = new SparkConf()
+
+    val missingAmountConfig = s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}missingAmount"
+    // missing .amount
+    sparkConf.set(missingAmountConfig, "2g")
+    var thrown = intercept[IllegalArgumentException] {
+      getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX)
+    }
+    thrown.getMessage should include("Missing suffix for")
+
+    sparkConf.remove(missingAmountConfig)
+    sparkConf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}customResource.invalid", "2g")
+
+    thrown = intercept[IllegalArgumentException] {
+      getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX)
+    }
+    thrown.getMessage should include("Unsupported suffix")
   }
 
   Seq(
@@ -60,14 +101,14 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
       ResourceInformation(CUSTOM_RES_2, 10, "G"))
   ).foreach { case (name, resources) =>
     test(s"valid request: $name") {
-      assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+      assume(isYarnResourceTypesAvailable())
       val resourceDefs = resources.map { r => r.name }
       val requests = resources.map { r => (r.name, r.value.toString + r.unit) }.toMap
 
       ResourceRequestTestHelper.initializeResourceTypes(resourceDefs)
 
       val resource = createResource()
-      ResourceRequestHelper.setResourceRequests(requests, resource)
+      setResourceRequests(requests, resource)
 
       resources.foreach { r =>
         val requested = ResourceRequestTestHelper.getResourceInformationByName(resource, r.name)
@@ -82,12 +123,12 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
     ("invalid unit", CUSTOM_RES_1, "123ppp")
   ).foreach { case (name, key, value) =>
     test(s"invalid request: $name") {
-      assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+      assume(isYarnResourceTypesAvailable())
       ResourceRequestTestHelper.initializeResourceTypes(Seq(key))
 
       val resource = createResource()
       val thrown = intercept[IllegalArgumentException] {
-        ResourceRequestHelper.setResourceRequests(Map(key -> value), resource)
+        setResourceRequests(Map(key -> value), resource)
       }
       thrown.getMessage should include (key)
     }
@@ -95,20 +136,20 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
 
   Seq(
     NEW_CONFIG_EXECUTOR_MEMORY -> "30G",
-    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb" -> "30G",
-    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb" -> "30G",
+    s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}memory-mb.$AMOUNT" -> "30G",
+    s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}mb.$AMOUNT" -> "30G",
     NEW_CONFIG_EXECUTOR_CORES -> "5",
-    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores" -> "5",
+    s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}vcores.$AMOUNT" -> "5",
     NEW_CONFIG_AM_MEMORY -> "1G",
     NEW_CONFIG_DRIVER_MEMORY -> "1G",
     NEW_CONFIG_AM_CORES -> "3",
     NEW_CONFIG_DRIVER_CORES -> "1G"
   ).foreach { case (key, value) =>
     test(s"disallowed resource request: $key") {
-      assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+      assume(isYarnResourceTypesAvailable())
       val conf = new SparkConf(false).set(key, value)
       val thrown = intercept[SparkException] {
-        ResourceRequestHelper.validateResources(conf)
+        validateResources(conf)
       }
       thrown.getMessage should include (key)
     }
@@ -126,7 +167,7 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
     sparkConf.set(NEW_CONFIG_DRIVER_MEMORY, "2G")
 
     val thrown = intercept[SparkException] {
-      ResourceRequestHelper.validateResources(sparkConf)
+      validateResources(sparkConf)
     }
     thrown.getMessage should (
       include(NEW_CONFIG_EXECUTOR_MEMORY) and
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index ca89af2..4ac27ed 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -31,9 +31,11 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils.{AMOUNT, GPU}
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
@@ -160,12 +162,12 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }
 
   test("custom resource requested from yarn") {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    assume(isYarnResourceTypesAvailable())
     ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
 
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
     val handler = createAllocator(1, mockAmClient,
-      Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
+      Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
 
     handler.updateResourceRequests()
     val container = createContainer("host1", resource = handler.resource)
@@ -174,7 +176,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     // get amount of memory and vcores from resource, so effectively skipping their validation
     val expectedResources = Resource.newInstance(handler.resource.getMemory(),
       handler.resource.getVirtualCores)
-    ResourceRequestHelper.setResourceRequests(Map("gpu" -> "2G"), expectedResources)
+    setResourceRequests(Map("gpu" -> "2G"), expectedResources)
     val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
 
     verify(mockAmClient).addContainerRequest(captor.capture())
@@ -183,15 +185,16 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }
 
   test("custom spark resource mapped to yarn resource configs") {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    assume(isYarnResourceTypesAvailable())
     val yarnMadeupResource = "yarn.io/madeup"
     val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource)
     ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+    val madeupConfigName = s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}"
     val sparkResources =
       Map(EXECUTOR_GPU_ID.amountConf -> "3",
         EXECUTOR_FPGA_ID.amountConf -> "2",
-        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
+        madeupConfigName -> "5")
     val handler = createAllocator(1, mockAmClient, sparkResources)
 
     handler.updateResourceRequests()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org