Posted to reviews@spark.apache.org by "wbo4958 (via GitHub)" <gi...@apache.org> on 2024/01/02 11:51:12 UTC

Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

wbo4958 commented on PR #43494:
URL: https://github.com/apache/spark/pull/43494#issuecomment-1873931486

   # Manual test on Spark Standalone Cluster
   
   ## Environment
   
   The Spark Standalone cluster consists of a single worker node with 8 CPU cores and no physical GPUs. However, Spark can still manage GPU resources by tracking GPU IDs (addresses) instead of actual devices. To achieve this, point the `spark.worker.resource.gpu.discoveryScript` setting at a script that returns the GPU IDs. For instance, saving the following as `/tmp/gpu_discovery.sh` and making it executable reports two GPUs:
   
   ```bash
   #!/bin/bash
   cat <<EOF
   {"name": "gpu","addresses":["0", "1"]}
   EOF
   ```
   
   By editing the `addresses` array in the script, we can make the worker report 1 GPU, 2 GPUs, or any number of GPUs.
   
   ## with dynamic allocation off
   
   ### 1 GPU
   
   - configurations
   
   Add the following configuration to `SPARK_HOME/conf/spark-defaults.conf`:
   
   ```properties
   spark.worker.resource.gpu.amount 1
   spark.worker.resource.gpu.discoveryScript /tmp/gpu_discovery.sh
   ```
   
   - spark-submit configurations
   
   ```bash
   spark-shell --master spark://192.168.0.103:7077 --conf spark.executor.cores=8 --conf spark.task.cpus=1 \
      --conf spark.executor.resource.gpu.amount=1 --conf spark.task.resource.gpu.amount=0.125 \
      --conf spark.dynamicAllocation.enabled=false
   ```
   
   The above configuration launches a single executor with 8 CPU cores and 1 GPU. Each task requires 1 CPU core and 0.125 GPUs, so 8 tasks can run concurrently.
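
   For reference, here is a minimal, self-contained sketch of the slot arithmetic behind that claim. This is not Spark's scheduler code; `concurrentTasks` is a hypothetical helper that simply combines the CPU limit with the fractional-GPU limit:

   ```scala
   // Minimal sketch (not Spark's scheduler code): how many tasks fit on one executor
   // when the per-task GPU request is a fraction.
   object SlotMath {
     // Hypothetical helper: concurrency is limited by both CPU cores and GPU fractions.
     def concurrentTasks(executorCores: Int, taskCpus: Int,
                         executorGpus: Double, taskGpuAmount: Double): Int = {
       val cpuSlots = executorCores / taskCpus
       val gpuSlots = math.floor(executorGpus / taskGpuAmount).toInt
       math.min(cpuSlots, gpuSlots)
     }

     def main(args: Array[String]): Unit = {
       println(concurrentTasks(8, 1, 1.0, 0.125)) // 8, the first stage below
       println(concurrentTasks(8, 1, 1.0, 0.6))   // 1, the 0.6-GPU ResourceProfile stage below
     }
   }
   ```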
   
   - test code
   
   ``` scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
   
   // First stage: 12 tasks, each using the default 1 CPU core and 0.125 GPUs;
   // every task should see the single GPU "0".
   val rdd = sc.range(0, 100, 1, 12).mapPartitions { iter => {
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     assert(tc.resources()("gpu").addresses sameElements Array("0"))
     iter
   }}
   
   // Second stage: repartition to 2 tasks and attach a ResourceProfile requesting
   // 1 CPU core and 0.6 GPUs per task.
   val rdd1 = rdd.repartition(2)
   val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
   val rp = new ResourceProfileBuilder().require(treqs).build
   val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     assert(tc.resources()("gpu").addresses sameElements Array("0"))
     iter
   }
   }
   rdd2.collect()
   ```
   
   The provided Spark job is split into two stages. The first stage comprises 12 tasks, each requiring 1 CPU core and 0.125 GPUs. As a result, the first 8 tasks run concurrently, followed by the remaining 4 tasks.
   
   ![index_0-dyn-off-1-gpu-shuffle-stages](https://github.com/apache/spark/assets/1320706/36a887f3-75a1-440f-a9f6-dfdf046a34b1)
   
   
   In contrast, the second stage consists of 2 tasks, each requiring 1 CPU core and 0.6 GPUs. Since the single GPU cannot hold two 0.6 fractions at once, only one task runs at a time and the 2 tasks execute sequentially.
   
   
   ![index_1-dyn-off-1-gpu-result-stages](https://github.com/apache/spark/assets/1320706/f1fed300-a86a-4925-9a38-997934493d9e)
   
   
   ### 2 GPUs
   
   - configurations
   
   Add the following configuration to `SPARK_HOME/conf/spark-defaults.conf`:
   
   ```properties
   spark.worker.resource.gpu.amount 2
   spark.worker.resource.gpu.discoveryScript /tmp/gpu_discovery.sh
   ```
   
   - spark-submit configurations
   
   ```bash
   spark-shell --master spark://192.168.0.103:7077 --conf spark.executor.cores=8 --conf spark.task.cpus=1 \
      --conf spark.executor.resource.gpu.amount=2 --conf spark.task.resource.gpu.amount=0.25 \
      --conf spark.dynamicAllocation.enabled=false
   ```
   
   The above configuration launches a single executor with 8 CPU cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs, so 8 tasks can run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 grab GPU 1 because of the round-robin manner in which the resources are offered.
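
   To make the expected GPU addresses concrete, here is a toy model of the assignment asserted in the test code below: fill one GPU address until its remaining fraction can no longer satisfy a task, then move on to the next address. This only illustrates the observed behavior, not Spark's actual offer logic, and `assign` is a hypothetical helper:

   ```scala
   // Toy model of the observed assignment: fill GPU "0" until its remaining fraction
   // cannot satisfy another task, then move on to GPU "1". Not Spark's actual code.
   object GpuAssignmentSketch {
     def assign(addresses: Seq[String], taskGpuAmount: Double, numTasks: Int): Seq[String] = {
       val remaining = scala.collection.mutable.LinkedHashMap(addresses.map(_ -> 1.0): _*)
       (0 until numTasks).map { _ =>
         val addr = remaining
           .collectFirst { case (a, left) if left >= taskGpuAmount - 1e-9 => a }
           .getOrElse(sys.error("no GPU fraction left"))
         remaining(addr) -= taskGpuAmount
         addr
       }
     }

     def main(args: Array[String]): Unit = {
       // 8 tasks at 0.25 GPUs each on addresses "0" and "1": tasks 0-3 land on "0", tasks 4-7 on "1".
       println(assign(Seq("0", "1"), 0.25, 8))
       // 2 tasks at 0.6 GPUs each: task 0 lands on "0", task 1 on "1".
       println(assign(Seq("0", "1"), 0.6, 2))
     }
   }
   ```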
   
   - test code
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
   
   // First stage: 8 tasks at 0.25 GPUs each; tasks 0-3 should see GPU "0" and tasks 4-7 GPU "1".
   val rdd = sc.range(0, 100, 1, 8).mapPartitions { iter => {
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     if (tid >= 4) {
       assert(tc.resources()("gpu").addresses sameElements Array("1"))
     } else {
       assert(tc.resources()("gpu").addresses sameElements Array("0"))
     }
     iter
   }}
   
   // Second stage: 2 tasks, each requesting 1 CPU core and 0.6 GPUs; each should grab a different GPU.
   val rdd1 = rdd.repartition(2)
   val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
   val rp = new ResourceProfileBuilder().require(treqs).build
   val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     if (tid > 0) {
       assert(tc.resources()("gpu").addresses sameElements Array("1"))
     } else {
       assert(tc.resources()("gpu").addresses sameElements Array("0"))
     }
     iter
   }
   }
   rdd2.collect()
   ```
   
   The provided Spark job is split into two stages. The first stage comprises 8 tasks, each requiring 1 CPU core and 0.25 GPUs, so all 8 tasks can run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 grab GPU 1 because of the round-robin manner in which the resources are offered; the assert lines verify this assignment.
    
   ![index_2-dyn-off-2-gpus-shuffle-stage](https://github.com/apache/spark/assets/1320706/9ed3c43d-57e3-446c-8c8d-ebdc0d7f9bb6)
   
   
   In contrast, the second stage consists of 2 tasks, each requiring 1 CPU core and 0.6 GPUs. Since 2 GPUs are available, both tasks can run concurrently, each grabbing a different GPU; the assert lines verify that.
   
   ![index_3-dyn-off-2-gpus-result-stage](https://github.com/apache/spark/assets/1320706/fbaac8c2-15be-4f66-9945-af03caebf3e5)
   
   ### concurrent spark jobs
   
   This test case ensures that a second Spark job can still grab the remaining GPU fractions and run alongside the first Spark job.
   
   - configurations
   
   Add the following configuration to `SPARK_HOME/conf/spark-defaults.conf`:
   
   ```properties
   spark.worker.resource.gpu.amount 2
   spark.worker.resource.gpu.discoveryScript /tmp/gpu_discovery.sh
   ```
   
   - spark-submit configurations
   
   ```bash
   spark-shell --master spark://192.168.0.103:7077 --conf spark.executor.cores=8 --conf spark.task.cpus=1 \
      --conf spark.executor.resource.gpu.amount=2 --conf spark.task.resource.gpu.amount=0.25 \
      --conf spark.dynamicAllocation.enabled=false
   ```
   
   The above configuration launches a single executor with 8 CPU cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs, so 8 tasks can run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 grab GPU 1 because of the round-robin manner in which the resources are offered.
   
   - test code
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

   // Submit Spark job 0 in thread1.
   val thread1 = new Thread(() => {
     val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter => {
       val tc = TaskContext.get()
       val tid = tc.partitionId()
       if (tid >= 4) {
         assert(tc.resources()("gpu").addresses sameElements Array("1"))
       } else {
         assert(tc.resources()("gpu").addresses sameElements Array("0"))
       }
       iter
     }
     }

     val rdd1 = rdd.repartition(2)

     val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
     val rp = new ResourceProfileBuilder().require(treqs).build

     val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
       val tc = TaskContext.get()
       val tid = tc.partitionId()
       assert(tc.resources()("gpu").addresses sameElements Array(tid.toString))
       println("sleeping 20s")
       Thread.sleep(20000)
       iter
     }
     }
     rdd2.collect()
   })

   thread1.start()
   // Sleep 5s in the main thread to make sure the result tasks launched in thread1 are running.
   Thread.sleep(5000)

   // Submit Spark job 1 in the main thread.
   // Each result task in thread1 holds 0.6 GPUs, so only 0.4 of each GPU is left.
   // With the default task GPU amount of 0.25, Spark job 1 can run
   // floor(0.4 / 0.25) = 1 task per GPU, i.e. 2 concurrent tasks across the 2 GPUs.
   val rdd = sc.range(0, 4, 1, 2).mapPartitions(iter => {
     Thread.sleep(10000)
     val tc = TaskContext.get()
     val tid = tc.partitionId()
     if (tid % 2 == 1) {
       assert(tc.resources()("gpu").addresses sameElements Array("1"))
     } else {
       assert(tc.resources()("gpu").addresses sameElements Array("0"))
     }
     iter
   })
   rdd.collect()

   thread1.join()
   ```
   
   The given Spark application consists of two Spark jobs. Spark job 0 is submitted in thread1, while Spark job 1 is submitted in the main thread. To guarantee that Spark job 0 starts before Spark job 1, the main thread sleeps for 5 seconds before submitting job 1. The result tasks of Spark job 0 then sleep for 20 seconds so that they keep holding their GPU fractions until Spark job 1 completes; this tests whether Spark job 1 can utilize the remaining GPU fractions and execute concurrently with Spark job 0.
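
   The expected concurrency of Spark job 1 follows from a quick back-of-the-envelope calculation, spelled out in the sketch below (plain Scala, not Spark code; `concurrentJob1Tasks` is a hypothetical helper):

   ```scala
   // Back-of-the-envelope check, not Spark code: how many 0.25-GPU tasks of Spark job 1
   // can run while the two result tasks of Spark job 0 each hold a fraction of one GPU.
   object LeftoverSketch {
     // Hypothetical helper: leftover fraction per GPU divided by the per-task amount, times the GPU count.
     def concurrentJob1Tasks(numGpus: Int, heldPerGpu: Double, taskGpuAmount: Double): Int = {
       val leftPerGpu = 1.0 - heldPerGpu
       numGpus * math.floor(leftPerGpu / taskGpuAmount).toInt
     }

     def main(args: Array[String]): Unit = {
       println(concurrentJob1Tasks(2, 0.6, 0.25)) // 2: one extra task per GPU alongside job 0
       println(concurrentJob1Tasks(2, 1.0, 0.25)) // 0: the whole-GPU variant shown later
     }
   }
   ```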
   
   **Event timeline**
   
   ![index_6-dyn-off-concurrent-total-events](https://github.com/apache/spark/assets/1320706/e18fba0a-3aa7-4f5f-bbda-552677271e52)
   
   From the picture, we can see that Spark job 1 ran alongside Spark job 0 and finished before it.
   
   
   **spark job 0**
   ![index_7-dyn-off-concurrent-job-0-shuffle](https://github.com/apache/spark/assets/1320706/b24db20c-8691-4268-862f-bfe37795750a)
   ![index_8-dyn-off-concurrent-job-0-result](https://github.com/apache/spark/assets/1320706/bb115d22-3b6f-4cb5-bcdf-f3dab5646b7f)
   
   
   **spark job 1**
   
   ![index_9-dyn-off-concurrent-job1-result](https://github.com/apache/spark/assets/1320706/79e8b828-213c-4f40-8b87-40dbcf161254)
   
   
   
   ---
   
   If we change the request to `val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)` so that each task requires a whole GPU, then Spark job 1 cannot grab any GPU, because no GPU capacity is left while Spark job 0 is running.
   
   ```scala
   import org.apache.spark.TaskContext
   import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

   // Submit Spark job 0 in thread1.
   val thread1 = new Thread(() => {
     val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter => {
       val tc = TaskContext.get()
       val tid = tc.partitionId()
       if (tid >= 4) {
         assert(tc.resources()("gpu").addresses sameElements Array("1"))
       } else {
         assert(tc.resources()("gpu").addresses sameElements Array("0"))
       }
       iter
     }
     }

     val rdd1 = rdd.repartition(2)

     val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
     val rp = new ResourceProfileBuilder().require(treqs).build

     val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
       val tc = TaskContext.get()
       val tid = tc.partitionId()
       assert(tc.resources()("gpu").addresses sameElements Array(tid.toString))
       println("sleeping 20s")
       Thread.sleep(20000)
       iter
     }
     }
     rdd2.collect()
   })

   thread1.start()
   // Sleep 5s in the main thread to make sure the result tasks launched in thread1 are running.
   Thread.sleep(5000)

   // Submit Spark job 1 in the main thread.
   // Each result task in thread1 holds a whole GPU, so no GPU capacity is left for Spark job 1.
   // Spark job 1 can only run after Spark job 0 finishes, and we cannot predict which GPU
   // each of its tasks will grab.
   val rdd = sc.range(0, 4, 1, 2).mapPartitions(iter => {
     Thread.sleep(10000)
     val tc = TaskContext.get()
     assert(tc.resources().contains("gpu"))
     iter
   })
   rdd.collect()

   thread1.join()
   ```
   ![index_zzz-dyn-off-concurrent-job-sequentially](https://github.com/apache/spark/assets/1320706/0b4dd9eb-91d2-4d78-abb6-15d37e43522a)
   
   From the picture, we can see that Spark job 1 was submitted while Spark job 0 was running, but its tasks did not run because no GPU resources were left. After Spark job 0 finished and released the GPUs, the tasks of Spark job 1 grabbed the GPUs and ran.
   
   
   
   
   

