You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/06 05:27:59 UTC
[spark] branch branch-3.0 updated: [SPARK-30729][CORE] Eagerly
filter out zombie TaskSetManager before offering resources
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new baf1a07 [SPARK-30729][CORE] Eagerly filter out zombie TaskSetManager before offering resources
baf1a07 is described below
commit baf1a07704a6404313ebf558652e26acf137c5b4
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Feb 6 12:48:27 2020 +0800
[SPARK-30729][CORE] Eagerly filter out zombie TaskSetManager before offering resources
### What changes were proposed in this pull request?
Eagerly filter out zombie `TaskSetManager`s before offering resources, to reduce overhead as much as possible.
This PR also avoids calling `recomputeLocality` and `addPendingTask` when the `TaskSetManager` is a zombie.
### Why are the changes needed?
A zombie `TaskSetManager` can still exist in the Pool's `schedulableQueue` while it has running tasks. Offering resources to a zombie `TaskSetManager` is meaningless and only adds unnecessary overhead.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Pass Jenkins.
Closes #27455 from Ngone51/exclude-zombie-tsm.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit aebabf0bed712511eaa8844cab3a0c562219b2d0)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ++++
2 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f25a36c..6a1d460 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -430,7 +430,7 @@ private[spark] class TaskSchedulerImpl(
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableResources = shuffledOffers.map(_.resources).toArray
val availableCpus = shuffledOffers.map(o => o.cores).toArray
- val sortedTaskSets = rootPool.getSortedTaskSetQueue
+ val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3b620ec..2ce1134 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -229,6 +229,8 @@ private[spark] class TaskSetManager(
index: Int,
resolveRacks: Boolean = true,
speculatable: Boolean = false): Unit = {
+ // A zombie TaskSetManager may reach here while handling failed task.
+ if (isZombie) return
val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
for (loc <- tasks(index).preferredLocations) {
loc match {
@@ -1082,6 +1084,8 @@ private[spark] class TaskSetManager(
}
def recomputeLocality(): Unit = {
+ // A zombie TaskSetManager may reach here while executorLost happens
+ if (isZombie) return
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org