Posted to commits@spark.apache.org by jo...@apache.org on 2016/08/30 20:15:43 UTC

spark git commit: [SPARK-17304] Fix perf. issue caused by TaskSetManager.abortIfCompletelyBlacklisted

Repository: spark
Updated Branches:
  refs/heads/master 4b4e329e4 -> fb2008431


[SPARK-17304] Fix perf. issue caused by TaskSetManager.abortIfCompletelyBlacklisted

This patch addresses a minor scheduler performance issue that was introduced in #13603. If you run

```scala
sc.parallelize(1 to 100000, 100000).map(identity).count()
```

then most of the time ends up being spent in `TaskSetManager.abortIfCompletelyBlacklisted()`:

![image](https://cloud.githubusercontent.com/assets/50748/18071032/428732b0-6e07-11e6-88b2-c9423cd61f53.png)

When processing resource offers, the scheduler uses a nested loop which considers every task set at multiple locality levels:

```scala
   for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }
```

In order to prevent jobs with globally blacklisted tasks from hanging, #13603 added a `taskSet.abortIfCompletelyBlacklisted` call inside of `resourceOfferSingleTaskSet`: whenever a call to `resourceOfferSingleTaskSet` fails to schedule any tasks, `abortIfCompletelyBlacklisted` checks whether the task set's tasks are completely blacklisted in order to figure out whether they will ever be schedulable. The problem with this placement is that the `do`/`while` loop only exits when `resourceOfferSingleTaskSet` returns `false`, and that final failing call is exactly the one that runs `abortIfCompletelyBlacklisted`. As a result, almost every call to `resourceOffers` triggers the `abortIfCompletelyBlacklisted` check for every task set.
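
To make the cost concrete, here is a small, self-contained Scala sketch of the pre-patch control flow (toy stand-in names, not Spark's actual classes): because the `do`/`while` only exits when the offer call returns `false`, the expensive check fires once per (task set, locality level) pair on every scheduling round.

```scala
// Toy model of the pre-patch control flow. TaskSetStub, offerSingleTaskSet and
// expensiveBlacklistCheck are hypothetical stand-ins, not Spark's real classes.
object PrePatchSketch {
  final case class TaskSetStub(id: Int, localityLevels: Seq[String])

  var blacklistChecks = 0
  def expensiveBlacklistCheck(ts: TaskSetStub): Unit = { blacklistChecks += 1 }

  // Mirrors the pre-patch resourceOfferSingleTaskSet: returns false when nothing
  // could be launched and, on that same call, runs the blacklist check.
  def offerSingleTaskSet(ts: TaskSetStub, locality: String): Boolean = {
    val launchedTask = false // pretend no free CPUs remain for this task set
    if (!launchedTask) {
      expensiveBlacklistCheck(ts) // pre-patch placement: inside the per-offer call
    }
    launchedTask
  }

  def main(args: Array[String]): Unit = {
    val taskSets = (1 to 4).map(i => TaskSetStub(i, Seq("PROCESS_LOCAL", "NODE_LOCAL", "ANY")))
    // Mirrors the nested loop in resourceOffers shown above.
    for (ts <- taskSets; maxLocality <- ts.localityLevels) {
      var launched = false
      do {
        launched = offerSingleTaskSet(ts, maxLocality)
      } while (launched)
    }
    // 4 task sets * 3 locality levels = 12 blacklist checks in one scheduling round.
    println(s"blacklist checks per round: $blacklistChecks")
  }
}
```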

Instead, I think that this call should be moved out of the innermost loop and made _at most_ once per task set, and only when none of the task set's tasks can be scheduled at any locality level.
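
Continuing the toy model above (same hypothetical stand-in names, not Spark's real classes), hoisting the check out of the innermost loop drops it from one call per (task set, locality level) pair to at most one call per task set per scheduling round; the actual change is in the diff below.

```scala
// Toy model of the post-patch control flow; all names are hypothetical stand-ins.
object PostPatchSketch {
  final case class TaskSetStub(id: Int, localityLevels: Seq[String])

  var blacklistChecks = 0
  def expensiveBlacklistCheck(ts: TaskSetStub): Unit = { blacklistChecks += 1 }

  // Post-patch: the per-offer call no longer performs the blacklist check itself.
  def offerSingleTaskSet(ts: TaskSetStub, locality: String): Boolean = false

  def main(args: Array[String]): Unit = {
    val taskSets = (1 to 4).map(i => TaskSetStub(i, Seq("PROCESS_LOCAL", "NODE_LOCAL", "ANY")))
    for (ts <- taskSets) {
      var launchedAnyTask = false
      for (maxLocality <- ts.localityLevels) {
        var launchedAtThisLocality = false
        do {
          launchedAtThisLocality = offerSingleTaskSet(ts, maxLocality)
          launchedAnyTask |= launchedAtThisLocality
        } while (launchedAtThisLocality)
      }
      // Hoisted check: at most once per task set, and only when nothing could be
      // launched at any locality level.
      if (!launchedAnyTask) expensiveBlacklistCheck(ts)
    }
    // 4 checks per round instead of 12.
    println(s"blacklist checks per round: $blacklistChecks")
  }
}
```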

Before this change, the microbenchmark example that I posted above took 35 seconds to run; after it, the same job takes only 15 seconds.

/cc squito and kayousterhout for review.

Author: Josh Rosen <jo...@databricks.com>

Closes #14871 from JoshRosen/bail-early-if-no-cpus.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb200843
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb200843
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb200843

Branch: refs/heads/master
Commit: fb20084313470593d8507a43fcb2cde2a4c854d9
Parents: 4b4e329
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Aug 30 13:15:21 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Aug 30 13:15:21 2016 -0700

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     | 22 ++++++++++++--------
 1 file changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb200843/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index dc05e76..7d90553 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -278,9 +278,6 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     }
-    if (!launchedTask) {
-      taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
-    }
     return launchedTask
   }
 
@@ -326,12 +323,19 @@ private[spark] class TaskSchedulerImpl(
     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
-    var launchedTask = false
-    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
-      do {
-        launchedTask = resourceOfferSingleTaskSet(
-            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
-      } while (launchedTask)
+    for (taskSet <- sortedTaskSets) {
+      var launchedAnyTask = false
+      var launchedTaskAtCurrentMaxLocality = false
+      for (currentMaxLocality <- taskSet.myLocalityLevels) {
+        do {
+          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
+            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
+          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+        } while (launchedTaskAtCurrentMaxLocality)
+      }
+      if (!launchedAnyTask) {
+        taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+      }
     }
 
     if (tasks.size > 0) {

