You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "wbo4958 (via GitHub)" <gi...@apache.org> on 2024/03/15 02:37:58 UTC

[PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

wbo4958 opened a new pull request, #45528:
URL: https://github.com/apache/spark/pull/45528

   ### What changes were proposed in this pull request?
   
   This PR addresses the problem of calculating the maximum concurrent tasks while evaluating the number of slots for barrier stages, specifically for the case when the task resource amount is greater than 1.
   
   
   ### Why are the changes needed?
   
   ``` scala
     test("problem of calculating the maximum concurrent task") {
       withTempDir { dir =>
         val discoveryScript = createTempScriptWithExpectedOutput(
           dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
   
         val conf = new SparkConf()
           // Setup a local cluster which would only has one executor with 2 CPUs and 1 GPU.
           .setMaster("local-cluster[1, 6, 1024]")
           .setAppName("test-cluster")
           .set(WORKER_GPU_ID.amountConf, "4")
           .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
           .set(EXECUTOR_GPU_ID.amountConf, "4")
           .set(TASK_GPU_ID.amountConf, "2")
           // disable barrier stage retry to fail the application as soon as possible
           .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
         sc = new SparkContext(conf)
         TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
   
         // Setup a barrier stage which contains 2 tasks and each task requires 1 CPU and 1 GPU.
         // Therefore, the total resources requirement (2 CPUs and 2 GPUs) of this barrier stage
         // can not be satisfied since the cluster only has 2 CPUs and 1 GPU in total.
         assert(sc.parallelize(Range(1, 10), 2)
           .barrier()
           .mapPartitions { iter => iter }
           .collect() sameElements Range(1, 10).toArray[Int])
       }
     }
   ```
   
   In the described test scenario, the executor has 6 CPU cores and 4 GPUs, and each task requires 1 CPU core and 2 GPUs. Consequently, the maximum number of concurrent tasks should be 2. However, the issue arises when attempting to launch the subsequent 2 barrier tasks, as the 'checkBarrierStageWithNumSlots' function gets the incorrect concurrent task limit that is 1 instead of 2. The bug needs to be fixed.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   The existing and newly added unit tests should pass
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "asfgit (via GitHub)" <gi...@apache.org>.
asfgit closed pull request #45528: [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage
URL: https://github.com/apache/spark/pull/45528


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on code in PR #45528:
URL: https://github.com/apache/spark/pull/45528#discussion_r1526253282


##########
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala:
##########
@@ -137,6 +138,43 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     }
   }
 
+  test("SPARK-45527 compute max number of concurrent tasks with resources limiting") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
+      val conf = new SparkConf()
+        .set(CPUS_PER_TASK, 1)
+        .setMaster("local-cluster[1, 20, 1024]")
+        .setAppName("test")
+        .set(WORKER_GPU_ID.amountConf, "4")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "4")
+        .set(TASK_GPU_ID.amountConf, "0.2")
+      sc = new SparkContext(conf)
+      eventually(timeout(executorUpTimeout)) {
+        // Ensure all executors have been launched.
+        assert(sc.getExecutorIds().length == 1)
+      }
+      // Each executor can only launch one task since `spark.task.cpus` is 2.

Review Comment:
   is the comment wrong.. above sets CPUS_PER_TASK to 1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #45528:
