Posted to issues@spark.apache.org by "liangyongyuan (Jira)" <ji...@apache.org> on 2023/10/13 08:17:00 UTC

[jira] [Created] (SPARK-45537) The last Task may get stuck in multi resource-profile

liangyongyuan created SPARK-45537:
-------------------------------------

             Summary: The last Task may get stuck in multi resource-profile
                 Key: SPARK-45537
                 URL: https://issues.apache.org/jira/browse/SPARK-45537
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 4.0.0
            Reporter: liangyongyuan


Description:
This occurs in scenarios involving multiple resource profiles (e.g., prof0 and prof1). Consider a task set using prof1 that has only one remaining task (task0) awaiting scheduling, and two executors: executor0 (prof0) and executor1 (prof1). If task0 fails on executor1, executor1 is excluded (blacklisted) for task0. Because executor0 belongs to a different resource profile and cannot run prof1 tasks, no executor is left that can run task0. Task0 becomes unschedulable and scheduling of the task set stalls.

Example Code:


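{code:java}
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

// Reproduction sketch. Assumes a running SparkContext `sc` with dynamic allocation
// and spark.excludeOnFailure.enabled=true (settings not stated in the original report).
val ereqs = new ExecutorResourceRequests()
  .memory("4g")
  .memoryOverhead("2g")
  .offHeapMemory("1g")
// Custom profile (prof1); executor0 keeps the default profile (prof0).
val resourceProfile = new ResourceProfileBuilder().require(ereqs).build()

// One partition, so the prof1 task set contains a single task (task0).
val rdd = sc.parallelize(1 to 10, 1).withResources(resourceProfile)
rdd.map { num =>
  // Fail the first attempt so executor1 is excluded for task0 and a retry is needed.
  if (TaskContext.get().attemptNumber() == 0) {
    throw new RuntimeException("First attempt encounters an error")
  } else {
    num / 2
  }
}.collect()
{code}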

Issue:
The problem lies in how an unschedulable task set is detected. When no task could be launched, the scheduler calls getCompletelyExcludedTaskIfAny to look for a pending task that cannot run on any executor. However, this check iterates over every executor in hostToExecutors without considering resource profiles. It therefore wrongly concludes that executor0 (prof0) can still run task0, so task0 is never reported as completely excluded and the task set remains stuck.

Relevant Code:

{code:java}
if (!launchedAnyTask) {
  taskSet.getCompletelyExcludedTaskIfAny(hostToExecutors) ........
}

def getCompletelyExcludedTaskIfAny(
      hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
........
  pendingTask.find { indexInTaskSet =>
    // try to find some executor this task can run on.
    // It's possible that some *other* task isn't schedulable anywhere,
    // but we will discover that in some later call, when that unschedulable task is the last task remaining.
    hostToExecutors.forall { case (host, execsOnHost) =>
      // Check if the task can run on the node
      val nodeExcluded =
        appHealthTracker.isNodeExcluded(host) ||
          taskSetExcludelist.isNodeExcludedForTaskSet(host) ||
          taskSetExcludelist.isNodeExcludedForTask(host, indexInTaskSet)
      if (nodeExcluded) {
        true
      } else {
        // Check if the task can run on any of the executors
        execsOnHost.forall { exec =>
          appHealthTracker.isExecutorExcluded(exec) ||
            taskSetExcludelist.isExecutorExcludedForTaskSet(exec) ||
            taskSetExcludelist.isExecutorExcludedForTask(exec, indexInTaskSet)
        }
      }
    }
  }
}
{code}
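To illustrate the reasoning above, here is a small, self-contained Scala model of the check. It is not Spark code: the names ExclusionModel, Scenario, currentCheck, profileAwareCheck and the executorToProfile map are hypothetical, only per-task executor exclusion is modelled (node-level and app-level exclusion are omitted), and the profile-aware variant is a sketch of one possible fix under the assumption that each executor's resource profile id is available to the check. It is not necessarily how Spark resolves this issue.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical model of TaskSetManager.getCompletelyExcludedTaskIfAny.
// Only per-task executor exclusion is modelled.
object ExclusionModel {
  // Hypothetical inputs: executors per host, the resource profile id each
  // executor was launched with, and the executors excluded for each task index.
  final case class Scenario(
      hostToExecutors: HashMap[String, HashSet[String]],
      executorToProfile: Map[String, Int],
      excludedForTask: Map[Int, Set[String]])

  private def isExcluded(s: Scenario, exec: String, task: Int): Boolean =
    s.excludedForTask.getOrElse(task, Set.empty[String]).contains(exec)

  // Mirrors the current logic: every executor counts as a candidate,
  // whatever resource profile it belongs to.
  def currentCheck(s: Scenario, pendingTasks: Seq[Int]): Option[Int] =
    pendingTasks.find { task =>
      s.hostToExecutors.forall { case (_, execsOnHost) =>
        execsOnHost.forall(exec => isExcluded(s, exec, task))
      }
    }

  // Profile-aware sketch: an executor whose profile differs from the task
  // set's profile can never run the task, so it is treated as excluded.
  def profileAwareCheck(s: Scenario, pendingTasks: Seq[Int], taskSetRpId: Int): Option[Int] =
    pendingTasks.find { task =>
      s.hostToExecutors.forall { case (_, execsOnHost) =>
        execsOnHost.forall { exec =>
          val otherProfile = !s.executorToProfile.get(exec).contains(taskSetRpId)
          otherProfile || isExcluded(s, exec, task)
        }
      }
    }

  // The scenario from the description: executor0 (prof0), executor1 (prof1),
  // and task0 excluded on executor1 after its first attempt failed.
  val demo: Scenario = Scenario(
    hostToExecutors = HashMap(
      "host0" -> HashSet("executor0"),
      "host1" -> HashSet("executor1")),
    executorToProfile = Map("executor0" -> 0, "executor1" -> 1),
    excludedForTask = Map(0 -> Set("executor1")))
}
```

With this model, ExclusionModel.currentCheck(ExclusionModel.demo, Seq(0)) returns None because executor0 is counted as a candidate, which matches the hang. ExclusionModel.profileAwareCheck(ExclusionModel.demo, Seq(0), 1) returns Some(0), reporting task0 as completely excluded for the prof1 task set.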
