Posted to commits@spark.apache.org by do...@apache.org on 2023/06/23 00:41:05 UTC

[spark] branch branch-3.4 updated: [SPARK-44134][CORE] Fix setting resources (GPU/FPGA) to 0 when they are set in spark-defaults.conf

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 713248b9f9e [SPARK-44134][CORE] Fix setting resources (GPU/FPGA) to 0 when they are set in spark-defaults.conf
713248b9f9e is described below

commit 713248b9f9e8eecd96db7f4282a208483fc75880
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Thu Jun 22 17:40:47 2023 -0700

    [SPARK-44134][CORE] Fix setting resources (GPU/FPGA) to 0 when they are set in spark-defaults.conf
    
    ### What changes were proposed in this pull request?
    
    https://issues.apache.org/jira/browse/SPARK-44134
    
    With resource aware scheduling, if you specify a default value in the spark-defaults.conf, a user can't override that to set it to 0.
    
    Meaning spark-defaults.conf has something like:
    spark.executor.resource.{resourceName}.amount=1
    spark.task.resource.{resourceName}.amount=1
    
    If the user tries to override that at submit time with spark.executor.resource.{resourceName}.amount=0 and spark.task.resource.{resourceName}.amount=0, the application fails to submit. It should submit and simply not try to allocate those resources. This worked back in Spark 3.0 but was broken when the stage-level scheduling feature was added.
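    For example, a submit-time override might look like this (a hypothetical command line; the application jar and resource name are placeholders):
    
        ./bin/spark-submit \
          --conf spark.executor.resource.gpu.amount=0 \
          --conf spark.task.resource.gpu.amount=0 \
          myApp.jar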
    
    Here I fixed it by simply removing any task resources from the list if they are set to 0.
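    As a standalone Scala sketch of that filtering step (simplified names; not the exact patch code, which is in ResourceUtils below):
    
        // Keep only task resources with a non-zero requested amount,
        // mirroring the filter added in addTaskResourceRequests.
        val requested = Map("gpu" -> 0.0, "fpga" -> 2.0)
        val nonZero = requested.filter { case (_, amount) => amount > 0.0 }
        assert(nonZero == Map("fpga" -> 2.0))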
    
    Note: I also fixed a typo in the exception message that is thrown when no executor resources are specified but task resources are.
    
    Note: ideally this should be backported to all of the maintenance releases.
    
    ### Why are the changes needed?
    
    Fixes the bug described above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, there are no API changes.
    
    ### How was this patch tested?
    
    Added a unit test, then ran manually on standalone and YARN clusters to verify that overriding the configs now works.
    
    Closes #41703 from tgravescs/fixResource0.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit cf6e90c205bd76ff9e1fc2d88757d9d44ec93162)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/resource/ResourceProfile.scala      |  2 +-
 .../scala/org/apache/spark/resource/ResourceUtils.scala  |  7 +++++--
 .../test/scala/org/apache/spark/SparkContextSuite.scala  |  2 +-
 .../org/apache/spark/resource/ResourceProfileSuite.scala | 16 ++++++++++++++++
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index afd612433a7..60c541f5b7e 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -215,7 +215,7 @@ class ResourceProfile(
       }
     }
     if (taskResourcesToCheck.nonEmpty) {
-      throw new SparkException("No executor resource configs were not specified for the " +
+      throw new SparkException("No executor resource configs were specified for the " +
         s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}")
     }
     val limiting =
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 6e294397a3c..d19f413598b 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -190,12 +190,15 @@ private[spark] object ResourceUtils extends Logging {
   def addTaskResourceRequests(
       sparkConf: SparkConf,
       treqs: TaskResourceRequests): Unit = {
-    listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { resourceId =>
+    val nonZeroTaskReqs = listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { resourceId =>
       val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
       val amountDouble = settings.getOrElse(AMOUNT,
         throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}")
       ).toDouble
-      treqs.resource(resourceId.resourceName, amountDouble)
+      (resourceId.resourceName, amountDouble)
+    }.toMap.filter { case (_, amount) => amount > 0.0 }
+    nonZeroTaskReqs.foreach { case (resourceName, amount) =>
+      treqs.resource(resourceName, amount)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index f9869d35382..e706b224375 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -945,7 +945,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc = new SparkContext(conf)
     }.getMessage()
 
-    assert(error.contains("No executor resource configs were not specified for the following " +
+    assert(error.contains("No executor resource configs were specified for the following " +
       "task configs: gpu"))
   }
 
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index d07b85847e7..9a2e47e64ea 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -146,6 +146,22 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
     assert(immrprof.taskResources.get("gpu").get.amount == 0.33)
   }
 
+  test("test default profile task gpus 0") {
+    val sparkConf = new SparkConf()
+      .set(EXECUTOR_GPU_ID.amountConf, "2")
+      .set(TASK_GPU_ID.amountConf, "0")
+    val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(immrprof.taskResources.get("gpu") == None)
+  }
+
+  test("test default profile executor gpus 0") {
+    val sparkConf = new SparkConf()
+      .set(EXECUTOR_GPU_ID.amountConf, "0")
+      .set(TASK_GPU_ID.amountConf, "1")
+    val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(immrprof.executorResources.get("gpu") == None)
+  }
+
   test("maxTasksPerExecutor cpus") {
     val sparkConf = new SparkConf()
       .set(EXECUTOR_CORES, 1)

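For reference, the new tests can be run from a source checkout with sbt (assuming a standard Spark build environment):

    build/sbt "core/testOnly org.apache.spark.resource.ResourceProfileSuite"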