URL: https://github.com/apache/spark/pull/45528#issuecomment-2006853561

   > so I just noticed this was linked to already closed issue - [SPARK-45527](https://issues.apache.org/jira/browse/SPARK-45527), personally I think this should be separate issue as a bug referencing that issue, can you please file a new issue and update description,etc.
   
   Done. Thx


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on PR #45528:
URL: https://github.com/apache/spark/pull/45528#issuecomment-2007446340

   merged to master, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "09306677806 (via GitHub)" <gi...@apache.org>.
09306677806 commented on PR #45528:
URL: https://github.com/apache/spark/pull/45528#issuecomment-2004362256

   TAMhesabhay har sHAkhs ro fagh baYAd az hesab khoDEshon bashe va KEsi
   ejazeh bardASht nadareh.va GHoshi va hesAB shakh si va fEZiki hast.
   
   ShahrzadMahro
   
   در تاریخ جمعه ۱۵ مارس ۲۰۲۴،‏ ۰۶:۱۰ Bobby Wang ***@***.***>
   نوشت:
   
   > What changes were proposed in this pull request?
   >
   > This PR addresses the problem of calculating the maximum concurrent tasks
   > while evaluating the number of slots for barrier stages, specifically for
   > the case when the task resource amount is greater than 1.
   > Why are the changes needed?
   >
   >   test("problem of calculating the maximum concurrent task") {
   >     withTempDir { dir =>
   >       val discoveryScript = createTempScriptWithExpectedOutput(
   >         dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
   >
   >       val conf = new SparkConf()
   >         // Setup a local cluster which would only has one executor with 2 CPUs and 1 GPU.
   >         .setMaster("local-cluster[1, 6, 1024]")
   >         .setAppName("test-cluster")
   >         .set(WORKER_GPU_ID.amountConf, "4")
   >         .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
   >         .set(EXECUTOR_GPU_ID.amountConf, "4")
   >         .set(TASK_GPU_ID.amountConf, "2")
   >         // disable barrier stage retry to fail the application as soon as possible
   >         .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
   >       sc = new SparkContext(conf)
   >       TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
   >
   >       // Setup a barrier stage which contains 2 tasks and each task requires 1 CPU and 1 GPU.
   >       // Therefore, the total resources requirement (2 CPUs and 2 GPUs) of this barrier stage
   >       // can not be satisfied since the cluster only has 2 CPUs and 1 GPU in total.
   >       assert(sc.parallelize(Range(1, 10), 2)
   >         .barrier()
   >         .mapPartitions { iter => iter }
   >         .collect() sameElements Range(1, 10).toArray[Int])
   >     }
   >   }
   >
   > In the described test scenario, the executor has 6 CPU cores and 4 GPUs,
   > and each task requires 1 CPU core and 2 GPUs. Consequently, the maximum
   > number of concurrent tasks should be 2. However, the issue arises when
   > attempting to launch the subsequent 2 barrier tasks, as the
   > 'checkBarrierStageWithNumSlots' function gets the incorrect concurrent task
   > limit that is 1 instead of 2. The bug needs to be fixed.
   > Does this PR introduce *any* user-facing change?
   >
   > No
   > How was this patch tested?
   >
   > The existing and newly added unit tests should pass
   > Was this patch authored or co-authored using generative AI tooling?
   >
   > No
   > ------------------------------
   > You can view, comment on, or merge this pull request online at:
   >
   >   https://github.com/apache/spark/pull/45528
   > Commit Summary
   >
   >    - c9cebda
   >    <https://github.com/apache/spark/pull/45528/commits/c9cebda2dfd097bfc9723acaaefaa5dea1f23e7f>
   >    fix bug to calculate max concurrent number
   >
   > File Changes
   >
   > (5 files <https://github.com/apache/spark/pull/45528/files>)
   >
   >    - *M*
   >    core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
   >    <https://github.com/apache/spark/pull/45528/files#diff-2bb5245e65acbbec81b84431183cd78ab221d881ca5eae4476b95c8d1b5ec10b>
   >    (15)
   >    - *M*
   >    core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
   >    <https://github.com/apache/spark/pull/45528/files#diff-9877d4af98ed5226c370732243e409ec50467b3d5d5f10395275f740403b1d7d>
   >    (6)
   >    - *M*
   >    core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
   >    <https://github.com/apache/spark/pull/45528/files#diff-47d45f7f9e3eed57893ae5441eda5a1e3874941559b7e12546bccac70b0ae5ba>
   >    (3)
   >    - *M*
   >    core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
   >    <https://github.com/apache/spark/pull/45528/files#diff-e0f4abe2cf43afe189c820d2a0d1e3d5e465a3fe6698473a85d3897fac6b1683>
   >    (40)
   >    - *M*
   >    core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
   >    <https://github.com/apache/spark/pull/45528/files#diff-9c73792f401d6969f0a4c33839f22d37354a4ee4e33020287bb4b3c2973fd4ec>
   >    (51)
   >
   > Patch Links:
   >
   >    - https://github.com/apache/spark/pull/45528.patch
   >    - https://github.com/apache/spark/pull/45528.diff
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/spark/pull/45528>, or unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/ANEZZRAYL2ZXP2472SFTDUDYYJNSFAVCNFSM6AAAAABEXHFI3OVHI2DSMVQWIX3LMV43ASLTON2WKOZSGE4DONRQGY3TMNA>
   > .
   > You are receiving this because you are subscribed to this thread.Message
   > ID: ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on PR #45528:
URL: https://github.com/apache/spark/pull/45528#issuecomment-2003994355

   so I just noticed this was linked to already closed issue - SPARK-45527, personally I think this should be separate issue as a bug referencing that issue, can you please file a new issue and update description,etc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #45528:
URL: https://github.com/apache/spark/pull/45528#discussion_r1545905575


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2687,4 +2687,58 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
     assert(3 === taskDescriptions.length)
   }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "in barrier" else ""
+    Seq(1, 2, 3, 4).foreach { gpuTaskAmount =>
+      test(s"SPARK-47458 GPU fraction resource should work when " +
+        s"gpu task amount = ${gpuTaskAmount} $barrier") {
+
+        val executorCpus = 10 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskNum = 4 / gpuTaskAmount
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(taskNum)
+        } else {
+          FakeTask.createTaskSet(10)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        taskDescriptions = taskDescriptions.sortBy(t => t.index)
+        assert(taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+
+        // The key is gpuTaskAmount
+        // The values are the gpu addresses of each task.
+        val gpuTaskAmountToExpected = Map(
+          1 -> Seq(Array("0"), Array("1"), Array("2"), Array("3")),
+          2 -> Seq(Array("0", "1"), Array("2", "3")),
+          3 -> Seq(Array("0", "1", "2")),
+          4 -> Seq(Array("0", "1", "2", "3"))
+        )
+
+        taskDescriptions.foreach { task =>
+          val taskResources = task.resources(GPU).keys.toArray.sorted
+          val expected = gpuTaskAmountToExpected(gpuTaskAmount)(task.index)
+          assert(taskResources sameElements expected)
+        }
+      }
+    }
+  }

Review Comment:
   Can we add end-to-end tests ?
   
   i.e. starting a spark job in local-cluster mode, and check GPUs allocated to each spark tasks .
   
   We can test the following typical cases:
   
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=4
   spark worker cpus=4, spark worker gpus=4, task-cpus = 2, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 2, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=4
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on code in PR #45528:
URL: https://github.com/apache/spark/pull/45528#discussion_r1527017635


##########
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala:
##########
@@ -137,6 +138,43 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     }
   }
 
