Posted to commits@spark.apache.org by tg...@apache.org on 2020/04/22 13:25:57 UTC

[spark] branch master updated: [SPARK-18886][CORE][FOLLOWUP] allow follow up locality resets even if no task was launched

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b77b31  [SPARK-18886][CORE][FOLLOWUP] allow follow up locality resets even if no task was launched
8b77b31 is described below

commit 8b77b3183576da9c85d3939f5759e011c93d4fc5
Author: Nicholas Marcott <48...@users.noreply.github.com>
AuthorDate: Wed Apr 22 08:25:24 2020 -0500

    [SPARK-18886][CORE][FOLLOWUP] allow follow up locality resets even if no task was launched
    
    ### What changes were proposed in this pull request?
    Remove the requirement to launch a task in order to reset locality wait timer.
    
    ### Why are the changes needed?
    Recently https://github.com/apache/spark/pull/27207 was merged, but it contained a bug that leads to undesirable behavior.
    
    The crux of the issue is that a single resource offer could not reset the timer if there had been a previous delay-scheduling reject followed by an allResourceOffer with no available resources.
    This led to a problem where, once the locality level reached ANY, single resource offers were all accepted, leaving allResourceOffers with no resources to utilize (hence no task launched on an all-resource offer -> no timer reset). The task set manager would be stuck at the ANY locality level.
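    
    Condensed from the diff below, the change to the reset bookkeeping looks roughly like this (a sketch of the relevant branch only, not the full resourceOffers method):
    
    ```scala
    // Before: an offer round with no locality rejects but also no launched task fell
    // into the else branch and cleared the flag, so a later single-resource offer that
    // did launch a task could no longer reset the timer.
    if (noDelaySchedulingRejects && launchedAnyTask) {
      if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
        taskSet.resetDelayScheduleTimer(globalMinLocality)
        resetOnPreviousOffer.update(taskSet.taskSet, true)
      }
    } else {
      resetOnPreviousOffer.update(taskSet.taskSet, false)
    }
    
    // After: only an actual delay-scheduling reject clears the flag; an offer with no
    // rejects and no launched task leaves it untouched, so a follow-up offer that does
    // launch a task can still reset the timer.
    if (noDelaySchedulingRejects) {
      if (launchedAnyTask &&
          (isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet.taskSet, true))) {
        taskSet.resetDelayScheduleTimer(globalMinLocality)
        noRejectsSinceLastReset.update(taskSet.taskSet, true)
      }
    } else {
      noRejectsSinceLastReset.update(taskSet.taskSet, false)
    }
    ```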
    
    Noting down here the downsides of the reset conditions below, in case we want to follow up.
    As this is quite complex, I could easily be missing something, so please comment/respond if you have more bad-behavior scenarios or find something wrong here:
    The format is:
    
    > **Reset condition**
    >  - the unwanted side effect
    >      - the cause/use case
    
    The references below to locality increase/decrease mean:
    ```
    PROCESS_LOCAL, NODE_LOCAL ... .. ANY
        ------ locality decrease --->
       <----- locality increase -----
    ```
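    
    For context, these levels mirror the scheduler's TaskLocality enumeration, where declaration order defines priority (sketched here for reference; not part of this diff):
    
    ```scala
    // org.apache.spark.scheduler.TaskLocality (sketch): PROCESS_LOCAL is the most
    // local level and ANY the least; the delay-scheduling timer governs how long a
    // TaskSetManager waits before falling back to a less local level.
    object TaskLocality extends Enumeration {
      val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
    }
    ```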
    
    **Task launch:**
    - locality decrease:
      - Blacklisting, FAIR/FIFO scheduling, or task resource requirements can minimize tasks launched
    - locality increase:
      - a single task launch resets the timer despite many tasks remaining
    
    **No delay schedule reject since last allFreeResource offer**
    - locality decrease:
      - locality wait shorter than the interval between allFreeResource offers, which occur at least once per second
    - locality increase:
      - a single resource (or none) is not rejected despite many tasks remaining (other, lower-priority tasks are utilizing the resources)
    
    **Current impl - No delay schedule reject since last (allFreeResource offer + task launch)**
    - locality decrease:
      - all from above
    - locality increase:
      - single resource accepted and task launched despite many tasks remaining
    
    The current impl is an improvement on the legacy condition (task launch): the unintended locality decrease case is similar, and the unintended locality increase case only occurs when the cluster is fully utilized (see the predicate sketch below).
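    
    Expressed as predicates over the flags used in the diff below, the three conditions compare roughly as follows (a hypothetical helper for illustration, not code from either implementation):
    
    ```scala
    // Contrasts the three reset conditions discussed above. Parameter names mirror the
    // flags in TaskSchedulerImpl.resourceOffers; the function itself is hypothetical.
    def localityResetConditions(
        launchedAnyTask: Boolean,
        noDelaySchedulingRejects: Boolean,
        isAllFreeResources: Boolean,
        noRejectsSinceLastReset: Boolean): (Boolean, Boolean, Boolean) = {
      // (1) Legacy "task launch": reset whenever any task was launched this round.
      val legacy = launchedAnyTask
      // (2) "No delay schedule reject since last allFreeResource offer".
      val rejectBased = noDelaySchedulingRejects &&
        (isAllFreeResources || noRejectsSinceLastReset)
      // (3) Current impl: (2) plus the requirement that a task was actually launched.
      val current = rejectBased && launchedAnyTask
      (legacy, rejectBased, current)
    }
    ```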
    
    For the locality increase cases, perhaps a config that requires a certain % of tasks in a taskset to finish before resetting locality levels would be helpful.
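    
    A hypothetical shape for such a config, using Spark's internal ConfigBuilder (the name and default here are invented for illustration and are not proposed in this PR):
    
    ```scala
    // Hypothetical sketch; it would live alongside the existing entries in
    // org.apache.spark.internal.config, since ConfigBuilder is private[spark].
    package org.apache.spark.internal.config
    
    private[spark] object HypotheticalLocalityConfigs {
      // Hypothetical: fraction of a TaskSet's tasks that must finish before the
      // delay-scheduling (locality wait) timer may be reset.
      val LOCALITY_WAIT_RESET_TASK_FRACTION =
        ConfigBuilder("spark.locality.wait.resetTaskFraction")
          .doc("Fraction of tasks in a taskset that must complete before the " +
            "locality wait timer can be reset.")
          .doubleConf
          .checkValue(f => f >= 0.0 && f <= 1.0, "Fraction must be between 0 and 1.")
          .createWithDefault(0.0)
    }
    ```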
    
    **If** that was considered a good approach, then perhaps removing the task launch requirement would eliminate most of the downsides listed above.
    Let me know if you have more ideas for eliminating the locality increase downside of **No delay schedule reject since last allFreeResource offer**.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    TaskSchedulerImplSuite
    
    Also manually tested, similar to the testing done in https://github.com/apache/spark/pull/27207, using [this simple app](https://github.com/bmarcott/spark-test-apps/blob/master/src/main/scala/TestLocalityWait.scala).
    
    With the new changes, given a locality wait of 10s, the behavior is generally:
    10 seconds of locality being respected, followed by a single full utilization of resources at the ANY locality level, followed by 10 seconds of locality being respected, and so on.
    
    If the legacy flag is enabled (spark.locality.wait.legacyResetOnTaskLaunch=true), only PROCESS_LOCAL tasks are scheduled (utilizing only a single executor).
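    
    For reference, the manual test setup amounts to settings like the following (a sketch of the configuration mentioned above, not code from this patch):
    
    ```scala
    import org.apache.spark.SparkConf
    
    object LocalityWaitTestConf {
      // 10s locality wait as used in the manual test; flip the legacy flag to true to
      // reproduce the old reset-on-task-launch behavior described above.
      val conf: SparkConf = new SparkConf()
        .set("spark.locality.wait", "10s")
        .set("spark.locality.wait.legacyResetOnTaskLaunch", "true")
    }
    ```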
    
    cloud-fan
    tgravescs
    
    Closes #28188 from bmarcott/nmarcott-locality-fix.
    
    Authored-by: Nicholas Marcott <48...@users.noreply.github.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 16 ++---
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 69 +++++++++++++++++++++-
 2 files changed, 75 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2e3e1cc..d327099 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -111,8 +111,9 @@ private[spark] class TaskSchedulerImpl(
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
 
   // keyed by taskset
-  // value is true if the task set's locality wait timer was reset on the last resource offer
-  private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]()
+  // value is true if the task set has not rejected any resources due to locality
+  // since the timer was last reset
+  private val noRejectsSinceLastReset = new mutable.HashMap[TaskSet, Boolean]()
   private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)
 
   // Protected by `this`
@@ -336,7 +337,7 @@ private[spark] class TaskSchedulerImpl(
         taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
       }
     }
-    resetOnPreviousOffer -= manager.taskSet
+    noRejectsSinceLastReset -= manager.taskSet
     manager.parent.removeSchedulable(manager)
     logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
       s" ${manager.parent.name}")
@@ -615,13 +616,14 @@ private[spark] class TaskSchedulerImpl(
         }
 
         if (!legacyLocalityWaitReset) {
-          if (noDelaySchedulingRejects && launchedAnyTask) {
-            if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
+          if (noDelaySchedulingRejects) {
+            if (launchedAnyTask &&
+              (isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet.taskSet, true))) {
               taskSet.resetDelayScheduleTimer(globalMinLocality)
-              resetOnPreviousOffer.update(taskSet.taskSet, true)
+              noRejectsSinceLastReset.update(taskSet.taskSet, true)
             }
           } else {
-            resetOnPreviousOffer.update(taskSet.taskSet, false)
+            noRejectsSinceLastReset.update(taskSet.taskSet, false)
           }
         }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 056c342..a8541cb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -296,6 +296,67 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       .flatten.isEmpty)
   }
 
+  test("SPARK-18886 - task set with no locality requirements should not starve one with them") {
+    val clock = new ManualClock()
+    // All tasks created here are local to exec1, host1.
+    // Locality level starts at PROCESS_LOCAL.
+    val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
+    // Locality levels increase at 3000 ms.
+    val advanceAmount = 2000
+
+    val taskSet2 = FakeTask.createTaskSet(8, 2, 0)
+    taskScheduler.submitTasks(taskSet2)
+
+    // Stage 2 takes resource since it has no locality requirements
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 2.0"))
+
+    // Clock advances to 2s. No locality changes yet.
+    clock.advance(advanceAmount)
+
+    // Stage 2 takes resource since it has no locality requirements
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 2.0"))
+
+    // Simulates:
+    // 1. stage 2 has taken all resource offers through single resource offers
+    // 2. stage 1 is offered 0 cpus on allResourceOffer.
+    // This should not reset timer.
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 0)),
+        isAllFreeResources = true)
+      .flatten.length === 0)
+
+    // This should move stage 1 to NODE_LOCAL.
+    clock.advance(advanceAmount)
+
+    // Stage 1 should now accept NODE_LOCAL resource.
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 1.1"))
+  }
+
   test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " +
     "time if last full resource offer (isAllResources = true) was accepted as well as any " +
     "following partial resource offers") {
@@ -306,12 +367,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Locality levels increase at 3000 ms.
     val advanceAmount = 3000
 
-    // PROCESS_LOCAL full resource offer is accepted.
+    // PROCESS_LOCAL full resource offer is not rejected due to locality.
+    // It has 0 available cores, so no task is launched.
+    // Timer is reset and locality level remains at PROCESS_LOCAL.
     assert(taskScheduler
       .resourceOffers(
-        IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+        IndexedSeq(WorkerOffer("exec1", "host1", 0)),
         isAllFreeResources = true)
-      .flatten.length === 1)
+      .flatten.length === 0)
 
     // Advancing clock increases locality level to NODE_LOCAL.
     clock.advance(advanceAmount)

