Posted to commits@spark.apache.org by we...@apache.org on 2020/02/06 05:27:59 UTC

[spark] branch branch-3.0 updated: [SPARK-30729][CORE] Eagerly filter out zombie TaskSetManager before offering resources

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new baf1a07  [SPARK-30729][CORE] Eagerly filter out zombie TaskSetManager before offering resources
baf1a07 is described below

commit baf1a07704a6404313ebf558652e26acf137c5b4
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Feb 6 12:48:27 2020 +0800

    [SPARK-30729][CORE] Eagerly filter out zombie TaskSetManager before offering resources
    
    ### What changes were proposed in this pull request?
    
    Eagerly filter out zombie `TaskSetManager`s before offering resources, to reduce overhead as much as possible.
    
    This PR also avoids doing `recomputeLocality` and `addPendingTask` when a `TaskSetManager` is a zombie.
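
    For illustration, a minimal, self-contained sketch of the filtering idea. `MiniPool` and `MiniTaskSet` are hypothetical stand-ins for Spark's `Pool` and `TaskSetManager`; scheduling-order sorting and resource bookkeeping are elided:

    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical, simplified stand-ins for Pool / TaskSetManager.
    class MiniTaskSet(val name: String, var isZombie: Boolean)

    class MiniPool {
      val schedulableQueue = new ArrayBuffer[MiniTaskSet]()
      // Mirrors rootPool.getSortedTaskSetQueue; sorting elided in this sketch.
      def getSortedTaskSetQueue: Seq[MiniTaskSet] = schedulableQueue.toSeq
    }

    object EagerFilterDemo extends App {
      val pool = new MiniPool
      pool.schedulableQueue += new MiniTaskSet("stage-0", isZombie = true)
      pool.schedulableQueue += new MiniTaskSet("stage-1", isZombie = false)

      // The idea of this commit: drop zombies once, up front, instead of
      // walking through them on every resource-offer round.
      val sortedTaskSets = pool.getSortedTaskSetQueue.filterNot(_.isZombie)
      sortedTaskSets.foreach(ts => println(s"offering resources to ${ts.name}"))
      // prints: offering resources to stage-1
    }
    ```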
    
    ### Why are the changes needed?
    
    A zombie `TaskSetManager` can still exist in the Pool's `schedulableQueue` while it has running tasks. Offering resources to a zombie `TaskSetManager` is meaningless and brings unnecessary overhead.
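
    As a rough sketch of why a zombie can linger (hypothetical code, not Spark's actual bookkeeping): a `TaskSetManager` turns zombie once no new tasks need launching, but it leaves the queue only when its last running task finishes:

    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Rough, hypothetical sketch of the zombie lifecycle (not Spark's real code).
    class SketchTaskSetManager(var runningTasks: Int) {
      var isZombie = false

      // Marked zombie once no new tasks need launching (stage done or aborted)...
      def markZombie(): Unit = { isZombie = true }

      // ...but removed from the queue only when the last running task finishes.
      def onTaskFinished(queue: ArrayBuffer[SketchTaskSetManager]): Unit = {
        runningTasks -= 1
        if (isZombie && runningTasks == 0) queue -= this
      }
    }

    object ZombieLingersDemo extends App {
      val schedulableQueue = ArrayBuffer(new SketchTaskSetManager(runningTasks = 2))
      val tsm = schedulableQueue.head
      tsm.markZombie()
      println(schedulableQueue.size) // 1 -- still queued, so offers would reach it
      tsm.onTaskFinished(schedulableQueue)
      tsm.onTaskFinished(schedulableQueue)
      println(schedulableQueue.size) // 0 -- removed once its running tasks drain
    }
    ```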
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passes Jenkins.
    
    Closes #27455 from Ngone51/exclude-zombie-tsm.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit aebabf0bed712511eaa8844cab3a0c562219b2d0)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala   | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f25a36c..6a1d460 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -430,7 +430,7 @@ private[spark] class TaskSchedulerImpl(
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableResources = shuffledOffers.map(_.resources).toArray
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3b620ec..2ce1134 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -229,6 +229,8 @@ private[spark] class TaskSetManager(
       index: Int,
       resolveRacks: Boolean = true,
       speculatable: Boolean = false): Unit = {
+    // A zombie TaskSetManager may reach here while handling a failed task.
+    if (isZombie) return
     val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
     for (loc <- tasks(index).preferredLocations) {
       loc match {
@@ -1082,6 +1084,8 @@ private[spark] class TaskSetManager(
   }
 
   def recomputeLocality(): Unit = {
+    // A zombie TaskSetManager may reach here when executorLost happens.
+    if (isZombie) return
     val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org