Posted to commits@spark.apache.org by do...@apache.org on 2023/06/23 00:41:15 UTC

[spark] branch branch-3.3 updated: [SPARK-44134][CORE] Fix setting resources (GPU/FPGA) to 0 when they are set in spark-defaults.conf

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new a7bbaca013a [SPARK-44134][CORE] Fix setting resources (GPU/FPGA) to 0 when they are set in spark-defaults.conf
a7bbaca013a is described below

commit a7bbaca013ad1ae92a437b12206fadfe93fea10f
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Thu Jun 22 17:40:47 2023 -0700

    [SPARK-44134][CORE] Fix setting resources (GPU/FPGA) to 0 when they are set in spark-defaults.conf
    
    ### What changes were proposed in this pull request?
    
    https://issues.apache.org/jira/browse/SPARK-44134
    
    With resource-aware scheduling, if a default value is specified in spark-defaults.conf, a user can't override it to 0.
    
    Meaning spark-defaults.conf has something like:
    spark.executor.resource.{resourceName}.amount=1
    spark.task.resource.{resourceName}.amount=1
    
    If the user tries to override those when submitting an application with spark.executor.resource.{resourceName}.amount=0 and spark.task.resource.{resourceName}.amount=0, the application fails to submit. It should submit and simply not try to allocate those resources, as sketched below. This worked back in Spark 3.0 but was broken when the stage-level scheduling feature was added.
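    
    For example, an application could override the defaults like this (a minimal sketch assuming a "gpu" resource; the app name and local master are illustrative):
    
        import org.apache.spark.{SparkConf, SparkContext}
    
        // spark-defaults.conf supplies amount=1 for both keys; the
        // application overrides both to 0. After this fix the application
        // submits cleanly and simply never requests GPUs.
        val conf = new SparkConf()
          .setAppName("no-gpu-app")  // illustrative
          .setMaster("local[*]")     // illustrative; normally set by spark-submit
          .set("spark.executor.resource.gpu.amount", "0")
          .set("spark.task.resource.gpu.amount", "0")
        val sc = new SparkContext(conf)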
    
    Here I fixed it by simply removing any task resources from the request list when their amount is set to 0.
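    
    Conceptually the filtering works like this minimal sketch (illustrative resource names and amounts, not the exact patch):
    
        // Parsed task resource amounts keyed by resource name.
        val requested = Map("gpu" -> 0.0, "fpga" -> 2.0)
        // Drop anything explicitly set to 0 so it is never requested.
        val nonZero = requested.filter { case (_, amount) => amount > 0.0 }
        assert(nonZero == Map("fpga" -> 2.0))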
    
    Note: I also fixed a typo in the exception message that is thrown when no executor resources are specified but task resources are.
    
    Note: ideally this should be backported to all of the maintenance releases.
    
    ### Why are the changes needed?
    
    Fixes the bug described above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No API changes.
    
    ### How was this patch tested?
    
    Added a unit test and ran manually on standalone and YARN clusters to verify that overriding the configs now works.
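    
    A rough sketch of what the new unit tests check (ResourceProfile.getOrCreateDefaultProfile is private[spark], so this only compiles inside Spark's own source tree):
    
        import org.apache.spark.SparkConf
        import org.apache.spark.resource.ResourceProfile
    
        val conf = new SparkConf()
          .set("spark.executor.resource.gpu.amount", "2")
          .set("spark.task.resource.gpu.amount", "0")
        val prof = ResourceProfile.getOrCreateDefaultProfile(conf)
        // The zero-amount task gpu request is dropped entirely.
        assert(prof.taskResources.get("gpu").isEmpty)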
    
    Closes #41703 from tgravescs/fixResource0.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit cf6e90c205bd76ff9e1fc2d88757d9d44ec93162)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/resource/ResourceProfile.scala      |  2 +-
 .../scala/org/apache/spark/resource/ResourceUtils.scala  |  7 +++++--
 .../test/scala/org/apache/spark/SparkContextSuite.scala  |  2 +-
 .../org/apache/spark/resource/ResourceProfileSuite.scala | 16 ++++++++++++++++
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 087897ff730..b556fde28e4 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -202,7 +202,7 @@ class ResourceProfile(
       }
     }
     if (taskResourcesToCheck.nonEmpty) {
-      throw new SparkException("No executor resource configs were not specified for the " +
+      throw new SparkException("No executor resource configs were specified for the " +
         s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}")
     }
     val limiting =
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 0762ee9c73e..1d66171ad99 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -190,12 +190,15 @@ private[spark] object ResourceUtils extends Logging {
   def addTaskResourceRequests(
       sparkConf: SparkConf,
       treqs: TaskResourceRequests): Unit = {
-    listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { resourceId =>
+    val nonZeroTaskReqs = listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { resourceId =>
       val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
       val amountDouble = settings.getOrElse(AMOUNT,
         throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}")
       ).toDouble
-      treqs.resource(resourceId.resourceName, amountDouble)
+      (resourceId.resourceName, amountDouble)
+    }.toMap.filter { case (_, amount) => amount > 0.0 }
+    nonZeroTaskReqs.foreach { case (resourceName, amount) =>
+      treqs.resource(resourceName, amount)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 411a3b155bf..1a05050a5a2 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -945,7 +945,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc = new SparkContext(conf)
     }.getMessage()
 
-    assert(error.contains("No executor resource configs were not specified for the following " +
+    assert(error.contains("No executor resource configs were specified for the following " +
       "task configs: gpu"))
   }
 
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index 27cc44a099d..c108724cd57 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -107,6 +107,22 @@ class ResourceProfileSuite extends SparkFunSuite {
     assert(immrprof.taskResources.get("gpu").get.amount == 0.33)
   }
 
+  test("test default profile task gpus 0") {
+    val sparkConf = new SparkConf()
+      .set(EXECUTOR_GPU_ID.amountConf, "2")
+      .set(TASK_GPU_ID.amountConf, "0")
+    val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(immrprof.taskResources.get("gpu") == None)
+  }
+
+  test("test default profile executor gpus 0") {
+    val sparkConf = new SparkConf()
+      .set(EXECUTOR_GPU_ID.amountConf, "0")
+      .set(TASK_GPU_ID.amountConf, "1")
+    val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assert(immrprof.executorResources.get("gpu") == None)
+  }
+
   test("maxTasksPerExecutor cpus") {
     val sparkConf = new SparkConf()
       .set(EXECUTOR_CORES, 1)

