You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2023/01/05 14:32:00 UTC

[jira] [Assigned] (SPARK-41848) Tasks are over-scheduled with TaskResourceProfile

     [ https://issues.apache.org/jira/browse/SPARK-41848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-41848:
------------------------------------

    Assignee: Apache Spark

> Tasks are over-scheduled with TaskResourceProfile
> -------------------------------------------------
>
>                 Key: SPARK-41848
>                 URL: https://issues.apache.org/jira/browse/SPARK-41848
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.0
>            Reporter: wuyi
>            Assignee: Apache Spark
>            Priority: Blocker
>
> {code:java}
> test("SPARK-XXX") {
>   val conf = new SparkConf().setAppName("test").setMaster("local-cluster[1,4,1024]")
>   sc = new SparkContext(conf)
>   val req = new TaskResourceRequests().cpus(3)
>   val rp = new ResourceProfileBuilder().require(req).build()
>   val res = sc.parallelize(Seq(0, 1), 2).withResources(rp).map { x =>
>     Thread.sleep(5000)
>     x * 2
>   }.collect()
>   assert(res === Array(0, 2))
> } {code}
> In this test, tasks are supposed to be scheduled in order since each task requires 3 cores but the executor only has 4 cores. However, we noticed 2 tasks are launched concurrently from the logs.
> It turns out that we used the TaskResourceProfile (taskCpus=3) of the taskset for task scheduling:
> {code:java}
> val rpId = taskSet.taskSet.resourceProfileId
> val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
> val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, conf) {code}
> but the ResourceProfile (taskCpus=1) of the executor for updating the free cores in ExecutorData:
> {code:java}
> val rpId = executorData.resourceProfileId
> val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
> val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
> executorData.freeCores -= taskCpus {code}
> which results in the inconsistency of the available cores.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org