+  test("SPARK-45527 compute max number of concurrent tasks with resources limiting") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
+      val conf = new SparkConf()
+        .set(CPUS_PER_TASK, 1)
+        .setMaster("local-cluster[1, 20, 1024]")
+        .setAppName("test")
+        .set(WORKER_GPU_ID.amountConf, "4")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "4")
+        .set(TASK_GPU_ID.amountConf, "0.2")
+      sc = new SparkContext(conf)
+      eventually(timeout(executorUpTimeout)) {
+        // Ensure all executors have been launched.
+        assert(sc.getExecutorIds().length == 1)
+      }
+      // Each executor can only launch one task since `spark.task.cpus` is 2.
+      assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 20)
+
+      // GPU resources limits the concurrent number
+      Seq(0.3, 0.4, 0.5, 0.8, 1.0, 2.0, 3.0, 4.0).foreach { taskGpu =>
+        // GPU resources limits the concurrent number
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, taskGpu)
+        val rp: ResourceProfile = new ResourceProfileBuilder().require(treqs).build()
+        sc.resourceProfileManager.addResourceProfile(rp)
+
+        val expected = if (taskGpu >= 1.0) {

Review Comment:
   Good suggestion. Done.



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2687,4 +2687,53 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
     assert(3 === taskDescriptions.length)
   }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "in barrier" else ""
+    Seq(1, 2, 3, 4).foreach { gpuTaskAmount =>
+      test(s"SPARK-45527 GPU fraction resource should work when " +
+        s"gpu task amount = ${gpuTaskAmount} $barrier") {
+
+        val executorCpus = 10 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskNum = 4 / gpuTaskAmount
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(taskNum)
+        } else {
+          FakeTask.createTaskSet(10)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        taskDescriptions = taskDescriptions.sortBy(t => t.index)
+        assert(taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+
+        taskDescriptions.foreach { task =>
+          val taskResources = task.resources(GPU).keys.toArray.sorted
+          val expected = if (gpuTaskAmount == 2) {
+            (task.index * 2 until task.index * 2 + gpuTaskAmount).map(_.toString).toArray

Review Comment:
   Good suggestion. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #45528:
URL: https://github.com/apache/spark/pull/45528#discussion_r1545905575


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2687,4 +2687,58 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
     assert(3 === taskDescriptions.length)
   }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "in barrier" else ""
+    Seq(1, 2, 3, 4).foreach { gpuTaskAmount =>
+      test(s"SPARK-47458 GPU fraction resource should work when " +
+        s"gpu task amount = ${gpuTaskAmount} $barrier") {
+
+        val executorCpus = 10 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskNum = 4 / gpuTaskAmount
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(taskNum)
+        } else {
+          FakeTask.createTaskSet(10)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        taskDescriptions = taskDescriptions.sortBy(t => t.index)
+        assert(taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+
+        // The key is gpuTaskAmount
+        // The values are the gpu addresses of each task.
+        val gpuTaskAmountToExpected = Map(
+          1 -> Seq(Array("0"), Array("1"), Array("2"), Array("3")),
+          2 -> Seq(Array("0", "1"), Array("2", "3")),
+          3 -> Seq(Array("0", "1", "2")),
+          4 -> Seq(Array("0", "1", "2", "3"))
+        )
+
+        taskDescriptions.foreach { task =>
+          val taskResources = task.resources(GPU).keys.toArray.sorted
+          val expected = gpuTaskAmountToExpected(gpuTaskAmount)(task.index)
+          assert(taskResources sameElements expected)
+        }
+      }
+    }
+  }

Review Comment:
   Can we add end-to-end tests ?
   
   i.e. starting a spark job in local-cluster mode, and check task numbers and GPUs allocated to each spark tasks .
   
   We can test the following typical cases:
   
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=4
   spark worker cpus=4, spark worker gpus=4, task-cpus = 2, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 2, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=4
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=3
   spark worker cpus=4, spark worker gpus=4, task-cpus = 3, task-gpus=1
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on code in PR #45528:
URL: https://github.com/apache/spark/pull/45528#discussion_r1526271148


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2687,4 +2687,53 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
     assert(3 === taskDescriptions.length)
   }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "in barrier" else ""
+    Seq(1, 2, 3, 4).foreach { gpuTaskAmount =>
+      test(s"SPARK-45527 GPU fraction resource should work when " +
+        s"gpu task amount = ${gpuTaskAmount} $barrier") {
+
+        val executorCpus = 10 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskNum = 4 / gpuTaskAmount
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(taskNum)
+        } else {
+          FakeTask.createTaskSet(10)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        taskDescriptions = taskDescriptions.sortBy(t => t.index)
+        assert(taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+
+        taskDescriptions.foreach { task =>
+          val taskResources = task.resources(GPU).keys.toArray.sorted
+          val expected = if (gpuTaskAmount == 2) {
+            (task.index * 2 until task.index * 2 + gpuTaskAmount).map(_.toString).toArray

Review Comment:
   its not obvious what this is doing and why, comment or make hardcoded for easier understanding



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #45528:
URL: https://github.com/apache/spark/pull/45528#discussion_r1545905575


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2687,4 +2687,58 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
     assert(3 === taskDescriptions.length)
   }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "in barrier" else ""
+    Seq(1, 2, 3, 4).foreach { gpuTaskAmount =>
+      test(s"SPARK-47458 GPU fraction resource should work when " +
+        s"gpu task amount = ${gpuTaskAmount} $barrier") {
+
+        val executorCpus = 10 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskNum = 4 / gpuTaskAmount
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(taskNum)
+        } else {
+          FakeTask.createTaskSet(10)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        taskDescriptions = taskDescriptions.sortBy(t => t.index)
+        assert(taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+
+        // The key is gpuTaskAmount
+        // The values are the gpu addresses of each task.
+        val gpuTaskAmountToExpected = Map(
+          1 -> Seq(Array("0"), Array("1"), Array("2"), Array("3")),
+          2 -> Seq(Array("0", "1"), Array("2", "3")),
+          3 -> Seq(Array("0", "1", "2")),
+          4 -> Seq(Array("0", "1", "2", "3"))
+        )
+
+        taskDescriptions.foreach { task =>
+          val taskResources = task.resources(GPU).keys.toArray.sorted
+          val expected = gpuTaskAmountToExpected(gpuTaskAmount)(task.index)
+          assert(taskResources sameElements expected)
+        }
+      }
+    }
+  }

Review Comment:
   Can we add end-to-end tests ?
   
   i.e. starting a spark job in local-cluster mode, and check GPUs allocated to each spark tasks .
   
   We can test the following typical cases:
   
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=4
   spark worker cpus=4, spark worker gpus=4, task-cpus = 2, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=1
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 2, task-gpus=2
   spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=4
   spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=3
   spark worker cpus=4, spark worker gpus=4, task-cpus = 3, task-gpus=1
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #45528:
URL: https://github.com/apache/spark/pull/45528#issuecomment-1998914538

   Hi @tgravescs, @Ngone51, Could you help review this PR? Thx


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on code in PR #45528:
URL: https://github.com/apache/spark/pull/45528#discussion_r1527017620


##########
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala:
##########
@@ -137,6 +138,43 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     }
   }
 
+  test("SPARK-45527 compute max number of concurrent tasks with resources limiting") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
+      val conf = new SparkConf()
+        .set(CPUS_PER_TASK, 1)
+        .setMaster("local-cluster[1, 20, 1024]")
+        .setAppName("test")
+        .set(WORKER_GPU_ID.amountConf, "4")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "4")
+        .set(TASK_GPU_ID.amountConf, "0.2")
+      sc = new SparkContext(conf)
+      eventually(timeout(executorUpTimeout)) {
+        // Ensure all executors have been launched.
+        assert(sc.getExecutorIds().length == 1)
+      }
+      // Each executor can only launch one task since `spark.task.cpus` is 2.

Review Comment:
   Oops, I copied it from somewhere and forgot to delete it. My bad. Fixed in the new commit. Thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45527][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on code in PR #45528:
URL: https://github.com/apache/spark/pull/45528#discussion_r1526255925


##########
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala:
##########
@@ -137,6 +138,43 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     }
   }
 
+  test("SPARK-45527 compute max number of concurrent tasks with resources limiting") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
+      val conf = new SparkConf()
+        .set(CPUS_PER_TASK, 1)
+        .setMaster("local-cluster[1, 20, 1024]")
+        .setAppName("test")
+        .set(WORKER_GPU_ID.amountConf, "4")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "4")
+        .set(TASK_GPU_ID.amountConf, "0.2")
+      sc = new SparkContext(conf)
+      eventually(timeout(executorUpTimeout)) {
+        // Ensure all executors have been launched.
+        assert(sc.getExecutorIds().length == 1)
+      }
+      // Each executor can only launch one task since `spark.task.cpus` is 2.
+      assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 20)
+
+      // GPU resources limits the concurrent number
+      Seq(0.3, 0.4, 0.5, 0.8, 1.0, 2.0, 3.0, 4.0).foreach { taskGpu =>
+        // GPU resources limits the concurrent number
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, taskGpu)
+        val rp: ResourceProfile = new ResourceProfileBuilder().require(treqs).build()
+        sc.resourceProfileManager.addResourceProfile(rp)
+
+        val expected = if (taskGpu >= 1.0) {

Review Comment:
   while this is nice in some ways, I would almost rather see a map with gpu task amount -> expected max value to ensure you really get the number you expect



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org